
Trying Out Word Count with Spark

Today, Cloudera's blog published a post titled "How-to: Run a Simple Apache Spark App in CDH 5". In a nutshell, it runs WordCount with Apache Spark.
The English post doesn't have many pitfalls, but since I have a CDH 5 environment at hand, I decided to try it out.

Adding the Spark service

Spark wasn't set up yet, so I added the Spark service using Cloudera Manager 5. (Once you get used to Cloudera Manager, there is no going back to configuring everything by hand...)
First, choose "Add Service", then select Spark.
ZooKeeper gets installed as a dependency.
This screen is a bit hard to follow, but I assigned the Master to hadoop12 and the Workers to machines 11, 13, 14, and 15.
Then just wait for everything to be set up. (Doing this manually would be a pain...)
The installation completed without any problems.

Running WordCount with Spark

I'll follow along with the Cloudera blog post.

Cloning the Git repository

First, clone the git repository.
[code]
$ git clone https://github.com/sryza/simplesparkapp
[/code]
The source code is under the simplesparkapp directory.
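For reference, the heart of the app is only a few lines of Scala. The sketch below is my simplified reconstruction of SparkWordCount.scala based on that repository (an outline, not the verbatim source): it counts how often each word occurs, keeps the words that occur at least threshold times, and then counts the characters that make up those words.
[code]
import org.apache.spark.{SparkConf, SparkContext}
// Implicit conversions that provide reduceByKey on pair RDDs (Spark 0.9)
import org.apache.spark.SparkContext._

object SparkWordCount {
  def main(args: Array[String]) {
    // The master (e.g. -Dspark.master=local) is picked up from system properties
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val threshold = args(1).toInt

    // Split each line of the input file into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // Count the occurrences of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // Keep only the words that occur at least `threshold` times
    val filtered = wordCounts.filter(_._2 >= threshold)

    // Count the characters of the remaining words
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    println(charCounts.collect().mkString(", "))
  }
}
[/code]
Note that the final result is therefore a list of (character, count) pairs, not of words; keep that in mind when reading the output later.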

Installing Maven

The build uses Maven, but Maven wasn't installed in my environment, so I installed it with apt-get. (I'm more used to RHEL/CentOS, so I'm not very familiar with apt-get.)
[code]
$ sudo apt-get install maven
[/code]
Several packages get installed as dependencies, so this takes a little while. No errors occurred in my environment.

Compiling

Compile as described in the instructions:
[code]
$ mvn package
[/code]
This step may also take quite a while, downloading the required packages and so on, but again no errors occurred. Once the build finishes, a jar file has been created:
[code]
$ ls target/
classes maven-archiver surefire
classes.timestamp sparkwordcount-0.0.1-SNAPSHOT.jar test-classes
[/code]

Uploading sample data

Upload the data you want to word-count to HDFS. In fact, any data will do, but this time I used the data included in the git repository. (Note that the relative path sparkdata resolves to /user/<your username>/sparkdata on HDFS; that path shows up again below.)
[code]
$ cd data
$ hadoop fs -mkdir sparkdata
$ hadoop fs -put inputfile.txt sparkdata
$ cd ..
[/code]

Running the app

I ran it just as described in the blog post, with two changes to the published steps:
1) The path of the Java 7 that Cloudera Manager set up was different.
2) The HDFS path of the uploaded data.
For 1), I was short on time, so I forcibly re-pointed the symlink. Normally you would use alternatives, but for some reason it wasn't installed, so this is a brute-force fix:
[code]
$ sudo rm /etc/alternatives/java
$ sudo ln -s /usr/lib/jvm/java-7-oracle-cloudera/jre/bin/java /etc/alternatives/java
[/code]
For 2), I used the /user/kawasaki/sparkdata/inputfile.txt uploaded earlier.
Note that the run below is in local mode. My environment is underpowered, and I had to give up on standalone and YARN runs for lack of resources. The log is long, but I'm pasting the whole thing for reference.
[code]
source /etc/spark/conf/spark-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_45-cloudera
# system jars:
CLASSPATH=/etc/hadoop/conf
CLASSPATH=$CLASSPATH:$HADOOP_HOME/*:$HADOOP_HOME/lib/*
CLASSPATH=$CLASSPATH:$HADOOP_HOME/../hadoop-mapreduce/*:$HADOOP_HOME/../hadoop-mapreduce/lib/*
CLASSPATH=$CLASSPATH:$HADOOP_HOME/../hadoop-yarn/*:$HADOOP_HOME/../hadoop-yarn/lib/*
CLASSPATH=$CLASSPATH:$HADOOP_HOME/../hadoop-hdfs/*:$HADOOP_HOME/../hadoop-hdfs/lib/*
CLASSPATH=$CLASSPATH:$SPARK_HOME/assembly/lib/*
# app jar:
CLASSPATH=$CLASSPATH:target/sparkwordcount-0.0.1-SNAPSHOT.jar
$ $JAVA_HOME/bin/java -cp $CLASSPATH -Dspark.master=local com.cloudera.sparkwordcount.SparkWordCount sparkdata/inputfile.txt 2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/spark/assembly/lib/spark-assembly_2.10-0.9.0-cdh5.0.0-hadoop2.3.0-cdh5.0.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/04/15 02:33:46 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/15 02:33:46 INFO Remoting: Starting remoting
14/04/15 02:33:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@clouderavm.localdomain:59530]
14/04/15 02:33:46 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@clouderavm.localdomain:59530]
14/04/15 02:33:46 INFO spark.SparkEnv: Registering BlockManagerMaster
14/04/15 02:33:47 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140415023347-aaf7
14/04/15 02:33:47 INFO storage.MemoryStore: MemoryStore started with capacity 1064.7 MB.
14/04/15 02:33:47 INFO network.ConnectionManager: Bound socket to port 36739 with id = ConnectionManagerId(clouderavm.localdomain,36739)
14/04/15 02:33:47 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/04/15 02:33:47 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager clouderavm.localdomain:36739 with 1064.7 MB RAM
14/04/15 02:33:47 INFO storage.BlockManagerMaster: Registered BlockManager
14/04/15 02:33:47 INFO spark.HttpServer: Starting HTTP Server
14/04/15 02:33:47 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/04/15 02:33:47 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:57058
14/04/15 02:33:47 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.0.1.1:57058
14/04/15 02:33:47 INFO spark.SparkEnv: Registering MapOutputTracker
14/04/15 02:33:47 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e2dcb97b-30ee-49c0-84ae-ba547e42ac4c
14/04/15 02:33:47 INFO spark.HttpServer: Starting HTTP Server
14/04/15 02:33:47 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/04/15 02:33:47 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56832
14/04/15 02:33:47 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/04/15 02:33:47 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/04/15 02:33:47 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/04/15 02:33:47 INFO ui.SparkUI: Started Spark Web UI at http://clouderavm.localdomain:4040
14/04/15 02:33:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/04/15 02:33:48 INFO storage.MemoryStore: ensureFreeSpace(154293) called with curMem=0, maxMem=1116418867
14/04/15 02:33:48 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 150.7 KB, free 1064.6 MB)
14/04/15 02:33:49 INFO mapred.FileInputFormat: Total input paths to process : 1
14/04/15 02:33:49 INFO spark.SparkContext: Starting job: collect at SparkWordCount.scala:42
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at SparkWordCount.scala:34)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Registering RDD 10 (reduceByKey at SparkWordCount.scala:40)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Got job 0 (collect at SparkWordCount.scala:42) with 1 output partitions (allowLocal=false)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at SparkWordCount.scala:42)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[4] at reduceByKey at SparkWordCount.scala:34), which has no missing parents
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[4] at reduceByKey at SparkWordCount.scala:34)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 1947 bytes in 9 ms
14/04/15 02:33:49 INFO executor.Executor: Running task ID 0
14/04/15 02:33:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/15 02:33:49 INFO rdd.HadoopRDD: Input split: hdfs://hadoop11.localdomain:8020/user/kawasaki/sparkdata/inputfile.txt:0+138
14/04/15 02:33:49 INFO executor.Executor: Serialized size of result for 0 is 758
14/04/15 02:33:49 INFO executor.Executor: Sending result for 0 directly to driver
14/04/15 02:33:49 INFO executor.Executor: Finished task ID 0
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Finished TID 0 in 238 ms on localhost (progress: 0/1)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 2.0 from pool
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(2, 0)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Stage 2 (reduceByKey at SparkWordCount.scala:34) finished in 0.253 s
14/04/15 02:33:49 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/04/15 02:33:49 INFO scheduler.DAGScheduler: running: Set()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: waiting: Set(Stage 0, Stage 1)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: failed: Set()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List(Stage 1)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[10] at reduceByKey at SparkWordCount.scala:40), which is now runnable
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[10] at reduceByKey at SparkWordCount.scala:40)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 1943 bytes in 0 ms
14/04/15 02:33:49 INFO executor.Executor: Running task ID 1
14/04/15 02:33:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/15 02:33:49 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
14/04/15 02:33:49 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  8 ms
14/04/15 02:33:49 INFO executor.Executor: Serialized size of result for 1 is 973
14/04/15 02:33:49 INFO executor.Executor: Sending result for 1 directly to driver
14/04/15 02:33:49 INFO executor.Executor: Finished task ID 1
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Finished TID 1 in 88 ms on localhost (progress: 0/1)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at SparkWordCount.scala:40) finished in 0.090 s
14/04/15 02:33:49 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/04/15 02:33:49 INFO scheduler.DAGScheduler: running: Set()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: failed: Set()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[12] at reduceByKey at SparkWordCount.scala:40), which is now runnable
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at reduceByKey at SparkWordCount.scala:40)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2010 bytes in 0 ms
14/04/15 02:33:49 INFO executor.Executor: Running task ID 2
14/04/15 02:33:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/15 02:33:49 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
14/04/15 02:33:49 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  0 ms
14/04/15 02:33:49 INFO executor.Executor: Serialized size of result for 2 is 1285
14/04/15 02:33:49 INFO executor.Executor: Sending result for 2 directly to driver
14/04/15 02:33:49 INFO executor.Executor: Finished task ID 2
14/04/15 02:33:49 INFO scheduler.TaskSetManager: Finished TID 2 in 27 ms on localhost (progress: 0/1)
14/04/15 02:33:49 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/04/15 02:33:49 INFO scheduler.DAGScheduler: Stage 0 (collect at SparkWordCount.scala:42) finished in 0.030 s
14/04/15 02:33:49 INFO spark.SparkContext: Job finished: collect at SparkWordCount.scala:42, took 0.593445631 s
(e,6), (f,1), (a,4), (t,2), (u,1), (r,2), (v,1), (b,1), (c,1), (h,1), (o,2), (l,1), (n,4), (p,2), (i,1)
[/code]
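The very last line of the log is the actual result: character frequencies for the words that occurred at least twice (the 2 passed as the second argument is the threshold). As a minimal local illustration of that final step, here is the same logic in plain Scala on a hypothetical word list (not the actual input data):
[code]
// Hypothetical words that passed the threshold; the real input is sparkdata/inputfile.txt
val frequentWords = Seq("the", "sat", "on")
val charCounts = frequentWords
  .flatMap(_.toCharArray)               // explode the words into characters
  .groupBy(identity)                    // group identical characters together
  .map { case (c, cs) => (c, cs.size) } // count each group
println(charCounts.mkString(", "))      // e.g. (t,2), (h,1), (e,1), (s,1), (a,1), (o,1), (n,1)
[/code]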
As you can see, running Apache Spark's WordCount on a CDH 5 cluster built with Cloudera Manager is fairly straightforward. Give it a try!

Bonus – Viewing the Spark Master in Cloudera Manager
(Screenshot: spark_master (hadoop12) - Cloudera Manager)
