

Wordcount on Hadoop in Various Languages (1)

This is the Day 14 entry of the Hadoop-related (all by me) Advent Calendar 2014.
While looking into Apache Crunch, I also tried running wordcount on Hadoop in a variety of languages. In this first part we run wordcount with MapReduce (Java), Hadoop Streaming, Hive, and Pig.
(Update) The code is now on GitHub: https://github.com/kawamon/wordcount.git
An aside on word count
Word count is the example most often used when Hadoop MapReduce is first explained; it plays the role of the proverbial "Hello World".
The reason word count is chosen as the sample, however, is usually left vague. So why word count?
Processing genuinely large amounts of data, so-called big data, raises two challenges:

  1. The large volume of data sitting in storage has to be read in before the CPU can process it, so simply moving the data takes time.
  2. A single machine takes too long (the data does not fit in memory, or there is more of it than one CPU can keep up with).

Scaling up a single machine to deal with these problems only gets you so far. This is where scaling out across multiple machines, i.e. "distributed processing", shines. Word count is used as the sample because it shows how a large amount of data can be processed in a "distributed" way.
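
For comparison, on a single machine the same word count is just a short pipeline. Here is a minimal sketch, assuming local copies of the two small input files used in the Preparation section below:

[shell]
$ cat file01 file02 | tr -s '[:space:]' '\n' | sort | uniq -c
      1 Bye
      1 Goodbye
      2 Hadoop
      2 Hello
      2 World
[/shell]

This works right up until the data no longer fits on one machine or the pipeline no longer finishes in acceptable time; the Hadoop versions below express the same "split into words" and "sum per word" steps in a form that can be spread across many machines.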

Now let's run it on the Cloudera QuickStart VM we set up the other day.

Preparation

First, copy the test data into HDFS. I used the two simple files (file01 and file02) from the Crunch sample. (I deliberately chose such trivial files so the results are easy to compare across tools.)
[shell]
$ hadoop fs -cat input/file01
Hello World Bye World
$ hadoop fs -cat input/file02
Hello Hadoop Goodbye Hadoop
[/shell]
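
For reference, here is a sketch of one way the two files could have been created locally and uploaded to HDFS (on the QuickStart VM the relative path input resolves to the user's HDFS home directory, /user/cloudera):

[shell]
$ echo "Hello World Bye World" > file01
$ echo "Hello Hadoop Goodbye Hadoop" > file02
$ hadoop fs -mkdir -p input
$ hadoop fs -put file01 file02 input/
[/shell]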


MapReduce (Java)

First, the classic: MapReduce in Java, a WordCount using the New API. I referred to the tutorial below, but since it is written against the Old API I changed it slightly and also avoided StringTokenizer.
http://www.cloudera.com/content/cloudera/en/documentation/hadoop-tutorial/CDH5/Hadoop-Tutorial/ht_wordcount1_source.html

WordCount.java

[java]
package org.myorg;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf(
          "Usage: %s [generic options] <input dir> <output dir>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WordCount.class);
    job.setJobName(this.getClass().getName());

    // Input and output directories are taken from the command line.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);

    // Key/value types emitted by the mapper and by the job as a whole.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    if (job.waitForCompletion(true)) {
      return 0;
    }
    return 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}
[/java]

WordMapper.java

[java]
package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Split the line on runs of non-word characters and emit (word, 1)
    // for every non-empty token.
    String s = value.toString();
    for (String w : s.split("\\W+")) {
      if (w.length() > 0) {
        word.set(w);
        context.write(word, one);
      }
    }
  }
}
[/java]

WordReducer.java

[java]
package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Sum all the 1s emitted for this word.
    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get();
    }
    context.write(key, new IntWritable(wordCount));
  }
}
[/java]

Compiling and running

[shell]
$ javac -classpath `hadoop classpath` org/myorg/*.java
Note: org/myorg/WordCount.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
[cloudera@quickstart mr_java]$ jar cvf wc.jar org/myorg/*.class
added manifest
adding: org/myorg/WordCount.class(in = 2253) (out= 1132)(deflated 49%)
adding: org/myorg/WordMapper.class(in = 1915) (out= 810)(deflated 57%)
adding: org/myorg/WordReducer.class(in = 1602) (out= 670)(deflated 58%)
[cloudera@quickstart mr_java]$ hadoop jar wc.jar org.myorg.WordCount input output
14/12/14 05:08:37 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:08:37 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: number of splits:2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0014
14/12/14 05:08:38 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0014
14/12/14 05:08:38 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0014/
14/12/14 05:08:38 INFO mapreduce.Job: Running job: job_1418545807639_0014
14/12/14 05:08:51 INFO mapreduce.Job: Job job_1418545807639_0014 running in uber mode : false
14/12/14 05:08:51 INFO mapreduce.Job: map 0% reduce 0%
14/12/14 05:09:01 INFO mapreduce.Job: map 100% reduce 0%
14/12/14 05:09:11 INFO mapreduce.Job: map 100% reduce 100%
14/12/14 05:09:12 INFO mapreduce.Job: Job job_1418545807639_0014 completed successfully
14/12/14 05:09:12 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=87
FILE: Number of bytes written=319063
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=296
HDFS: Number of bytes written=41
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=13884
Total time spent by all reduces in occupied slots (ms)=4067
Total time spent by all map tasks (ms)=13884
Total time spent by all reduce tasks (ms)=4067
Total vcore-seconds taken by all map tasks=13884
Total vcore-seconds taken by all reduce tasks=4067
Total megabyte-seconds taken by all map tasks=14217216
Total megabyte-seconds taken by all reduce tasks=4164608
Map-Reduce Framework
Map input records=2
Map output records=8
Map output bytes=82
Map output materialized bytes=101
Input split bytes=246
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=101
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=232
CPU time spent (ms)=2460
Physical memory (bytes) snapshot=700850176
Virtual memory (bytes) snapshot=2683498496
Total committed heap usage (bytes)=510656512
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=50
File Output Format Counters
Bytes Written=41
[/shell]

Results

[shell]
$ hadoop fs -cat output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
[/shell]


Hadoop Streaming

Next, Hadoop Streaming with Perl.

mapper.pl

[perl]
#!/usr/bin/env perl
use strict;
use warnings;

# Emit "<word><TAB>1" for every word on every line of standard input.
while (<>) {
    chomp;
    my @words = split /\W+/;
    foreach my $w (@words) {
        next unless length $w;    # skip the empty leading field split can produce
        print "$w\t1\n";
    }
}
[/perl]

reduce.pl

[perl]
#!/usr/bin/env perl
use strict;
use warnings;

# Sum the counts for each word. Hadoop Streaming delivers the mapper output
# to the reducer sorted by key, so identical words arrive consecutively.
my $sum  = 0;
my $last = "";
while (<>) {
    chomp;
    my ($key, $value) = split /\t/;
    $last = $key if $last eq "";
    if ($last ne $key) {
        print "$last\t$sum\n";
        $last = $key;
        $sum  = 0;
    }
    $sum += $value;
}
# Flush the count for the final word (skip if there was no input at all).
print "$last\t$sum\n" if $last ne "";
[/perl]
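
Since Hadoop Streaming simply feeds the mapper and reducer through stdin/stdout, the pipeline can be smoke-tested locally before submitting the job. A sketch, with sort standing in for the shuffle phase that groups identical keys together:

[shell]
$ chmod +x mapper.pl reduce.pl
$ cat file01 file02 | ./mapper.pl | sort | ./reduce.pl
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
[/shell]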

Execution

[shell]
$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -mapper mapper.pl -reducer reduce.pl -file mapper.pl -file reduce.pl -input input -output streamoutput
14/12/14 05:53:58 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.pl, reduce.pl] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar] /tmp/streamjob8660928725375064201.jar tmpDir=null
14/12/14 05:53:59 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:00 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:01 INFO mapred.FileInputFormat: Total input paths to process : 2
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: number of splits:3
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0025
14/12/14 05:54:01 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0025
14/12/14 05:54:01 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0025/
14/12/14 05:54:01 INFO mapreduce.Job: Running job: job_1418545807639_0025
14/12/14 05:54:13 INFO mapreduce.Job: Job job_1418545807639_0025 running in uber mode : false
14/12/14 05:54:13 INFO mapreduce.Job: map 0% reduce 0%
14/12/14 05:54:26 INFO mapreduce.Job: map 100% reduce 0%
14/12/14 05:54:36 INFO mapreduce.Job: map 100% reduce 100%
14/12/14 05:54:37 INFO mapreduce.Job: Job job_1418545807639_0025 completed successfully
14/12/14 05:54:37 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=83
FILE: Number of bytes written=437439
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=383
HDFS: Number of bytes written=41
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=33174
Total time spent by all reduces in occupied slots (ms)=3734
Total time spent by all map tasks (ms)=33174
Total time spent by all reduce tasks (ms)=3734
Total vcore-seconds taken by all map tasks=33174
Total vcore-seconds taken by all reduce tasks=3734
Total megabyte-seconds taken by all map tasks=33970176
Total megabyte-seconds taken by all reduce tasks=3823616
Map-Reduce Framework
Map input records=2
Map output records=8
Map output bytes=66
Map output materialized bytes=119
Input split bytes=330
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=119
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=222
CPU time spent (ms)=3050
Physical memory (bytes) snapshot=967741440
Virtual memory (bytes) snapshot=3570974720
Total committed heap usage (bytes)=719847424
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=53
File Output Format Counters
Bytes Written=41
14/12/14 05:54:37 INFO streaming.StreamJob: Output directory: streamoutput
[/shell]

Results

[shell]
$ hadoop fs -cat streamoutput/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
[/shell]


Hive

Next up is Hive, loosely following the Programming Hive book. In Hive, I defined the text files as an external table.

HiveQL

[shell]
DROP TABLE docs;
CREATE EXTERNAL TABLE docs (line STRING) LOCATION '/user/cloudera/input';
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word
ORDER BY word;
[/shell]
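
I typed the statements into the hive shell below, but they could just as well be saved to a file and run in batch mode. A sketch, with wordcount.hql as a stand-in file name:

[shell]
$ hive -f wordcount.hql
[/shell]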

Execution and results

[shell]
hive> SELECT word, count(1) AS count FROM
> (SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word
> ORDER BY word;
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0019, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0019/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1418545807639_0019
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-12-14 05:23:53,107 Stage-1 map = 0%, reduce = 0%
2014-12-14 05:24:00,570 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.29 sec
2014-12-14 05:24:10,901 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.49 sec
MapReduce Total cumulative CPU time: 2 seconds 490 msec
Ended Job = job_1418545807639_0019
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1418545807639_0020
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2014-12-14 05:24:26,302 Stage-2 map = 0%, reduce = 0%
2014-12-14 05:24:33,842 Stage-2 map = 100%, reduce = 0%, Cumulative CPU 1.04 sec
2014-12-14 05:24:43,114 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 2.26 sec
MapReduce Total cumulative CPU time: 2 seconds 260 msec
Ended Job = job_1418545807639_0020
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.49 sec HDFS Read: 337 HDFS Write: 217 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 2.26 sec HDFS Read: 594 HDFS Write: 41 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 750 msec
OK
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Time taken: 66.765 seconds, Fetched: 5 row(s)
hive>
[/shell]


Pig

Next is Pig.

Pig Latin script

[shell]
docs = LOAD '/user/cloudera/input' AS (line:chararray);
words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
groupd = GROUP words BY word;
wordcount = FOREACH groupd GENERATE group, COUNT(words);
DUMP wordcount;
[/shell]
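
Below I enter the script line by line in the grunt shell, but it could also be run as a batch job. A sketch, with wordcount.pig as a stand-in file name (DUMP prints to the console; to keep the results you would typically STORE them into an HDFS directory instead):

[shell]
$ pig -f wordcount.pig
[/shell]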

Execution and results

[shell]
grunt> docs = LOAD '/user/cloudera/input' AS (line:chararray);
grunt> words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> groupd = GROUP words BY word;
grunt> wordcount = FOREACH groupd GENERATE group, COUNT(words);
grunt> DUMP wordcount;
2014-12-14 05:27:00,067 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
2014-12-14 05:27:00,112 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2014-12-14 05:27:00,230 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
(snip)
2014-12-14 05:27:42,341 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-14 05:27:42,341 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(Bye,1)
(Hello,2)
(World,2)
(Hadoop,2)
(Goodbye,1)
grunt>
[/shell]
This post has gotten long, so the rest will follow (probably) tomorrow.
