Apache Kuduを10分で試すシリーズの3回目です。前回のブログに続き、今回は Spark からKuduにデータを書き込むチュートリアルを試してみましょう。
Kudu-Sparkのクイックスタートはこちらから参照できます。このチュートリアルではSparkを利用するため、Sparkのインストールが必要です。このチュートリアルではSparkをローカルマシンにインストールしていましたが、手元の環境(Mac)にはSparkがインストールされていたので、インストールの手順は省略します。
データの準備
今回のチュートリアルで利用するデータは「サンフランシスコ市交通局」が提供しているパブリックデータです。 https://data.sfgov.org/Transportation/Historical-raw-AVL-GPS-data/5fk7-ivit/data
日付ごとにgzipで圧縮したCSVファイル形式で提供されており、チュートリアルでは2013/1/1のデータをwgetでダウンロードして使用します。このデータセットはDOS形式の改行コード(CR+LF)のため、trコマンドを使用してUNIX形式(LF)に変換しています。
$wget http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \ sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv
上記でダウンロード、および変換されたCSVファイルの先頭を表示して見ると次のようになりました。チュートリアルでは各列の意味には言及していませんが、データ分析をするのであればヘッダ行が参考になるかもしれません。
$head sfmtaAVLRawData01012013.csv REV,REPORT_TIME,VEHICLE_TAG,LONGITUDE,LATITUDE,SPEED,HEADING,TRAIN_ASSIGNMENT,PREDICTABLEn1447,01/01/2013 00:00:53,03,-122.41201,37.79478,1.389,107.0,5901,1 1447,01/01/2013 00:02:23,03,-122.41201,37.79478,1.389,107.0,5901,1 1447,01/01/2013 00:03:53,03,-122.41201,37.79478,1.389,107.0,5901,0 1447,01/01/2013 00:05:23,03,-122.41201,37.79478,1.389,107.0,5901,0 1447,01/01/2013 00:06:53,03,-122.41201,37.79478,1.389,107.0,5901,0 1447,01/01/2013 00:08:23,03,-122.41201,37.79478,1.389,107.0,,0 1447,01/01/2013 00:09:53,03,-122.41201,37.79478,1.389,107.0,5901,1 1447,01/01/2013 00:11:23,03,-122.41201,37.79478,1.389,107.0,5901,1 1447,01/01/2013 00:12:50,03,-122.41201,37.79478,1.389,107.0,5901,1 1447,01/01/2013 00:14:23,03,-122.41201,37.79478,1.389,107.0,5901,1
Sparkシェルの開始
今回のチュートリアルではspark-shellを使っています。シェル起動時の引数に kudu-spark パッケージを指定します。
$ spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0 Ivy Default Cache set to: /Users/kawasaki/.ivy2/cache The jars for the packages stored in: /Users/kawasaki/.ivy2/jars :: loading settings :: url = jar:file:/Users/kawasaki/anaconda/lib/python3.6/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.apache.kudu#kudu-spark2_2.11 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-0ed8057d-d68e-496a-ac17-3f7e066a9cba;1.0 confs: [default] found org.apache.kudu#kudu-spark2_2.11;1.10.0 in central :: resolution report :: resolve 195ms :: artifacts dl 3ms :: modules in use: org.apache.kudu#kudu-spark2_2.11;1.10.0 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 1 | 0 | 0 | 0 || 1 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-0ed8057d-d68e-496a-ac17-3f7e066a9cba confs: [default] 0 artifacts copied, 1 already retrieved (0kB/5ms) 2019-08-29 19:19:28 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://172.16.64.17:4040 Spark context available as 'sc' (master = local[*], app id = local-1567073975570). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.2 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92) Type in expressions to have them evaluated. Type :help for more information. scala>
spark-shellが起動しました。チュートリアルに従います。
CSVファイルのロード
先ほどダウンロードしたCSVファイルをロードします。上記で確認したように先ほどのCSVファイルにはヘッダ行があるので、オプションでヘッダを指定し、inferSchemaを指定して型の推論を行います。
:paste val sfmta_raw = spark.sqlContext.read.format("csv") .option("header", "true") .option("inferSchema", "true") .load("sfmtaAVLRawData01012013.csv") sfmta_raw.printSchema sfmta_raw.createOrReplaceTempView("sfmta_raw") spark.sql("SELECT count(*) FROM sfmta_raw").show() spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show()
1行目に :paste と書かれていますが、これらの行をコピペする場合は:pasteを含む全ての行をコピー&ペーストし、終わったら ctrl+Dを押してください。コピペしないのであれば、val から始まる行を全て入力します。なお、このコードでは次の操作を行なっています。
- CSVファイルをロード
- sfmta_rawという一時的なビューを作成
- SQLで行数をカウントして表示
- SQLで5件クエリして表示
次のような結果になりました。データは約85万件です。
// Exiting paste mode, now interpreting. 2019-08-29 19:21:57 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException root |-- REV: integer (nullable = true) |-- REPORT_TIME: string (nullable = true) |-- VEHICLE_TAG: string (nullable = true) |-- LONGITUDE: double (nullable = true) |-- LATITUDE: double (nullable = true) |-- SPEED: double (nullable = true) |-- HEADING: double (nullable = true) |-- TRAIN_ASSIGNMENT: string (nullable = true) |-- PREDICTABLEn1447: integer (nullable = true) |-- 01/01/2013 00:00:53: string (nullable = true) |-- 03: string (nullable = true) |-- -122.41201: string (nullable = true) |-- 37.79478: string (nullable = true) |-- 1.389: string (nullable = true) |-- 107.0: string (nullable = true) |-- 5901: string (nullable = true) |-- 1: string (nullable = true) +--------+ |count(1)| +--------+ | 859086| +--------+ +----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ | REV| REPORT_TIME|VEHICLE_TAG| LONGITUDE|LATITUDE|SPEED|HEADING|TRAIN_ASSIGNMENT|PREDICTABLEn1447|01/01/2013 00:00:53| 03|-122.41201|37.79478|1.389|107.0|5901| 1| +----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ |1447|01/01/2013 00:02:23| 03|-122.41201|37.79478|1.389| 107.0| 5901| 1| null|null| null| null| null| null|null|null| |1447|01/01/2013 00:03:53| 03|-122.41201|37.79478|1.389| 107.0| 5901| 0| null|null| null| null| null| null|null|null| |1447|01/01/2013 00:05:23| 03|-122.41201|37.79478|1.389| 107.0| 5901| 0| null|null| null| null| null| null|null|null| |1447|01/01/2013 00:06:53| 03|-122.41201|37.79478|1.389| 107.0| 5901| 0| null|null| null| null| null| null|null|null| |1447|01/01/2013 00:08:23| 03|-122.41201|37.79478|1.389| 107.0| null| 0| null|null| null| null| null| null|null|null| +----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ sfmta_raw: org.apache.spark.sql.DataFrame = [REV: int, REPORT_TIME: string ... 15 more fields]
データをKuduにロードする準備を行う
CSVファイルを正しくロードできたようなので、Kuduにデータを投入する前処理を行います。次のコードで行なっているのは主に次の2つです。
- REPORT_TIME列を文字列型からタイムスタンプ型に変換
- 主キーの列はNullを許容しないようにスキーマを変更
:paste import org.apache.spark.sql.types._ import org.apache.spark.sql.DataFrame def setNotNull(df: DataFrame, columns: Seq[String]) : DataFrame = { val schema = df.schema // Modify [[StructField] for the specified columns. val newSchema = StructType(schema.map { case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m) case y: StructField => y }) // Apply new schema to the DataFrame df.sqlContext.createDataFrame(df.rdd, newSchema) } val sftmta_time = sfmta_raw .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss")) val sftmta_prep = setNotNull(sftmta_time, Seq("REPORT_TIME", "VEHICLE_TAG")) sftmta_prep.printSchema sftmta_prep.createOrReplaceTempView("sftmta_prep") spark.sql("SELECT count(*) FROM sftmta_prep").show() spark.sql("SELECT * FROM sftmta_prep LIMIT 5").show()
このコードを同じようにシェルに貼り付けて Ctrl+Dを押して実行します。
スキーマを比較したところ、元のスキーマは次のようなデータ型(とnullable=true)でしたが
|-- REPORT_TIME: string (nullable = true) |-- VEHICLE_TAG: string (nullable = true)
sftmta_prep DataFramでは次のように変更されていることが確認できます。(それ以外の結果は同じなので出力は割愛します)。
|-- REPORT_TIME: timestamp (nullable = false) |-- VEHICLE_TAG: string (nullable = false)
Kuduにデータをロードする
まずKuduにテーブルを作成します。前回のチュートリアルではjshellを使って作成しましたが、今回はspark-shellで行います。次のコードを貼り付けて実行してください。

オリジナルのチュートリアルで紹介されているコードは字下げ文字にTabが使われており、そのまま貼り付けると失敗するのでご注意ください。
:paste import collection.JavaConverters._ import org.apache.kudu.client._ import org.apache.kudu.spark.kudu._ val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext) // Delete the table if it already exists. if(kuduContext.tableExists("sfmta_kudu")) { kuduContext.deleteTable("sfmta_kudu") } kuduContext.createTable("sfmta_kudu", sftmta_prep.schema, /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"), new CreateTableOptions() .setNumReplicas(3) .addHashPartitions(List("VEHICLE_TAG").asJava, 4))
ブラウザから localhost:8050 にアクセスし、テーブルが作成されたことを確認してみました。sfmta_kuduテーブルが一番下に出力されています。
作成したKuduのテーブルに事前準備したデータをインサートします。

このコードもチュートリアルで紹介されているコードをそのまま貼り付けると失敗するのでご注意ください。次のコードは修正済みなのでうまくいくはず。
:paste kuduContext.insertRows(sftmta_prep, "sfmta_kudu") // Create a DataFrame that points to the Kudu table we want to query. val sfmta_kudu = spark.read .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251") .option("kudu.table", "sfmta_kudu") // We need to use leader_only because Kudu on Docker currently doesn't // support Snapshot scans due to `--use_hybrid_clock=false`. .option("kudu.scanLocality", "leader_only") .format("kudu").load sfmta_kudu.createOrReplaceTempView("sfmta_kudu") spark.sql("SELECT count(*) FROM sfmta_kudu").show() spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show()
結果は次のようになります。
+--------+ |count(1)| +--------+ | 859086| +--------+ +-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ | REPORT_TIME|VEHICLE_TAG| REV| LONGITUDE|LATITUDE| SPEED|HEADING|TRAIN_ASSIGNMENT|PREDICTABLEn1447|01/01/2013 00:00:53| 03|-122.41201|37.79478|1.389|107.0|5901| 1| +-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ |2013-01-01 00:00:00| 5545|1447|-122.40904|37.79931| 3.056| 351.0| 4504| 0| null|null| null| null| null| null|null|null| |2013-01-01 00:00:01| 1479|1447|-122.44695| 37.7222| 0.0| 51.0| MUNSCHED| 0| null|null| null| null| null| null|null|null| |2013-01-01 00:00:01| 1525|1447|-122.39674|37.79291| 0.0| 226.05| JUNSCHED| 1| null|null| null| null| null| null|null|null| |2013-01-01 00:00:01| 1532|1447|-122.45711|37.75034|14.305| 218.33| MUNSCHED| 1| null|null| null| null| null| null|null|null| |2013-01-01 00:00:01| 5556|1447|-122.44688|37.78351| 0.0| 81.0| null| 0| null|null| null| null| null| null|null|null| +-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+ sfmta_kudu: org.apache.spark.sql.DataFrame = [REPORT_TIME: timestamp, VEHICLE_TAG: string ... 15 more fields]
Kuduにデータを挿入し、先ほどのCSVファイルと同様にデータをクエリすることができました。
データの読み込みとデータの変更
続いてのチュートリアルでは、データのクエリと変更を行います。
データのクエリ
車速が一番早いデータをクエリします。
spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show()
しかし、上記のクエリをそのまま実行すると全ての列が表示され、画面上で見にくいため、チュートリアルの出力例と同じになるように、クエリを以下のように変更しました。
spark.sql("SELECT report_time,vehicle_tag,longitude,latitude,speed,heading FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show(truncate=false)
+-------------------+-----------+----------+--------+------+-------+
|report_time |vehicle_tag|longitude |latitude|speed |heading|
+-------------------+-----------+----------+--------+------+-------+
|2013-01-01 06:39:02|5411 |-122.39688|37.76666|68.333|82.0 |
+-------------------+-----------+----------+--------+------+-------+
元々のチュートリアルでは上記の緯度と経度をGoogle mapで検索し、時速68マイルで東に向かっているバスだと記述されていました。(heading 82.0はどういう意味?)
ともあれ時速108キロはセンサーの異常だと思われるため、この5411のタグ(バス?)のデータを削除します。
データの変更
spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show() val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411") kuduContext.deleteRows(toDelete, "sfmta_kudu") spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
上記のクエリを実行します。SQLでvehicle_tagが5411の条件のデータをクエリし、kuduContext.deleteRowsで削除を行います。

今気づいたけど、SQLで削除してないのは何か理由があるのかな?
結果、1169件あったデータが0件になりました。
scala> spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show() +--------+ |count(1)| +--------+ | 1169| +--------+ scala> val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411") toDelete: org.apache.spark.sql.DataFrame = [REPORT_TIME: timestamp, VEHICLE_TAG: string ... 15 more fields] scala> kuduContext.deleteRows(toDelete, "sfmta_kudu") scala> spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show() +--------+ |count(1)| +--------+ | 0| +--------+
CSVファイルやParquetの場合、ファイルの変更はできません。(別のファイルに新規に書き出すことは可能ですが)。
変更されないような大量なデータをParquet形式などでHDFSに保存しておいて分析することは問題ないでしょうが、直近で追加、変更されたデータをすぐに分析したい、という用途では不十分です。Kuduを使えば、変更された大量のデータもすぐに分析対象にすることができるので、ユースケースが広がりますね。
コメント