Apache Spark 2.xでKuduを利用する

以前のブログ(Apache SparkでApache Kuduを利用する)の Spark2.x版です。前回のブログからあまり変わっていませんが、前回のブログの手順はSpark2.xで動作しなかったという話を聞いたのでアップデートしておきます。
なお、昨日 Kudu 1.7 がリリースされました!今回の特徴は

  • (ついに)DECIMAL型をサポート!
  • 可用性とパフォーマンスの改善

などです。



そういえば、こんなツイートも公開されていました。クエリの結果が戻るのが早すぎて問題になるという… そんなこと言われても困りますよね(笑)


 

Spark 2.2 からKuduのテーブルにアクセスする

シェルを起動します。引数に注意。
[code]
$ spark-shell –packages org.apache.kudu:kudu-spark2_2.11:1.6.0
(略)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://xxx.xxx.xxx.xxx:4040
Spark context available as ‘sc’ (master = yarn, app id = application_1522096991758_0008).
Spark session available as ‘spark’.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0-SNAPSHOT
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
[/code]
シェルが起動したらパッケージをインポートします。
[code]
scala> import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.kudu._
scala> import org.apache.kudu.client._
import org.apache.kudu.client._
[/code]
データフレームを作成してアクセスします。Impalaで作成したテーブルは、my_first_table という名前です。念のため Impala で確認してみました。
[code]
[xxx.example.com:21000] > SHOW TABLES;
Query: SHOW TABLES
+—————-+
| name |
+—————-+
| customers |
| my_first_table |
| sample_07 |
| sample_08 |
| web_logs |
+—————-+
Fetched 5 row(s) in 0.01s
[xxx.example.com:21000] > DESCRIBE my_first_table;
Query: describe my_first_table
+——+——–+———+————-+———-+—————+—————+———————+————+
| name | type | comment | primary_key | nullable | default_value | encoding | compression | block_size |
+——+——–+———+————-+———-+—————+—————+———————+————+
| id | bigint | | true | false | | AUTO_ENCODING | DEFAULT_COMPRESSION | 0 |
| name | string | | false | true | | AUTO_ENCODING | DEFAULT_COMPRESSION | 0 |
+——+——–+———+————-+———-+—————+—————+———————+————+
Fetched 2 row(s) in 0.01s
[xxx.example.com:21000] >
[/code]
Sparkシェルからデータフレームを作成してアクセスします。
[code]
scala> val df = spark.read.options(Map("kudu.master" -> "xxx.example.com:7051","kudu.table" -> "impala::default.my_first_table")).kudu
18/03/27 05:02:31 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
18/03/27 05:02:33 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
df: org.apache.spark.sql.DataFrame = [id: bigint, name: string]
scala> df.printSchema
root
|– id: long (nullable = false)
|– name: string (nullable = true)
scala> df.show
+—+——+
| id| name|
+—+——+
| 3| maru|
| 1|tatsuo|
| 4| sho|
+—+——+
scala>
[/code]
無事にアクセスできていますね。
なお、Hiveのメタストア経由(?)でアクセスしようとすると以下のように例外が発生します。テキストファイルが含まれるsample_07テーブルにはアクセスできますが、Kuduのテーブル my_first_table にはアクセスできません。ご注意を。
このJIRAも参照: https://issues.apache.org/jira/browse/HIVE-12971
[code]
scala> val x = spark.read.table("sample_07")
x: org.apache.spark.sql.DataFrame = [code: string, description: string … 2 more fields]
scala> val x = spark.read.table("my_first_table")
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Error in loading storage handler.com.cloudera.kudu.hive.KuduStorageHandler
at org.apache.hadoop.hive.ql.metadata.Table.getStorageHandler(Table.java:295)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:395)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:382)
at scala.Option.map(Option.scala:146)
(略)
[/code]

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です