Apache SparkでApache Kuduを利用する

Apache Kudu 1.3

最近Apache Kuduのリードとライトの流れについてのブログが公開され、昨日は日本語でもウェビナーが開催されたりと盛り上がってきた「分散ストレージエンジン」のApache Kuduですが、SQLでのアクセスにはApache Impala (Incubating) との組み合わせが便利です。他方、Apache Sparkからも簡単に利用できます。

Kudu Quickstart VMを利用して、まずはチュートリアルにしたがって Impala からデータを投入し、クエリした後にSparkのDataFrameからアクセスしてみました。以下の手順は仮想マシン上で試せるほど簡単なので、プレミアムフライデーで帰る前に試せるぐらい簡単ですね。

1. Kudu Quickstart VMのチュートリアルを実行する

チュートリアルに従ってデータを作成しました。

なお、チュートリアルと結果が違うのは、データ投入中にDELETE の実験をしたからです。

[demo@quickstart ~]$ impala-shell 
Starting Impala Shell without Kerberos authentication
Connected to quickstart.cloudera:21000
Server version: impalad version 2.8.0-cdh5.11.0 RELEASE (build e09660de6b503a15f07e84b99b63e8e745854c34)
***********************************************************************************
Welcome to the Impala shell.
(Impala Shell v2.8.0-cdh5.11.0 (e09660d) built on Wed Apr  5 19:51:24 PDT 2017)

When pretty-printing is disabled, you can use the '--output_delimiter' flag to set
the delimiter for fields in the same row. The default is ','.
***********************************************************************************
[quickstart.cloudera:21000] > SELECT * FROM sfmta ORDER BY speed DESC LIMIT 1;
Query: select * FROM sfmta ORDER BY speed DESC LIMIT 1
Query submitted at: 2017-04-27 22:31:57 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=14bb0d30cdfc1d4:59d3e08a00000000
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| report_time | vehicle_tag | longitude          | latitude          | speed             | heading |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| 1357022016  | 8522        | -122.4538803100586 | 37.74539184570312 | 65.27799987792969 | 24      |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
Fetched 1 row(s) in 0.42s
[quickstart.cloudera:21000] > SELECT COUNT(*) FROM sfmta;
Query: select COUNT(*) FROM sfmta
Query submitted at: 2017-04-27 22:33:03 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=e4812eb162470dd:8d087b0200000000
+----------+
| count(*) |
+----------+
| 842279   |
+----------+
Fetched 1 row(s) in 0.20s
[quickstart.cloudera:21000] > 


2. Apache Spark からアクセスする

Spark統合の参考ページはこちら

なお、テーブル名は Kudu の webui で調べました。
http://quickstart.cloudera:8051/

$ spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0
(略)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
17/04/27 22:52:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc (master = local[*], app id = local-1493358775875).
17/04/27 22:52:57 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.

scala> import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.kudu._

scala> val df = sqlContext.read.options(Map("kudu.master=" -> "quickstart.cloudera:7051","kudu.table" -> "impala::default.sfmta")).kudu
df: org.apache.spark.sql.DataFrame = [report_time: bigint, vehicle_tag: int, longitude: float, latitude: float, speed: float, heading: float]

scala> df.count
res0: Long = 842279                                                             

scala> df.show(5)
+-----------+-----------+----------+--------+------+-------+
|report_time|vehicle_tag| longitude|latitude| speed|heading|
+-----------+-----------+----------+--------+------+-------+
| 1356998401|       1479|-122.44695| 37.7222|   0.0|   51.0|
| 1356998401|       1525|-122.39674|37.79291|   0.0| 226.05|
| 1356998401|       1532|-122.45711|37.75034|14.305| 218.33|
| 1356998401|       1549|-122.43752| 37.7611|   0.0|  49.24|
| 1356998401|       5453|-122.42894|37.76705|   0.0|    0.0|
+-----------+-----------+----------+--------+------+-------+
only showing top 5 rows


scala> df.sort($"speed".desc).select("longitude","latitude","speed").show(3)
+----------+--------+------+
| longitude|latitude| speed|
+----------+--------+------+
|-122.45388|37.74539|65.278|
|-122.48861| 37.9074|  62.5|
|-122.48026|37.89245|55.833|
+----------+--------+------+
only showing top 3 rows

まとめ

SQLでアクセスしたい場合はImpalaから行うと便利ですが、データエンジニアリングを行いたい場合など、SQL以外でのアクセスにはSparkも便利ですね。

そうそう、Apache Sparkのパフォーマンスのベンチマーク結果を翻訳して公開しました。興味がある方はこちらもどうぞ。

Apache Impala、同時実行時の速度において従来の分析データベースやSQL on Hadoopに圧勝

 

Cloudera 5.11リリース

Cloudera 5.11が出たので久しぶりの更新です。

Hadoopも11年目を迎え、日本でも幅広い業種で使われるようになりました。数年前と比較すると圧倒的に使いやすくなっているので、過去に評価してそのままになっている方、既存の基盤で困っている方、そろそろ改めて検証しなおしてみてはいかがでしょうか?

例えば数年前と比べると、 (続きを読む)

Hadoop3.0の新機能!DataNodeディスク間バランサーを試す

DataNodeディスク間バランサー

昨日紹介したHDFSの再バランスは、DataNode間でディスク使用量を平準化するための機能でした。一方で、DataNodeが複数のディスクを持っている場合、それぞれのディスクの使用量のバランスが崩れてしまうこともあります。一例として、データの削除が激しく行われたり、もっとわかりやすい例は新しいディスクを増設した場合です。このように偏りが生じるとI/Oが特定のディスクに偏ってしまい、効率が良くありません。

(続きを読む)

HDFSの再バランス

HDFSには、DataNode間でディスク使用量の再バランスを行う balaner 機能があります。また、Hadoop3.xで導入される予定の、DataNode内のディスク間のリバランス機能(Disk Balancer)もあります。(この機能はCDH5.8.2以降でも利用できます)

今回はノード間の再バランスについて、次回は新しいディスク間のバランス機能について紹介する予定です。

(続きを読む)

Cloudera Enterprise 5.8 リリース!

本日 Cloudera Enterprise 5.8 (CDH 5.8, Cloudera Manager 5.8)がリリースされました!リリースノートはこちら

今回のバージョンではCloudera Navigator OptimizerのGA、セキュリティ、およびImpalaとHueのバージョンアップが目玉のようです。前回同様に品質向上に力を入れているようなので、本番クラスタで使うにも安心ですね。

Clouderaの技術ブログから、個人的に興味深いものを抜粋してみます。(特に注目は赤字)

  • パフォーマンス&スケーラビリティ
    • ImpalaのパフォーマンスがKerberos環境で3倍向上
  • ユーザビリティと管理
    • Cloudera Navigator OptimizerがGA
    • HueのSQLエディタが新しいデザインに
    • Hiveのジョブがセキュリティポリシーに基づいたYARNのリソースプールで実行
  • クラウド対応
    • Apache SentryがAmazon S3に対応
    • ImpalaがAmazon S3の読み書きができるように
  • OSSコンポーネントのバージョンアップ

週末にでも愛用のCloudera Quickstart VMをアップデートしてみましょうかね…

(更新)プレスリリースも出ていました

http://www.cloudera.com/about-cloudera/press-center/press-releases/2016-07-21-cloudera-eases-the-path-for-analytics-on-apache-hadoop-with-cloudera-enterprise-5-8.html

セキュアなHadoopクラスタ:Cloudera ManagerでのKerberos化

前回はCloudera Managerを使ってCloudera Quickstart VMを利用できるようにしました。今回はKerberosを使用してセキュアなクラスタ化を行います。

セキュア化の背景

Hadoopのメリットの一つはデータを一箇所に溜めておき、そのデータに対して様々なアプローチができることです。(データレイク、と呼ばれることも多い) (続きを読む)

Cloudera Quickstart VM 5.7 を使って見る

前回アップグレードが完了したCloudera Quickstart VM。ふと見ると本家に5.7が.. orz…

cm57quickstarthttp://www.cloudera.com/downloads/quickstart_vms/5-7.html

前回アップグレードした版、あるいはこのver 5.7のVMのどちらを使っても同じ結果になるはずですが、せっかくなので5.7をダウンロードし、新しい仮想マシンを使ってみます。 (続きを読む)