Apache Spark 2.xでKuduを利用する

以前のブログ(Apache SparkでApache Kuduを利用する)の Spark2.x版です。前回のブログからあまり変わっていませんが、前回のブログの手順はSpark2.xで動作しなかったという話を聞いたのでアップデートしておきます。
なお、昨日 Kudu 1.7 がリリースされました!今回の特徴は

  • (ついに)DECIMAL型をサポート!
  • 可用性とパフォーマンスの改善

などです。


“Apache Spark 2.xでKuduを利用する” の続きを読む

Apache SparkでApache Kuduを利用する

Apache Kudu 1.3

最近Apache Kuduのリードとライトの流れについてのブログが公開され、昨日は日本語でもウェビナーが開催されたりと盛り上がってきた「分散ストレージエンジン」のApache Kuduですが、SQLでのアクセスにはApache Impala (Incubating) との組み合わせが便利です。他方、Apache Sparkからも簡単に利用できます。
Kudu Quickstart VMを利用して、まずはチュートリアルにしたがって Impala からデータを投入し、クエリした後にSparkのDataFrameからアクセスしてみました。以下の手順は仮想マシン上で試せるほど簡単なので、プレミアムフライデーで帰る前に試せるぐらい簡単ですね。

1. Kudu Quickstart VMのチュートリアルを実行する

チュートリアルに従ってデータを作成しました。
なお、チュートリアルと結果が違うのは、データ投入中にDELETE の実験をしたからです。
[code]
[demo@quickstart ~]$ impala-shell
Starting Impala Shell without Kerberos authentication
Connected to quickstart.cloudera:21000
Server version: impalad version 2.8.0-cdh5.11.0 RELEASE (build e09660de6b503a15f07e84b99b63e8e745854c34)
***********************************************************************************
Welcome to the Impala shell.
(Impala Shell v2.8.0-cdh5.11.0 (e09660d) built on Wed Apr 5 19:51:24 PDT 2017)
When pretty-printing is disabled, you can use the ‘–output_delimiter’ flag to set
the delimiter for fields in the same row. The default is ‘,’.
***********************************************************************************
[quickstart.cloudera:21000] > SELECT * FROM sfmta ORDER BY speed DESC LIMIT 1;
Query: select * FROM sfmta ORDER BY speed DESC LIMIT 1
Query submitted at: 2017-04-27 22:31:57 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=14bb0d30cdfc1d4:59d3e08a00000000
+————-+————-+——————–+——————-+——————-+———+
| report_time | vehicle_tag | longitude | latitude | speed | heading |
+————-+————-+——————–+——————-+——————-+———+
| 1357022016 | 8522 | -122.4538803100586 | 37.74539184570312 | 65.27799987792969 | 24 |
+————-+————-+——————–+——————-+——————-+———+
Fetched 1 row(s) in 0.42s
[quickstart.cloudera:21000] > SELECT COUNT(*) FROM sfmta;
Query: select COUNT(*) FROM sfmta
Query submitted at: 2017-04-27 22:33:03 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=e4812eb162470dd:8d087b0200000000
+———-+
| count(*) |
+———-+
| 842279 |
+———-+
Fetched 1 row(s) in 0.20s
[quickstart.cloudera:21000] >
[/code]

2. Apache Spark からアクセスする

Spark統合の参考ページはこちら
なお、テーブル名は Kudu の webui で調べました。
http://quickstart.cloudera:8051/
[code]
$ spark-shell –packages org.apache.kudu:kudu-spark_2.10:1.1.0
(略)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
17/04/27 22:52:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
Spark context available as sc (master = local[*], app id = local-1493358775875).
17/04/27 22:52:57 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.
scala> import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.kudu._
scala> val df = sqlContext.read.options(Map("kudu.master=" -> "quickstart.cloudera:7051","kudu.table" -> "impala::default.sfmta")).kudu
df: org.apache.spark.sql.DataFrame = [report_time: bigint, vehicle_tag: int, longitude: float, latitude: float, speed: float, heading: float]
scala> df.count
res0: Long = 842279
scala> df.show(5)
+———–+———–+———-+——–+——+——-+
|report_time|vehicle_tag| longitude|latitude| speed|heading|
+———–+———–+———-+——–+——+——-+
| 1356998401| 1479|-122.44695| 37.7222| 0.0| 51.0|
| 1356998401| 1525|-122.39674|37.79291| 0.0| 226.05|
| 1356998401| 1532|-122.45711|37.75034|14.305| 218.33|
| 1356998401| 1549|-122.43752| 37.7611| 0.0| 49.24|
| 1356998401| 5453|-122.42894|37.76705| 0.0| 0.0|
+———–+———–+———-+——–+——+——-+
only showing top 5 rows
scala> df.sort($"speed".desc).select("longitude","latitude","speed").show(3)
+———-+——–+——+
| longitude|latitude| speed|
+———-+——–+——+
|-122.45388|37.74539|65.278|
|-122.48861| 37.9074| 62.5|
|-122.48026|37.89245|55.833|
+———-+——–+——+
only showing top 3 rows
[/code]

まとめ

SQLでアクセスしたい場合はImpalaから行うと便利ですが、データエンジニアリングを行いたい場合など、SQL以外でのアクセスにはSparkも便利ですね。
そうそう、Apache Sparkのパフォーマンスのベンチマーク結果を翻訳して公開しました。興味がある方はこちらもどうぞ。

Apache Impala、同時実行時の速度において従来の分析データベースやSQL on Hadoopに圧勝

 

SparkのWeb UIを調べてみた

SparkのWeb UIに記載されている項目の意味について(日本語で)まとまっている情報がなかったのでまとめてみました。(Spark 1.6ベース)
Spark 2.xへの対応と、SparkSQL、SparkStreamingは別途記載する予定。
間違いを見つけたらコメントお願いします。m(__)m

下記のスクリーンショットは原則としてDFSReadWriteTestサンプルを用いて実行し、網羅できない範囲はサンプルコードを用いて実行しています。スクリンショットをクリックするとポップアップします。

目次

1. メイン画面
2. ジョブの詳細
3. ステージの詳細
4. 永続化に関する情報 (Storage)
5. 環境 (Environment)
6. エグゼキュタ (Executors)
7. 参考資料

Sparkで取得したログを別のマシンのWeb UIで見る

#以前qiitaに投稿した記事を一部更新して再投稿しています
Apache SparkのWebUIはアプリケーションの開発時のデバッグにとても役立ちます。
sparkhistory7.png
しかし、本番運用に入ってしまうと、簡単にはWeb UIにアクセスできなかったり、問題が発生してから時間が過ぎてしまい見たかった情報が流れてしまうこともあります。
“Sparkで取得したログを別のマシンのWeb UIで見る” の続きを読む

Cloudera Quickstart VM 5.7 を使って見る

前回アップグレードが完了したCloudera Quickstart VM。ふと見ると本家に5.7が.. orz…
cm57quickstarthttp://www.cloudera.com/downloads/quickstart_vms/5-7.html
前回アップグレードした版、あるいはこのver 5.7のVMのどちらを使っても同じ結果になるはずですが、せっかくなので5.7をダウンロードし、新しい仮想マシンを使ってみます。 “Cloudera Quickstart VM 5.7 を使って見る” の続きを読む

Sparkの日本語ドキュメント

Spark 1.5.0 の日本語版ドキュメント。何かと参考になると思うのでメモ。
http://mogile.web.fc2.com/spark/index.html
誰が翻訳しているのかわかりませんが、このようなドキュメントはありがたいですね。OSSにはソースやパッチを書いたりするだけでなく、このような形での貢献の仕方も重要かと。