WordCount in Various Languages on Hadoop (3): Apache Crunch

This is the article for day 16 of the Hadoop-related (all by me) Advent Calendar 2014.

# 8 days to go. I'm starting to hit the limits of my energy and stamina...

This is the final WordCount installment. Today's topic is Apache Crunch: let's run wordcount with Crunch on both MapReduce and Spark.

I've put the code on GitHub: https://github.com/kawamon/wordcount.git

Crunch (MapReduce)

Apache Crunch is an Apache open-source project based on Google's FlumeJava. It is a library that makes it easy to write MapReduce (and, these days, Spark) pipelines. (Japanese article: Apache Crunch: a Java library that simplifies MapReduce programming)
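
To give a feel for the programming model before the full example: a Crunch pipeline is built by chaining transformations on PCollection and PTable objects, and nothing runs until the pipeline is executed. What follows is only a condensed sketch of the WordCount pipeline listed in full below (Tokenizer and StopWordFilter are the helper classes shown later):

// Condensed sketch of the pipeline built in WordCount.run() below.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());   // or new SparkPipeline(...)
PCollection<String> lines = pipeline.readTextFile(inputPath);     // read the input text
PCollection<String> words =
        lines.parallelDo(new Tokenizer(), Writables.strings());   // split lines into words
PTable<String, Long> counts =
        words.filter(new StopWordFilter())                        // drop known stop words
             .count();                                            // word -> number of occurrences
pipeline.writeTextFile(counts, outputPath);                       // declare the output
PipelineResult result = pipeline.done();                          // plan and run the job(s)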

Crunch was developed and is maintained by Josh Wills, Cloudera's Chief Data Scientist. Data scientists overseas really do build the tools they need themselves (Cloudera Oryx, Impyla, and others). Impressive.

Crunch reference links

http://crunch.apache.org/getting-started.html

For the WordCount code, I followed the Crunch getting-started page and downloaded the demo project.

git clone http://github.com/jwills/crunch-demo

 

src/main/java/com/example/WordCount.java

package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
                                      + " [generic options] input output");
            System.err.println();
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
//        Pipeline pipeline = new SparkPipeline("local","sort");

        // Reference a given text file as a collection of Strings.
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings into
        // a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

        // Take the collection of words and remove known stop words.
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = noStopWords.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Execute the pipeline as a MapReduce.
        PipelineResult result = pipeline.done();

        return result.succeeded() ? 0 : 1;
    }
}
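
The comment in run() notes that count() "applies a series of Crunch primitives". As a rough illustration (an equivalent formulation, not necessarily the library's exact internals), the same table could be built by hand along these lines, using Crunch's MapFn, Pair and Aggregators classes (imports: org.apache.crunch.MapFn, org.apache.crunch.Pair, org.apache.crunch.fn.Aggregators):

// Roughly what noStopWords.count() amounts to:
// emit (word, 1L) for every word, group by word, then sum the counts.
PTable<String, Long> counts = noStopWords
    .parallelDo(new MapFn<String, Pair<String, Long>>() {
        @Override
        public Pair<String, Long> map(String word) {
            return Pair.of(word, 1L);
        }
    }, Writables.tableOf(Writables.strings(), Writables.longs()))
    .groupByKey()                               // one group per unique word
    .combineValues(Aggregators.SUM_LONGS());    // sum the 1s (also applied map-side as a combiner)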

 

src/main/java/com/example/StopWordFilter.java

package com.example;

import java.util.Set;

import org.apache.crunch.FilterFn;

import com.google.common.collect.ImmutableSet;

/**
 * A filter that removes known stop words.
 */
public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
        "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it",
        "no", "not", "of", "on", "or", "s", "such",
        "t", "that", "the", "their", "then", "there", "these",
        "they", "this", "to", "was", "will", "with"
    });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

 

src/main/java/com/example/Tokenizer.java

package com.example;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import com.google.common.base.Splitter;

/**
 * Splits a line of text into individual words.
 */
public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}
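
The demo project also ships small unit tests for Tokenizer and StopWordFilter (they show up in the Maven output below). For quick local experiments, a DoFn can also be exercised through Crunch's in-memory pipeline, with no cluster involved. A minimal sketch assuming JUnit 4 and Crunch's MemPipeline (this is only an illustration, not the test that ships with crunch-demo):

package com.example;

import static org.junit.Assert.assertEquals;

import com.google.common.collect.Lists;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Test;

public class TokenizerMemPipelineTest {

    @Test
    public void splitsLineIntoWords() {
        // MemPipeline builds PCollections that are evaluated in-process.
        PCollection<String> lines =
                MemPipeline.typedCollectionOf(Writables.strings(), "Hello Hadoop  World");
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
        // materialize() exposes the computed values as an Iterable.
        assertEquals(Lists.newArrayList("Hello", "Hadoop", "World"),
                Lists.newArrayList(words.materialize()));
    }
}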

Build

The Crunch sample code is already set up to build with Maven, so I used it as-is.

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building crunch-demo 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---
[INFO] Compiling 3 source files to /home/cloudera/work/crunch/crunch-demo/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---
[INFO] Compiling 2 source files to /home/cloudera/work/crunch/crunch-demo/target/test-classes
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---
(output omitted)

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.738s
[INFO] Finished at: Sun Dec 14 20:28:27 PST 2014
[INFO] Final Memory: 33M/311M
[INFO] ------------------------------------------------------------------------

Run

$ hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar  input crunch.output
14/12/14 20:30:53 INFO impl.FileTargetImpl: Will write output files to new path: crunch.output
14/12/14 20:30:54 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
14/12/14 20:30:55 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 20:30:56 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
14/12/14 20:30:56 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 20:30:56 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50
14/12/14 20:30:56 INFO mapreduce.JobSubmitter: number of splits:1
14/12/14 20:30:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0027
14/12/14 20:30:57 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0027
14/12/14 20:30:57 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/
14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Running job "com.example.WordCount: Text(input)+S0+S1+Aggregate.count+GBK+combine+asText+Text... (1/1)"
14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Job status available at: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/
$

Result

$ hadoop fs -cat crunch.output/part-r-00000
[Bye,1]
[Goodbye,1]
[Hadoop,2]
[Hello,2]
[World,2]

Crunch (Spark)

And finally, Crunch on Spark. The source code is almost identical to the MapReduce version; the only difference is a single line (MRPipeline -> SparkPipeline):

-        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
+//        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
-//        Pipeline pipeline = new SparkPipeline("local","sort");
+        Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077","CrunchWordCount");

 

There is little information available on running Crunch applications on Spark, so it took some trial and error, but I got it working.

All files other than the one below are the same as in the MapReduce version.

src/main/java/com/example/WordCount.java

package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
                                      + " [generic options] input output");
            System.err.println();
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
//        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
        //Pipeline pipeline = new SparkPipeline("local","CrunchWordCount");
        Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077","CrunchWordCount");

        // Reference a given text file as a collection of Strings.
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings into
        // a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

        // Take the collection of words and remove known stop words.
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = noStopWords.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Execute the pipeline (on Spark, in this version).
        PipelineResult result = pipeline.done();

        return result.succeeded() ? 0 : 1;
    }
}

Build

The build also uses Maven, exactly the same as before.

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building crunch-demo 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---
[INFO] Compiling 1 source file to /home/cloudera/work/crunch/crunch-demo/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---
[INFO] Surefire report directory: /home/cloudera/work/crunch/crunch-demo/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.StopWordFilterTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.139 sec
Running com.example.TokenizerTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.233 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ crunch-demo ---
[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.3:single (make-assembly) @ crunch-demo ---
[INFO] Reading assembly descriptor: src/main/assembly/hadoop-job.xml
[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.243s
[INFO] Finished at: Mon Dec 15 07:23:26 PST 2014
[INFO] Final Memory: 32M/297M
[INFO] ------------------------------------------------------------------------

Run

Before running, set the SPARK_CLASSPATH environment variable; without it the job fails at runtime with a ClassNotFoundException. (As the warning in the log below notes, SPARK_CLASSPATH is deprecated in Spark 1.0+, with --driver-class-path and spark.executor.extraClassPath being the recommended alternatives.)

$ export SPARK_CLASSPATH=/usr/lib/crunch/*
$ spark-submit --class com.example.WordCount target/crunch-demo-1.0-SNAPSHOT-job.jar  input crunch.output
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/15 07:28:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/15 07:28:47 INFO FileTargetImpl: Will write output files to new path: crunch.output
14/12/15 07:28:47 WARN SparkConf:
SPARK_CLASSPATH was detected (set to '/usr/lib/crunch/*').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

14/12/15 07:28:47 WARN SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/lib/crunch/*' as a work-around.
14/12/15 07:28:47 WARN SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/lib/crunch/*' as a work-around.
14/12/15 07:28:47 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.220 instead (on interface eth1)
14/12/15 07:28:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/12/15 07:28:47 INFO SecurityManager: Changing view acls to: cloudera
14/12/15 07:28:47 INFO SecurityManager: Changing modify acls to: cloudera
14/12/15 07:28:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
14/12/15 07:28:47 INFO Slf4jLogger: Slf4jLogger started
14/12/15 07:28:47 INFO Remoting: Starting remoting
14/12/15 07:28:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.2.220:46973]
14/12/15 07:28:47 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.2.220:46973]
14/12/15 07:28:47 INFO Utils: Successfully started service 'sparkDriver' on port 46973.
14/12/15 07:28:48 INFO SparkEnv: Registering MapOutputTracker
14/12/15 07:28:48 INFO SparkEnv: Registering BlockManagerMaster
14/12/15 07:28:48 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141215072848-a8ab
14/12/15 07:28:48 INFO Utils: Successfully started service 'Connection manager for block manager' on port 38854.
14/12/15 07:28:48 INFO ConnectionManager: Bound socket to port 38854 with id = ConnectionManagerId(192.168.2.220,38854)
14/12/15 07:28:48 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
14/12/15 07:28:48 INFO BlockManagerMaster: Trying to register BlockManager
14/12/15 07:28:48 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38854 with 265.4 MB RAM
14/12/15 07:28:48 INFO BlockManagerMaster: Registered BlockManager
14/12/15 07:28:48 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a04d9dc5-d340-4237-bb12-cf45e390bbd5
14/12/15 07:28:48 INFO HttpServer: Starting HTTP Server
14/12/15 07:28:48 INFO Utils: Successfully started service 'HTTP file server' on port 46908.
14/12/15 07:28:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/12/15 07:28:48 INFO SparkUI: Started SparkUI at http://192.168.2.220:4040
14/12/15 07:28:48 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/crunchwordcount-1418657328679
14/12/15 07:28:49 INFO SparkContext: Added JAR file:/home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar at http://192.168.2.220:46908/jars/crunch-demo-1.0-SNAPSHOT-job.jar with timestamp 1418657329490
14/12/15 07:28:49 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...
14/12/15 07:28:49 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=0, maxMem=278302556
14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.0 KB, free 265.3 MB)
14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=65576, maxMem=278302556
14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.3 MB)
14/12/15 07:28:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141215072850-0001
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor added: app-20141215072850-0001/0 on worker-20141215063206-192.168.2.220-7078 (192.168.2.220:7078) with 2 cores
14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141215072850-0001/0 on hostPort 192.168.2.220:7078 with 2 cores, 512.0 MB RAM
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(260900) called with curMem=86133, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 254.8 KB, free 265.1 MB)
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now LOADING
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(21040) called with curMem=347033, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.5 KB, free 265.1 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38854 (size: 20.5 KB, free: 265.4 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now RUNNING
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=368073, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 64.0 KB, free 265.0 MB)
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=433649, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.0 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/15 07:28:50 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/12/15 07:28:50 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at SparkRuntime.java:321
14/12/15 07:28:50 INFO deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
14/12/15 07:28:50 INFO FileInputFormat: Total input paths to process : 2
14/12/15 07:28:50 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50
14/12/15 07:28:50 INFO DAGScheduler: Registering RDD 9 (mapToPair at PGroupedTableImpl.java:115)
14/12/15 07:28:50 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) with 1 output partitions (allowLocal=false)
14/12/15 07:28:50 INFO DAGScheduler: Final stage: Stage 0(saveAsNewAPIHadoopFile at SparkRuntime.java:321)
14/12/15 07:28:50 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/15 07:28:50 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/15 07:28:50 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115), which has no missing parents
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(6984) called with curMem=454206, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.8 KB, free 265.0 MB)
14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(3636) called with curMem=461190, maxMem=278302556
14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.6 KB, free 265.0 MB)
14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38854 (size: 3.6 KB, free: 265.3 MB)
14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
14/12/15 07:28:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115)
14/12/15 07:28:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/12/15 07:28:53 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.220:46808/user/Executor#-1997953566] with ID 0
14/12/15 07:28:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.220, ANY, 1556 bytes)
14/12/15 07:28:53 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38205 with 265.4 MB RAM
14/12/15 07:28:55 INFO ConnectionManager: Accepted connection from [192.168.2.220/192.168.2.220:47191]
14/12/15 07:28:55 INFO SendingConnection: Initiating connection to [/192.168.2.220:38205]
14/12/15 07:28:55 INFO SendingConnection: Connected to [/192.168.2.220:38205], 1 messages pending
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38205 (size: 3.6 KB, free: 265.4 MB)
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38205 (size: 20.5 KB, free: 265.4 MB)
14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38205 (size: 20.1 KB, free: 265.4 MB)
14/12/15 07:28:57 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 4136 ms on 192.168.2.220 (1/1)
14/12/15 07:28:57 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/15 07:28:57 INFO DAGScheduler: Stage 1 (mapToPair at PGroupedTableImpl.java:115) finished in 6.832 s
14/12/15 07:28:57 INFO DAGScheduler: looking for newly runnable stages
14/12/15 07:28:57 INFO DAGScheduler: running: Set()
14/12/15 07:28:57 INFO DAGScheduler: waiting: Set(Stage 0)
14/12/15 07:28:57 INFO DAGScheduler: failed: Set()
14/12/15 07:28:57 INFO DAGScheduler: Missing parents for Stage 0: List()
14/12/15 07:28:57 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307), which is now runnable
14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(69448) called with curMem=464826, maxMem=278302556
14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 67.8 KB, free 264.9 MB)
14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(25227) called with curMem=534274, maxMem=278302556
14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 24.6 KB, free 264.9 MB)
14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38854 (size: 24.6 KB, free: 265.3 MB)
14/12/15 07:28:57 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
14/12/15 07:28:57 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307)
14/12/15 07:28:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/15 07:28:57 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, 192.168.2.220, PROCESS_LOCAL, 1022 bytes)
14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38205 (size: 24.6 KB, free: 265.3 MB)
14/12/15 07:28:57 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.220:41851
14/12/15 07:28:57 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 131 bytes
14/12/15 07:28:58 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 534 ms on 192.168.2.220 (1/1)
14/12/15 07:28:58 INFO DAGScheduler: Stage 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) finished in 0.534 s
14/12/15 07:28:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/15 07:28:58 INFO SparkContext: Job finished: saveAsNewAPIHadoopFile at SparkRuntime.java:321, took 7.709089524 s
14/12/15 07:28:58 INFO SparkUI: Stopped Spark web UI at http://192.168.2.220:4040
14/12/15 07:28:58 INFO DAGScheduler: Stopping DAGScheduler
14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Shutting down all executors
14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
14/12/15 07:28:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(192.168.2.220,38205)
14/12/15 07:28:58 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(192.168.2.220,38205)
14/12/15 07:28:58 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(192.168.2.220,38205) not found
14/12/15 07:28:59 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/12/15 07:28:59 INFO ConnectionManager: Selector thread was interrupted!
14/12/15 07:28:59 INFO ConnectionManager: ConnectionManager stopped
14/12/15 07:28:59 INFO MemoryStore: MemoryStore cleared
14/12/15 07:28:59 INFO BlockManager: BlockManager stopped
14/12/15 07:28:59 INFO BlockManagerMaster: BlockManagerMaster stopped
14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/12/15 07:28:59 INFO SparkContext: Successfully stopped SparkContext

Result

$ hadoop fs -cat crunch.output/part-r-00000
[Hello,2]
[Bye,1]
[World,2]
[Goodbye,1]
[Hadoop,2]

So, with Apache Crunch we were able to run the same WordCount on both MapReduce and Spark with the source code almost entirely shared. The scarce documentation makes it a bit hard to get started with, but this is quite powerful.

Bonus

Screenshots of the Spark Web UI
