Apache Kuduを10分で試す(2) NiFi編

Kudu

前回のブログに続き、今回は NiFi からKuduにデータを書き込むチュートリアルを試してみましょう。

NiFiはNSA(国家安全保障局)が開発し、オープンソースとしてApacheに寄贈されたソフトウェアです。複雑なデータフローをビジュアルユーザーインターフェースで作成でき、データを生成元からターゲットに安全に着地させることができます。(参考: https://www.slideshare.net/KojiKawamura/nifi)

Kudu-NiFiのクイックスタートはこちらから参照できます。また、このチュートリアルでもDockerを使ってNiFiを起動します。よってインストール等は不要です。便利な時代になりましたね。

NiFiを起動する

$ docker run --name kudu-nifi --network="docker_default" -p 8080:8080 apache/nifi:latest
replacing target file  /opt/nifi/nifi-current/conf/nifi.properties
(略)
Bootstrap Config File: /opt/nifi/nifi-current/conf/bootstrap.conf

続いてブラウザから localhost:8080/nifi にアクセスします。(サービスが起動するまでには若干時間がかかります。ブラウザからアクセスして下記の画面が表示されないようであれば、少し待ってからリトライしてください)

KuduにNiFi用のテーブルを作成する

NiFiからデータを投入するために、予めKuduにテーブルを作成しておきます。チュートリアルに従いjshellを使って対話的に行います。

$ docker run -it --rm --network="docker_default" maven:latest bin/bash

以下を入力します
$ mkdir jars $ mvn dependency:copy \ -Dartifact=org.apache.kudu:kudu-client-tools:1.10.0 \ -DoutputDirectory=jars (略)
tools/1.10.0/kudu-client-tools-1.10.0.jar (13 MB at 2.4 MB/s)
[INFO] Copying kudu-client-tools-1.10.0.jar to /jars/kudu-client-tools-1.10.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:14 min
[INFO] Finished at: 2019-08-09T09:17:04Z
[INFO] ------------------------------------------------------------------------
$ jshell --class-path jars/*
Aug 09, 2019 9:19:17 AM java.util.prefs.FileSystemPreferences$1 run INFO: Created user preferences directory. | Welcome to JShell -- Version 11.0.4 | For an introduction type: /help intro jshell>

シェルが起動したら下記のコードを貼り付けます。

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduClient
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.Schema
import org.apache.kudu.Type

KuduClient client =
  new KuduClientBuilder("kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251").build();

if(client.tableExists("random_user")) {
  client.deleteTable("random_user");
}

Schema schema = new Schema(Arrays.asList(
  new ColumnSchemaBuilder("ssn", Type.STRING).key(true).build(),
  new ColumnSchemaBuilder("firstName", Type.STRING).build(),
  new ColumnSchemaBuilder("lastName", Type.STRING).build(),
  new ColumnSchemaBuilder("email", Type.STRING).build())
);
CreateTableOptions tableOptions =
  new CreateTableOptions().setNumReplicas(3).addHashPartitions(Arrays.asList("ssn"), 4);
client.createTable("random_user", schema, tableOptions);

貼り付けたら Ctrl+D を押して jshell から抜け、exit と入力してコンテナから抜けます。ここではrandom_user という名前のテーブルを作成しています。テーブルのスキーマは次の通り。

列名ssnfirstamelastNameemail
データ型STRINGSTRINGSTRINGSTRING

これでテーブルが作成できました。必要であればKuduのWebUIで random_user テーブルが作成できたことを確認してください。(例: http://localhost:8050)

これで準備完了です。

テンプレートを読み込む

NiFiは、グラフィカルインターフェースを使ってブラウザから直感的にデータフローを定義することができますが、そのフローの定義の実態はXMLファイルです。今回のチュートリアルにはデータフローのテンプレートが用意されているので、GUIからゴリゴリ作成する必要はありません。このテンプレートファイルは、チュートリアルの最初の手順でgit cloneした際にローカルに保存されています。(Random_User_Kudu.xml という名前のファイル)

このファイルをNiFiのUIからインポートします。NiFiの画面から、下記のスクリーンショットの丸印で囲まれたアイコンをクリックします。

続いてテンプレートファイルを選択します。虫眼鏡のアイコンをクリックして Random_User_Kudu.xml ファイルが置かれている場所を選択してください。選択したらUPLOADボタンを押します。

完了すると次の画面が表示れます。

ここから先はチュートリアルだけではわかりにくい点でした。NiFi側で以下の操作を行います。

テンプレートの表示

インポートしたテンプレートを表示するには、NiFiの画面の右上にある3本線のメニューをクリックしてTemplate を選択します。

先ほどインポートしたテンプレートが下記のスクリーンショットのように一覧表示されるはずです。

テンプレートを利用する

テンプレートはまだ使えるようになっていません。テンプレートを利用できるようにするには、上部のメニューのテンプレートアイコンをキャンバス(方眼紙の部分)にドラッグします。右上のX印を押してNiFi Templates画面をクローズしてから、下記のTemplateアイコンをマウスでキャンパスにドラッグしてください。

ドラッグすると先ほどのテンプレートがAdd Templateダイアログに表示されます。Addボタンを押してキャンバスに追加します。

NiFi-Kuduのデータフロー

テンプレートをキャンバスに追加すると、下記のようなデータフローが表示されます。一画面に表示しきれない場合は左側のNavigateパネルの虫眼鏡の+または-アイコンをクリックして、拡大、縮小することができます。

さて、このフローは何を行っているのでしょうか?

NiFiでは、データをどう加工するかの要素を「プロセッサー」と呼びます。このフローには5つのプロセッサーが定義されています。

  1. Fetch User Data (InvokeHTTP)
  2. SplitJson (SplitJson)
  3. EvaluateJsonPath (EvaluateJsonPath)
  4. AttributesToJSON (AttributesToJSON)
  5. PutKudu (PutKudu)

これらを特に変更する必要はありません。それぞれのプロセッサーの詳細に興味がない方は以下の説明はスキップしてください。

1. FetchUserData

このプロセッサーでは http://randomuser.me からランダムに生成されたユーザーデータを取り込んでいます。Fetch User Data の四角い箱をダブルクリックするとプロセッサーの設定を行うことができます。

取得間隔は10秒です。

PROPERTIESタブのPropertyのRemote URL に設定されているように、ここでは100件のレコードを取得しています。

なお、APIで取得しているJSONは次のようになります。

2. SplitJson

SplitJsonプロセッサーは JsonPath の表現にしたがって JSON を配列に変換します。(詳細

この設定では $.results[*] という値が設定されているため、FetchUserData プロセッサーからの入力を配列に変換しています。

3. EvaluateJsonPath

このプロセッサーはフローファイルの内容に対するJsonPathの評価を行います。(詳細)

Destinationが「flowfile-attribute」に指定されている場合は、JsonPathにマッチする属性がそれぞれのプロパティに設定されます。ここでは email, firstName, lastName, ssn の4つの項目が設定されています。

4. AttributesToJson

このプロセッサーは属性(アトリビュート)をJSONに変換します。(詳細

3. のEvaluateJsonPathの4つの属性をJSONに変換しています。

5. PutKudu

このプロセッサでKuduに書き込みます。(詳細

Kudu Masterのアドレス、テーブル名はスクリーンショットにある通りです。レコードを読む方法をRecord Readerプロパティで指定します。今回は JsonTreeReader が設定されていましたが、CSVやAvroなども指定できます。

JsonTreeReaderの右側の矢印をクリックし、設定を表示します。

さらに歯車のアイコンをクリックするとJsonTreeReaderの設定を確認できます。

これはJsonのツリーを読み込みます。「Schema Access Strategy」の設定は Infer Schema(デフォルト)に設定されていました。これはドキュメントに詳細が記載されていますが、スキーマを手動で生成するのではなく、データの値に基づいてスキーマを推論する方法です。便利なためによく使われるようです。

NiFiのフローを開始する

ここではNiFiのフローを開始します。

コントロールサービスの有効化

コントロールサービスを有効化しないとフローを開始できません。まずはフローの一番下の四角い箱 (PutKudu) をダブルクリックします。PROPERTIESタブをクリックし、Record Readerの右側の矢印をクリックします。(スクリーンショットは上記参照)

続いて右側の稲妻のアイコンをクリックし、

その後右下の「ENABLE」をクリックします。CLOSEをクリックして前の画面に戻ると、StateがEnabledに変わっています。

 

右上のXを押してキャンバスに戻ると、PutKuduの状態が黄色の三角から赤い四角のアイコンに変わっていることがわかります。

フローの開始

フローを開始する準備ができました。左側の Operate パネルの三角ボタンを押してフローを開始します。

フローが開始すると、アイコンが右側の三角に変わります。

上記のスクリーンショットにあるように、10秒ごとに取り込んだデータをKuduに書き込んでいるように見えます。チュートリアルではここでNiFiを止めていますが、このままデータを書き続けながらデータを確認してみましょう。

Kuduに書き込まれたデータを確認する

詳細は次回のブログで記述しますが、spark を使ってデータを確認します。まだSparkがインストールされていない場合はチュートリアルに従ってインストールします。

インストールが終わったら、Spark を開始して

$ spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
(略)
Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.2 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92) Type in expressions to have them evaluated. Type :help for more information. scala>

:paste と入力し、次のコードを貼り付けます。貼り付けたら Ctrl+D を入力します。

val random_user = spark.read.
option("kudu.master", "localhost:7051,localhost:7151,localhost:7251").
option("kudu.table", "random_user").
// We need to use leader_only because Kudu on Docker currently doesn't
// support Snapshot scans due to `--use_hybrid_clock=false`
option("kudu.scanLocality", "leader_only").
format("kudu").load
random_user.createOrReplaceTempView("random_user")
spark.sql("SELECT count(*) FROM random_user").show()

最後の行に、次のように件数が表示されるはずです。

scala> spark.sql("SELECT count(*) FROM random_user").show()
+--------+                                                                      
|count(1)|
+--------+
|    9500|
+--------+

もう一度上記の行(spark.sql(“SELECT…”)を入力するか、あるいは矢印の上を押して上記のクエリをもう一度実行すると、徐々に件数が増えていることが確認できるでしょう。

scala> spark.sql("SELECT count(*) FROM random_user").show()
+--------+                                                                      
|count(1)|
+--------+
|    9600|
+--------+

ここでシェルを終了する場合は、scala> のプロンプトで :quit と入力してください。

次回は Spark のチュートリアルに従って操作を行う予定です。

コメント