Apache Kuduを10分で試す(3) Spark編

Kudu

Apache Kuduを10分で試すシリーズの3回目です。前回のブログに続き、今回は Spark からKuduにデータを書き込むチュートリアルを試してみましょう。

Kudu-Sparkのクイックスタートはこちらから参照できます。このチュートリアルではSparkを利用するため、Sparkのインストールが必要です。このチュートリアルではSparkをローカルマシンにインストールしていましたが、手元の環境(Mac)にはSparkがインストールされていたので、インストールの手順は省略します。

データの準備

今回のチュートリアルで利用するデータは「サンフランシスコ市交通局」が提供しているパブリックデータです。 https://data.sfgov.org/Transportation/Historical-raw-AVL-GPS-data/5fk7-ivit/data

日付ごとにgzipで圧縮したCSVファイル形式で提供されており、チュートリアルでは2013/1/1のデータをwgetでダウンロードして使用します。このデータセットはDOS形式の改行コード(CR+LF)のため、trコマンドを使用してUNIX形式(LF)に変換しています。

$wget http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz
gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \
  sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv

上記でダウンロード、および変換されたCSVファイルの先頭を表示して見ると次のようになりました。チュートリアルでは各列の意味には言及していませんが、データ分析をするのであればヘッダ行が参考になるかもしれません。

$head sfmtaAVLRawData01012013.csv
REV,REPORT_TIME,VEHICLE_TAG,LONGITUDE,LATITUDE,SPEED,HEADING,TRAIN_ASSIGNMENT,PREDICTABLEn1447,01/01/2013 00:00:53,03,-122.41201,37.79478,1.389,107.0,5901,1
1447,01/01/2013 00:02:23,03,-122.41201,37.79478,1.389,107.0,5901,1
1447,01/01/2013 00:03:53,03,-122.41201,37.79478,1.389,107.0,5901,0
1447,01/01/2013 00:05:23,03,-122.41201,37.79478,1.389,107.0,5901,0
1447,01/01/2013 00:06:53,03,-122.41201,37.79478,1.389,107.0,5901,0
1447,01/01/2013 00:08:23,03,-122.41201,37.79478,1.389,107.0,,0
1447,01/01/2013 00:09:53,03,-122.41201,37.79478,1.389,107.0,5901,1
1447,01/01/2013 00:11:23,03,-122.41201,37.79478,1.389,107.0,5901,1
1447,01/01/2013 00:12:50,03,-122.41201,37.79478,1.389,107.0,5901,1
1447,01/01/2013 00:14:23,03,-122.41201,37.79478,1.389,107.0,5901,1

Sparkシェルの開始

今回のチュートリアルではspark-shellを使っています。シェル起動時の引数に kudu-spark パッケージを指定します。

$ spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
Ivy Default Cache set to: /Users/kawasaki/.ivy2/cache
The jars for the packages stored in: /Users/kawasaki/.ivy2/jars
:: loading settings :: url = jar:file:/Users/kawasaki/anaconda/lib/python3.6/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.kudu#kudu-spark2_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0ed8057d-d68e-496a-ac17-3f7e066a9cba;1.0
	confs: [default]
	found org.apache.kudu#kudu-spark2_2.11;1.10.0 in central
:: resolution report :: resolve 195ms :: artifacts dl 3ms
	:: modules in use:
	org.apache.kudu#kudu-spark2_2.11;1.10.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0ed8057d-d68e-496a-ac17-3f7e066a9cba
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/5ms)
2019-08-29 19:19:28 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.16.64.17:4040
Spark context available as 'sc' (master = local[*], app id = local-1567073975570).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

spark-shellが起動しました。チュートリアルに従います。

CSVファイルのロード

先ほどダウンロードしたCSVファイルをロードします。上記で確認したように先ほどのCSVファイルにはヘッダ行があるので、オプションでヘッダを指定し、inferSchemaを指定して型の推論を行います。

:paste
val sfmta_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("sfmtaAVLRawData01012013.csv")

sfmta_raw.printSchema
sfmta_raw.createOrReplaceTempView("sfmta_raw")
spark.sql("SELECT count(*) FROM sfmta_raw").show()
spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show()

1行目に :paste と書かれていますが、これらの行をコピペする場合は:pasteを含む全ての行をコピー&ペーストし、終わったら ctrl+Dを押してください。コピペしないのであれば、val から始まる行を全て入力します。なお、このコードでは次の操作を行なっています。

  1. CSVファイルをロード
  2. sfmta_rawという一時的なビューを作成
  3. SQLで行数をカウントして表示
  4. SQLで5件クエリして表示

次のような結果になりました。データは約85万件です。

// Exiting paste mode, now interpreting.

2019-08-29 19:21:57 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
root                                                                            
 |-- REV: integer (nullable = true)
 |-- REPORT_TIME: string (nullable = true)
 |-- VEHICLE_TAG: string (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- SPEED: double (nullable = true)
 |-- HEADING: double (nullable = true)
 |-- TRAIN_ASSIGNMENT: string (nullable = true)
 |-- PREDICTABLEn1447: integer (nullable = true)
 |-- 01/01/2013 00:00:53: string (nullable = true)
 |-- 03: string (nullable = true)
 |-- -122.41201: string (nullable = true)
 |-- 37.79478: string (nullable = true)
 |-- 1.389: string (nullable = true)
 |-- 107.0: string (nullable = true)
 |-- 5901: string (nullable = true)
 |-- 1: string (nullable = true)

+--------+                                                                      
|count(1)|
+--------+
|  859086|
+--------+

+----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+
| REV|        REPORT_TIME|VEHICLE_TAG| LONGITUDE|LATITUDE|SPEED|HEADING|TRAIN_ASSIGNMENT|PREDICTABLEn1447|01/01/2013 00:00:53|  03|-122.41201|37.79478|1.389|107.0|5901|   1|
+----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+
|1447|01/01/2013 00:02:23|         03|-122.41201|37.79478|1.389|  107.0|            5901|               1|               null|null|      null|    null| null| null|null|null|
|1447|01/01/2013 00:03:53|         03|-122.41201|37.79478|1.389|  107.0|            5901|               0|               null|null|      null|    null| null| null|null|null|
|1447|01/01/2013 00:05:23|         03|-122.41201|37.79478|1.389|  107.0|            5901|               0|               null|null|      null|    null| null| null|null|null|
|1447|01/01/2013 00:06:53|         03|-122.41201|37.79478|1.389|  107.0|            5901|               0|               null|null|      null|    null| null| null|null|null|
|1447|01/01/2013 00:08:23|         03|-122.41201|37.79478|1.389|  107.0|            null|               0|               null|null|      null|    null| null| null|null|null|
+----+-------------------+-----------+----------+--------+-----+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+

sfmta_raw: org.apache.spark.sql.DataFrame = [REV: int, REPORT_TIME: string ... 15 more fields]

データをKuduにロードする準備を行う

CSVファイルを正しくロードできたようなので、Kuduにデータを投入する前処理を行います。次のコードで行なっているのは主に次の2つです。

  1. REPORT_TIME列を文字列型からタイムスタンプ型に変換
  2. 主キーの列はNullを許容しないようにスキーマを変更
:paste
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def setNotNull(df: DataFrame, columns: Seq[String]) : DataFrame = {
  val schema = df.schema
  // Modify [[StructField] for the specified columns.
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m)
    case y: StructField => y
  })
  // Apply new schema to the DataFrame
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
val sftmta_time = sfmta_raw
  .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss"))
val sftmta_prep = setNotNull(sftmta_time, Seq("REPORT_TIME", "VEHICLE_TAG"))
sftmta_prep.printSchema
sftmta_prep.createOrReplaceTempView("sftmta_prep")
spark.sql("SELECT count(*) FROM sftmta_prep").show()
spark.sql("SELECT * FROM sftmta_prep LIMIT 5").show()

このコードを同じようにシェルに貼り付けて Ctrl+Dを押して実行します。

スキーマを比較したところ、元のスキーマは次のようなデータ型(とnullable=true)でしたが

 |-- REPORT_TIME: string (nullable = true)
 |-- VEHICLE_TAG: string (nullable = true)

sftmta_prep DataFramでは次のように変更されていることが確認できます。(それ以外の結果は同じなので出力は割愛します)。

 |-- REPORT_TIME: timestamp (nullable = false)
 |-- VEHICLE_TAG: string (nullable = false)

Kuduにデータをロードする

まずKuduにテーブルを作成します。前回のチュートリアルではjshellを使って作成しましたが、今回はspark-shellで行います。次のコードを貼り付けて実行してください。

オリジナルのチュートリアルで紹介されているコードは字下げ文字にTabが使われており、そのまま貼り付けると失敗するのでご注意ください。

:paste
import collection.JavaConverters._
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext)

// Delete the table if it already exists.
if(kuduContext.tableExists("sfmta_kudu")) {
  kuduContext.deleteTable("sfmta_kudu")
}

kuduContext.createTable("sfmta_kudu", sftmta_prep.schema,
  /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"),
  new CreateTableOptions()
    .setNumReplicas(3)
    .addHashPartitions(List("VEHICLE_TAG").asJava, 4))

ブラウザから localhost:8050 にアクセスし、テーブルが作成されたことを確認してみました。sfmta_kuduテーブルが一番下に出力されています。

作成したKuduのテーブルに事前準備したデータをインサートします。

このコードもチュートリアルで紹介されているコードをそのまま貼り付けると失敗するのでご注意ください。次のコードは修正済みなのでうまくいくはず。

:paste
kuduContext.insertRows(sftmta_prep, "sfmta_kudu")
// Create a DataFrame that points to the Kudu table we want to query.
val sfmta_kudu = spark.read
  .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251")
  .option("kudu.table", "sfmta_kudu")
  // We need to use leader_only because Kudu on Docker currently doesn't
  // support Snapshot scans due to `--use_hybrid_clock=false`.
  .option("kudu.scanLocality", "leader_only")
  .format("kudu").load
sfmta_kudu.createOrReplaceTempView("sfmta_kudu")
spark.sql("SELECT count(*) FROM sfmta_kudu").show()
spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show()

結果は次のようになります。

+--------+                                                                      
|count(1)|
+--------+
|  859086|
+--------+

+-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+
|        REPORT_TIME|VEHICLE_TAG| REV| LONGITUDE|LATITUDE| SPEED|HEADING|TRAIN_ASSIGNMENT|PREDICTABLEn1447|01/01/2013 00:00:53|  03|-122.41201|37.79478|1.389|107.0|5901|   1|
+-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+
|2013-01-01 00:00:00|       5545|1447|-122.40904|37.79931| 3.056|  351.0|            4504|               0|               null|null|      null|    null| null| null|null|null|
|2013-01-01 00:00:01|       1479|1447|-122.44695| 37.7222|   0.0|   51.0|        MUNSCHED|               0|               null|null|      null|    null| null| null|null|null|
|2013-01-01 00:00:01|       1525|1447|-122.39674|37.79291|   0.0| 226.05|        JUNSCHED|               1|               null|null|      null|    null| null| null|null|null|
|2013-01-01 00:00:01|       1532|1447|-122.45711|37.75034|14.305| 218.33|        MUNSCHED|               1|               null|null|      null|    null| null| null|null|null|
|2013-01-01 00:00:01|       5556|1447|-122.44688|37.78351|   0.0|   81.0|            null|               0|               null|null|      null|    null| null| null|null|null|
+-------------------+-----------+----+----------+--------+------+-------+----------------+----------------+-------------------+----+----------+--------+-----+-----+----+----+

sfmta_kudu: org.apache.spark.sql.DataFrame = [REPORT_TIME: timestamp, VEHICLE_TAG: string ... 15 more fields]

Kuduにデータを挿入し、先ほどのCSVファイルと同様にデータをクエリすることができました。

データの読み込みとデータの変更

続いてのチュートリアルでは、データのクエリと変更を行います。

データのクエリ

車速が一番早いデータをクエリします。

spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show()

しかし、上記のクエリをそのまま実行すると全ての列が表示され、画面上で見にくいため、チュートリアルの出力例と同じになるように、クエリを以下のように変更しました。

spark.sql("SELECT report_time,vehicle_tag,longitude,latitude,speed,heading FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show(truncate=false)
+-------------------+-----------+----------+--------+------+-------+
|report_time |vehicle_tag|longitude |latitude|speed |heading|
+-------------------+-----------+----------+--------+------+-------+
|2013-01-01 06:39:02|5411 |-122.39688|37.76666|68.333|82.0 |
+-------------------+-----------+----------+--------+------+-------+

元々のチュートリアルでは上記の緯度と経度をGoogle mapで検索し、時速68マイルで東に向かっているバスだと記述されていました。(heading 82.0はどういう意味?)

ともあれ時速108キロはセンサーの異常だと思われるため、この5411のタグ(バス?)のデータを削除します。

データの変更

spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
kuduContext.deleteRows(toDelete, "sfmta_kudu")
spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()

上記のクエリを実行します。SQLでvehicle_tagが5411の条件のデータをクエリし、kuduContext.deleteRowsで削除を行います。

今気づいたけど、SQLで削除してないのは何か理由があるのかな?

結果、1169件あったデータが0件になりました。

scala> spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
+--------+                                                                      
|count(1)|
+--------+
|    1169|
+--------+


scala> val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
toDelete: org.apache.spark.sql.DataFrame = [REPORT_TIME: timestamp, VEHICLE_TAG: string ... 15 more fields]

scala> kuduContext.deleteRows(toDelete, "sfmta_kudu")
                                                                                
scala> spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

CSVファイルやParquetの場合、ファイルの変更はできません。(別のファイルに新規に書き出すことは可能ですが)。

変更されないような大量なデータをParquet形式などでHDFSに保存しておいて分析することは問題ないでしょうが、直近で追加、変更されたデータをすぐに分析したい、という用途では不十分です。Kuduを使えば、変更された大量のデータもすぐに分析対象にすることができるので、ユースケースが広がりますね。

コメント