Apache Spark 2.xでKuduを利用する

以前のブログ(Apache SparkでApache Kuduを利用する)の Spark2.x版です。前回のブログからあまり変わっていませんが、前回のブログの手順はSpark2.xで動作しなかったという話を聞いたのでアップデートしておきます。

なお、昨日 Kudu 1.7 がリリースされました!今回の特徴は

  • (ついに)DECIMAL型をサポート!
  • 可用性とパフォーマンスの改善

などです。

そういえば、こんなツイートも公開されていました。クエリの結果が戻るのが早すぎて問題になるという… そんなこと言われても困りますよね(笑)

 

Spark 2.2 からKuduのテーブルにアクセスする

シェルを起動します。引数に注意。

$ spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.6.0
(略)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://xxx.xxx.xxx.xxx:4040
Spark context available as 'sc' (master = yarn, app id = application_1522096991758_0008).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0-SNAPSHOT
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

シェルが起動したらパッケージをインポートします。

scala> import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.kudu._

scala> import org.apache.kudu.client._
import org.apache.kudu.client._

データフレームを作成してアクセスします。Impalaで作成したテーブルは、my_first_table という名前です。念のため Impala で確認してみました。

[xxx.example.com:21000] > SHOW TABLES;
Query: SHOW TABLES
+----------------+
| name           |
+----------------+
| customers      |
| my_first_table |
| sample_07      |
| sample_08      |
| web_logs       |
+----------------+
Fetched 5 row(s) in 0.01s
[xxx.example.com:21000] > DESCRIBE my_first_table;
Query: describe my_first_table
+------+--------+---------+-------------+----------+---------------+---------------+---------------------+------------+
| name | type   | comment | primary_key | nullable | default_value | encoding      | compression         | block_size |
+------+--------+---------+-------------+----------+---------------+---------------+---------------------+------------+
| id   | bigint |         | true        | false    |               | AUTO_ENCODING | DEFAULT_COMPRESSION | 0          |
| name | string |         | false       | true     |               | AUTO_ENCODING | DEFAULT_COMPRESSION | 0          |
+------+--------+---------+-------------+----------+---------------+---------------+---------------------+------------+
Fetched 2 row(s) in 0.01s
[xxx.example.com:21000] > 

Sparkシェルからデータフレームを作成してアクセスします。

scala> val df = spark.read.options(Map("kudu.master" -> "xxx.example.com:7051","kudu.table" -> "impala::default.my_first_table")).kudu
18/03/27 05:02:31 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
18/03/27 05:02:33 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
df: org.apache.spark.sql.DataFrame = [id: bigint, name: string]


scala> df.printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)


scala> df.show
+---+------+
| id|  name|
+---+------+
|  3|  maru|
|  1|tatsuo|
|  4|   sho|
+---+------+


scala>

無事にアクセスできていますね。

なお、Hiveのメタストア経由(?)でアクセスしようとすると以下のように例外が発生します。テキストファイルが含まれるsample_07テーブルにはアクセスできますが、Kuduのテーブル my_first_table にはアクセスできません。ご注意を。

このJIRAも参照: https://issues.apache.org/jira/browse/HIVE-12971

scala> val x = spark.read.table("sample_07")
x: org.apache.spark.sql.DataFrame = [code: string, description: string ... 2 more fields]

scala> val x = spark.read.table("my_first_table")
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Error in loading storage handler.com.cloudera.kudu.hive.KuduStorageHandler
  at org.apache.hadoop.hive.ql.metadata.Table.getStorageHandler(Table.java:295)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:395)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:382)
  at scala.Option.map(Option.scala:146)
(略)
Pocket

Leave a Reply

Your email address will not be published. Required fields are marked *

CAPTCHA


日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)