hadoop-logo
ようこそ Tech blogへ!
「これからHadoopを勉強しよう」という方はまず下記のページから

サイトの移行に伴って画像が表示されないなどの不具合が生じています

Pig on Spark を試す

pig

CDH6.x では公式にサポートされていませんが、Pig on Spark が動作します。

なぜ今Pig!?という声が…

Pig on Spark とは、Apache Spark をApache Pig の実行エンジンにすることです。詳細は PIG-4059 のJIRAをご覧ください。今となっては Pig Latinを新規に書くよりも、Sparkで DataFrame を用いてコードを記述する方が楽だと思います、が、大量のPigスクリプトの資産がある場合は試す価値があるかもしれません。

Pig on Spark は動作するとはいえClouderaではサポートしない非サポート機能です。利用される場合は自己責任で。

とはいえ、MapReduceと比べてどの程度性能が上がるのかという技術的な興味があったので、Pig on MapReduceとPig on Sparkで簡単な処理を行って測定してみました。

Pig on Spark を実行する

Pig on Spark を起動するにはいくつかの方法があります。

コマンドラインpig -x spark
プロパティexectype=spark
スクリプトset exectype=spark;

今回の懸賞ではコマンドラインから実行しました。なお、環境変数 SPARK_HOME を設定していないと起動に失敗するので注意してください。

Pig on Spark の起動

環境変数を設定し、-x オプションで spark を指定してpigを実行します。

$ export SPARK_HOME=/usr/lib/spark
$ pig -x spark
Using Spark Home:  /usr/lib/spark
WARNING: Use "yarn jar" to launch YARN applications.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
530  [main] INFO  org.apache.pig.ExecTypeProvider  - Trying ExecType : LOCAL
531  [main] INFO  org.apache.pig.ExecTypeProvider  - Trying ExecType : MAPREDUCE
533  [main] INFO  org.apache.pig.ExecTypeProvider  - Trying ExecType : TEZ_LOCAL
533  [main] INFO  org.apache.pig.ExecTypeProvider  - Trying ExecType : TEZ
534  [main] INFO  org.apache.pig.ExecTypeProvider  - Trying ExecType : SPARK
534  [main] INFO  org.apache.pig.ExecTypeProvider  - Picked SPARK as the ExecType
2018-12-25 01:38:11,643 [main] INFO  org.apache.pig.Main - Apache Pig version 0.17.0-cdh6.x-SNAPSHOT (rUnversioned directory) compiled Dec 22 2018, 09:24:40
2018-12-25 01:38:11,644 [main] INFO  org.apache.pig.Main - Logging error messages to: /tmp/tatsuo/pig_1545730691624.log
2018-12-25 01:38:11,670 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2018-12-25 01:38:11,997 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2018-12-25 01:38:11,997 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://nightly6x-unsecure-1.vpc.cloudera.com:8020
2018-12-25 01:38:12,748 [main] INFO  org.apache.pig.PigServer - Pig Script ID for the session: PIG-default-bbedd8bb-bbc2-4854-bb03-1bd54e4d56f9
2018-12-25 01:38:12,748 [main] WARN  org.apache.pig.PigServer - ATS is disabled since yarn.timeline-service.enabled set to false
grunt> 

シェルは無事に起動しているようです。

Pig Latin のテストコード

今回のテスト用に、昨日実行したImpala/Hiveのクエリ同等の処理を行う Pig Latin を記述しました。

pigtest.pig

sh date;
tripdataraw = LOAD '/tmp/201402_trip_data.csv' USING PigStorage(',') AS (tripid:long, duration:long, startdate:chararray, startstation:chararray, startterminal:long, enddate:chararray, endstation:chararray, endterminal:long, bikeid:long, subscriptiontype:chararray, zipcode:chararray); 
tripdata = FILTER tripdataraw BY tripid > 0;
grouped = GROUP tripdata BY (startstation, endstation);
counted = FOREACH grouped GENERATE FLATTEN(group) AS (start_station,end_stations), COUNT_STAR(tripdata) AS trips;
ordered = ORDER counted BY trips DESC; 
ordered_limit = LIMIT ordered 25; 
DUMP ordered_limit;
sh date;

しかし、このデータにはヘッダ行が含まれています。1行読み飛ばすために piggybank の CSVExcelStorage ローダーを利用しても良いのですが、今回の環境には jar ファイルがなかったので単純に1行スキップすることにしました。列名もあえて指定しています。なお、時間測定のために先頭と最後にシェルで日時を表示するようにしました。

GruntシェルからPig on Spark の動作確認

まずはシェルから実行し、動作するかを確認します。

grunt> tripdataraw = LOAD '/tmp/201402_trip_data.csv' USING PigStorage(',') AS (tripid:long, duration:long, startdate:chararray, startstation:chararray, startterminal:long, enddate:chararray, endstation:chararray, endterminal:long, bikeid:long, subscriptiontype:chararray, zipcode:chararray);
grunt> tripdata = FILTER tripdataraw BY tripid > 0;
grunt> grouped = GROUP tripdata BY (startstation, endstation);
grunt> counted = FOREACH grouped GENERATE FLATTEN(group) AS (start_station,end_stations), COUNT_STAR(tripdata) AS trips;
grunt> ordered = ORDER counted BY trips DESC;
grunt> ordered_limit = LIMIT ordered 25;
grunt> DUMP ordered_limit;
(略)
(Harry Bridges Plaza (Ferry Building),Embarcadero at Sansome,1330)
(Townsend at 7th,San Francisco Caltrain (Townsend at 4th),1322)
(San Francisco Caltrain 2 (330 Townsend),Townsend at 7th,1116)
(Market at Sansome,2nd at South Park,866)
(Embarcadero at Sansome,Steuart at Market,811)
(2nd at South Park,Market at Sansome,798)
(San Francisco Caltrain (Townsend at 4th),Harry Bridges Plaza (Ferry Building),782)
(2nd at Townsend,Harry Bridges Plaza (Ferry Building),757)
(Steuart at Market,Embarcadero at Sansome,717)
(略)

結果は正しそうです。MapReduceと比較するとかなり体感的には早いです。

結果

今回のテストはあくまでも動作確認レベルですが、以下の4パターンで実行しました

追記: Spark on Pig on YARN の実行と結果を追加しました。この環境だと明示的に指定しない限りSparkのローカルモードで実行されます。

  • Pig on MapReduce (on YARN)
    • pig -x mr
  • Pig on Spark on YARN(1回目: YARN/Executorが起動するオーバーヘッドを含む実行時間)
    • sudo -u spark -E SPARK_HOME=/usr/lib/spark -E SPARK_MASTER=yarn pig -x spark
  • Pig on Spark on YARN(2回目: Executor起動状態からの実行時間)
    • 同上
  • Pig on Spark on Local(1回目: Executorが起動するオーバーヘッドを含む実行時間)
    • sudo -u spark -E SPARK_HOME=/usr/lib/spark pig -x spark
  • Pig on Spark on YARN(2回目: Executor起動状態からの実行時間)
    • 同上
  • Pigのローカルモード
    • pig -x local
    • なお、ローカルモードでは csv ファイルをローカルにコピーして実行しているので HDFS にはアクセスしていません。

種類実行時間(秒)
Pig on MR on YARN (オーバーヘッド含む)88
Pig on Spark on YARN (オーバーヘッド含む)27
Pig on Spark on YARN (オーバーヘッド含まず)4
Pig on Spark on local (オーバーヘッド含む)9
Pig on Spark on local (オーバーヘッド含まず)3
Pig local mode6

それなりに期待できる結果になりましたね。

まとめ

このバージョンでの Spark on Pig がどこまで複雑な構文に対応しているのかは未検証ですが、Pig on MR の長時間かかるジョブで困っているなら選択する価値があるかもしれません。

くれぐれも本番環境への投入は自己責任で!

参考までにPig on Spark 実行時のSpark Web UI のスクリーンショットを貼っておきます。

 

 

 

コメント

  1. kernel023 より:

    このSpark on Pig はYARN上で走っていないことが確認できたので、YARNで実行する方法を追記しました