CDH 5.4でHive on Sparkを試す

Hive on Spark (on CDH5.4)

※Hive on Sparkはテクノロジープレビュー扱いです。現時点ではサポート対象外なのでご注意を。
CDH5.4に含まれているHiveはHive 1.1です。このバージョンのHiveから、Hiveの実行エンジンとしてSparkが利用できるようになりました。(Hive on Spark [1][2])
[1] https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
[2] https://issues.apache.org/jira/browse/HIVE-7292
ドキュメントも発見したので早速試してみます。
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_hos_config.html

Hive on Sparkを有効にする

Cloudera ManagerからHiveの設定を変更します。Hiveの設定画面で「Spark」という文字を検索します。(フィルタのSearchフィールドに入力する)
hiveonspark1Enable Hive on Spark (Unsupported) にチェックをして保存します。変更箇所はそれほど多くありません。
hiveonspark2[クラスタの再起動]ボタンを押して、サービスを再起動します。本来は必要なサービスのみで良いのですが、万全を期すためにここではボタンを押してみました。
hiveonspark3以下、手元の環境でハマった点です。根本的にこれが原因だったのかは不明。

1. Sparkサービス

Hive on SparkがStandaloneモードで動作するかどうかわからなかったので、Sparkサービスを再度インストールしました。Cloudera ManagerはSparkサービスが2種類あり、「Spark」がYARNを利用するサービス、「Spark (Standalone)」がスタンドアロンのサービスです。今回は前者を利用します。

2. Sparkのgatewayロール

ドキュメントに記載されていますが、SparkのgatewayロールをHiveServer2と同じノードにインストールする必要があります。今回は擬似分散なので特に何かをする必要はありませんが、分散環境の場合は注意が必要ですね。

Hive on Sparkを実行する

beelineを使ってクエリを投げてみましょう。
[code]
[cloudera@quickstart ~]$ beeline -u jdbc:hive2://quickstart.cloudera:10000 -u cloudera -p cloudera
scan complete in 2ms
Connecting to jdbc:hive2://quickstart.cloudera:10000
Connected to: Apache Hive (version 1.1.0-cdh5.4.0)
Driver: Hive JDBC (version 1.1.0-cdh5.4.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.1.0-cdh5.4.0 by Apache Hive
0: jdbc:hive2://quickstart.cloudera:10000>
[/code]
最初はHive on MapReduce。
[code]
0: jdbc:hive2://quickstart.cloudera:10000> set hive.execution.engine=mr;
No rows affected (0.053 seconds)
0: jdbc:hive2://quickstart.cloudera:10000> select * from sample_07 ORDER BY code DESC LIMIT 3;
INFO  : Number of reduce tasks determined at compile time: 1
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1430171180901_0004
INFO  : The url to track the job: http://quickstart.cloudera:8088/proxy/application_1430171180901_0004/
INFO  : Starting Job = job_1430171180901_0004, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1430171180901_0004/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/bin/hadoop job  -kill job_1430171180901_0004
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
INFO  : 2015-04-27 14:58:10,956 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-04-27 14:58:18,617 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 0.95 sec
INFO  : 2015-04-27 14:58:26,006 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.15 sec
INFO  : MapReduce Total cumulative CPU time: 2 seconds 150 msec
INFO  : Ended Job = job_1430171180901_0004
+—————–+————————————-+———————-+——————-+–+
| sample_07.code  |        sample_07.description        | sample_07.total_emp  | sample_07.salary  |
+—————–+————————————-+———————-+——————-+–+
| 53-7199         | Material moving workers, all other  | 43840                | 33170             |
| 53-7121         | Tank car, truck, and ship loaders   | 14870                | 35820             |
| 53-7111         | Shuttle car operators               | 2660                 | 41300             |
+—————–+————————————-+———————-+——————-+–+
3 rows selected (34.729 seconds)
0: jdbc:hive2://quickstart.cloudera:10000>
[/code]
続いて Hive on Spark。
が、失敗、、、、
[code]
0: jdbc:hive2://quickstart.cloudera:10000> set hive.execution.engine=spark;
No rows affected (0.149 seconds)
0: jdbc:hive2://quickstart.cloudera:10000> select * from sample_07 ORDER BY code DESC LIMIT 3;
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
ERROR : Failed to execute spark task, with exception ‘org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create spark client.)’
org.apache.hadoop.hive.ql.metadata.HiveException: Failed to create spark client.
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl.open(SparkSessionImpl.java:57)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:116)
at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:118)
at org.apache.hadoop.hive.ql.exec.spark.SparkTask.execute(SparkTask.java:97)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1638)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1397)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1181)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1047)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1042)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:145)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:70)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:209)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at com.google.common.base.Throwables.propagate(Throwables.java:156)
at org.apache.hive.spark.client.SparkClientImpl.<init>(SparkClientImpl.java:109)
at org.apache.hive.spark.client.SparkClientFactory.createClient(SparkClientFactory.java:80)
at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient.<init>(RemoteHiveSparkClient.java:88)
at org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory.createHiveSparkClient(HiveSparkClientFactory.java:65)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl.open(SparkSessionImpl.java:55)
… 22 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
at org.apache.hive.spark.client.SparkClientImpl.<init>(SparkClientImpl.java:99)
… 26 more
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at org.apache.hive.spark.client.rpc.RpcServer$2.run(RpcServer.java:134)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
… 1 more
ERROR : Failed to execute spark task, with exception ‘org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create spark client.)’
org.apache.hadoop.hive.ql.metadata.HiveException: Failed to create spark client.
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl.open(SparkSessionImpl.java:57)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:116)
at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:118)
at org.apache.hadoop.hive.ql.exec.spark.SparkTask.execute(SparkTask.java:97)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1638)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1397)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1181)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1047)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1042)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:145)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:70)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:209)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at com.google.common.base.Throwables.propagate(Throwables.java:156)
at org.apache.hive.spark.client.SparkClientImpl.<init>(SparkClientImpl.java:109)
at org.apache.hive.spark.client.SparkClientFactory.createClient(SparkClientFactory.java:80)
at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient.<init>(RemoteHiveSparkClient.java:88)
at org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory.createHiveSparkClient(HiveSparkClientFactory.java:65)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl.open(SparkSessionImpl.java:55)
… 22 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
at org.apache.hive.spark.client.SparkClientImpl.<init>(SparkClientImpl.java:99)
… 26 more
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for client connection.
at org.apache.hive.spark.client.rpc.RpcServer$2.run(RpcServer.java:134)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
… 1 more
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask (state=08S01,code=1)
[/code]
ログを見ると、App Masterまでは投げられているようです。
hiveonspark_fail試行錯誤を繰り返し、最終的にログとにらめっこをしたところ、hive-site.xmlのパーミッションが問題というエラーがでていることを発見。
hiveonspark_err_detailログの抜粋
[code]
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml (Permission denied)
[/code]
根本的な原因を探すのは後回しにして、ワークアラウンドでコマンドラインからファイルのパーミッションを変更して再度実行することにしました。
[code]
[cloudera@quickstart ~]$ ls -l /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml
-rw-r—– 1 hive hive 7189 Apr 27 14:46 /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml
[cloudera@quickstart ~]$ sudo chmod +r /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml
[cloudera@quickstart ~]$ ls -l /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml
-rw-r–r– 1 hive hive 7189 Apr 27 14:46 /var/run/cloudera-scm-agent/process/297-hive-HIVESERVER2/hive-site.xml
[cloudera@quickstart ~]$
[/code]
再度実行してみます。
[code]
0: jdbc:hive2://quickstart.cloudera:10000> set hive.execution.engine=spark;
No rows affected (0.118 seconds)
0: jdbc:hive2://quickstart.cloudera:10000> select * from sample_07 ORDER BY code DESC LIMIT 3;
INFO  : In order to change the average load for a reducer (in bytes):
INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
INFO  : In order to limit the maximum number of reducers:
INFO  :   set hive.exec.reducers.max=<number>
INFO  : In order to set a constant number of reducers:
INFO  :   set mapreduce.job.reduces=<number>
INFO  : Starting Spark Job = 8b40959d-bd9d-4dea-8c77-6fe29e180dff
INFO  :
Query Hive on Spark job[1] stages:
INFO  : 2
INFO  : 3
INFO  :
Status: Running (Hive on Spark job[1])
INFO  : Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
INFO  : 2015-04-27 15:50:48,744    Stage-2_0: 0(+1)/1    Stage-3_0: 0/1
INFO  : 2015-04-27 15:50:51,774    Stage-2_0: 0(+1)/1    Stage-3_0: 0/1
INFO  : 2015-04-27 15:50:54,914    Stage-2_0: 0(+1)/1    Stage-3_0: 0/1
INFO  : 2015-04-27 15:50:56,937    Stage-2_0: 1/1 Finished    Stage-3_0: 0(+1)/1
INFO  : 2015-04-27 15:50:57,984    Stage-2_0: 1/1 Finished    Stage-3_0: 1/1 Finished
INFO  : Status: Finished successfully in 16.35 seconds
INFO  : =====Spark Job[8b40959d-bd9d-4dea-8c77-6fe29e180dff] statistics=====
INFO  : HIVE
INFO  :     RECORDS_OUT: 0
INFO  :     CREATED_FILES: 1
INFO  :     RECORDS_IN: 823
INFO  :     RECORDS_OUT_INTERMEDIATE: 3
INFO  :     DESERIALIZE_ERRORS: 0
INFO  : Spark Job[8b40959d-bd9d-4dea-8c77-6fe29e180dff] Metrics
INFO  :     EexcutorDeserializeTime: 5304
INFO  :     ExecutorRunTime: 3138
INFO  :     ResultSize: 3094
INFO  :     JvmGCTime: 1085
INFO  :     ResultSerializationTime: 2
INFO  :     MemoryBytesSpilled: 0
INFO  :     DiskBytesSpilled: 0
INFO  :     BytesRead: 49465
INFO  :     RemoteBlocksFetched: 0
INFO  :     LocalBlocksFetched: 1
INFO  :     TotalBlocksFetched: 1
INFO  :     FetchWaitTime: 0
INFO  :     RemoteBytesRead: 0
INFO  :     ShuffleBytesWritten: 224
INFO  :     ShuffleWriteTime: 4856340
+—————–+————————————-+———————-+——————-+–+
| sample_07.code  |        sample_07.description        | sample_07.total_emp  | sample_07.salary  |
+—————–+————————————-+———————-+——————-+–+
| 53-7199         | Material moving workers, all other  | 43840                | 33170             |
| 53-7121         | Tank car, truck, and ship loaders   | 14870                | 35820             |
| 53-7111         | Shuttle car operators               | 2660                 | 41300             |
+—————–+————————————-+———————-+——————-+–+
3 rows selected (27.433 seconds)
0: jdbc:hive2://quickstart.cloudera:10000>
[/code]
今度はうまくいきました!Cloudera Managerがデプロイするhive-site.xmlの問題のようですが、なんだろう?

複雑なクエリ

もう少し複雑なクエリも試してみましょう。Hueのサンプルにある以下のクエリを実行してみます。
[code]
SELECT s07.description, s07.salary, s08.salary,  s08.salary – s07.salary
FROM
sample_07 s07 JOIN sample_08 s08
ON ( s07.code = s08.code)
WHERE
s07.salary < s08.salary
ORDER BY s08.salary-s07.salary DESC
LIMIT 1000;
[/code]
今回の環境は、擬似分散 & 仮想マシン上で実行 & メモリ割当は脆弱(仮想マシンへの割当はたった4GB) & ノンチューニングという劣悪な環境なので、以下の数値は全く参考になりません。が、せっかくなのでシェアします。

Execution EngineTime (sec)
Hive on MR (YARN)54.78 sec
57.224 sec
53.565 sec
Hive on Spark (YARN)34.199 sec
31.257 sec
25.301 sec
(参考: Impala)4.19 sec
1.26 sec
1.13 sec

※同じクエリを同一環境で3回実行した結果です。
SparkのWeb UIで確認したところ、Sparkのステージにかかっている時間はごくわずかでした。
Hive on Spark - Details for Job 9

まとめ

Hive on Sparkは、Hive on Tezと同様に、Hiveの実行エンジンをデフォルトのMapReduceから切り替えて実行します。Hive on Sparkの実行エンジンはSparkですが、フロントエンドはHiveなので「Hiveで書いていたクエリを変更することなく実行できる」というのが売りです。Hive on Sparkが安定してくれば、これまで実行してきたバッチ処理を今までよりも短い時間で実行するために利用できるようになるでしょう。
一方、分散クエリエンジンのImpalaは分析用に有利です。使ってみると分かりますが、圧倒的に早いです。しかし制約もありますし、ImpalaのSQLとHiveQLは完全に同一ではありません。Impalaでバッチ処理の置き換えをしようと考えている場合は十分にご注意ください。
最後に、

  • Hive on SparkはImpalaを置き換えるものではない
    ということと、
  • Hive on SparkはSpark SQLやSharkとは別物

ということだけ補足しておきます(ややこしいですね)。

コメント