Building an lzo environment on Hadoop (3)

This is day 20.
Last time, I used Java MapReduce to test whether lzo-compressed files are processed correctly. This time, I'll experiment with Hive, Pig, and Impala. Arguably I should be looking at splitting files stored in formats such as RCFile or Parquet rather than lzo, but I don't have the energy for that today, so let's keep it simple. First, let's check the files: the data is already compressed, and the index file is already in place.
[shell]
[training@localhost ~]$ hadoop fs -ls /user/training/1Glzo
Found 2 items
-rw-r--r--   1 training supergroup  189710462 2013-04-25 21:37 /user/training/1Glzo/1G.lzo
-rw-r--r--   1 training supergroup      30520 2013-04-25 21:37 /user/training/1Glzo/1G.lzo.index
[training@localhost ~]$
[/shell]
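For reference, an index like this is generated with the indexer bundled in hadoop-lzo. A minimal sketch of the command, assuming the jar sits under /usr/lib/hadoop/lib (the exact path varies by installation):
[shell]
# build the .index file with the MapReduce-based indexer from hadoop-lzo
# (the jar path is an assumption; adjust it to your installation)
[training@localhost ~]$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo.jar \
    com.hadoop.compression.lzo.DistributedLzoIndexer /user/training/1Glzo/1G.lzo
[/shell]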

Word count on an lzo file with Hive

Let's run a word count with Hive on the lzo-compressed file. Many people (somewhat vaguely) picture Hive as a tool for CSV-like files, but it handles not only tab- or comma-delimited files but also log files, JSON, and so on quite easily. As you would expect, it can do a word count too. Plenty of sample queries are published on the web, so search around if you are interested.
The HiveQL used this time is shown below.
[shell]
DROP TABLE 1Glzo;
DROP TABLE word_counts;
SET mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec;
CREATE EXTERNAL TABLE 1Glzo (line STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS
INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" LOCATION '/user/training/1Glzo';
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\s')) AS word FROM 1Glzo) w GROUP BY word
ORDER BY word;
[/shell]
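One practical note: for Hive to resolve DeprecatedLzoTextInputFormat, the hadoop-lzo jar has to be on Hive's classpath. In this environment it evidently already is (the query runs fine below), but if it is not in yours, adding it for the session along these lines should work; the jar path is again an assumption:
[shell]
-- add the hadoop-lzo jar to the current Hive session (path is an assumption)
ADD JAR /usr/lib/hadoop/lib/hadoop-lzo.jar;
[/shell]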
The key is the input format. Hive can take lzo text as its input format, so here we specify "com.hadoop.mapred.DeprecatedLzoTextInputFormat". Now, let's run it.
[shell]
[training@localhost ~]$ hive -f /tmp/wordcount.q
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
Hive history file=/tmp/training/hive_job_log_training_201312210225_935986310.txt
OK
Time taken: 4.084 seconds
OK
Time taken: 0.028 seconds
OK
Time taken: 0.335 seconds
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201309171025_0016, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201309171025_0016
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201309171025_0016
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2013-12-21 02:25:59,860 Stage-1 map = 0%,  reduce = 0%
2013-12-21 02:26:08,946 Stage-1 map = 3%,  reduce = 0%
2013-12-21 02:26:11,958 Stage-1 map = 5%,  reduce = 0%
(snip)
[/shell]
Things were moving along nicely, but partway through the job stopped making progress. A look at the JobTracker log shows:
[shell]
2013-12-21 00:00:00,220 WARN org.apache.hadoop.mapred.JobInProgress: No room for reduce task. Node tracker_localhost.localdomain:localhost.localdomain/127.0.0.1:47890 has 1574772736 bytes free; but we expect reduce input to take 2289072484
2013-12-21 00:00:00,526 WARN org.apache.hadoop.mapred.JobInProgress: No room for reduce task. Node tracker_localhost.localdomain:localhost.localdomain/127.0.0.1:47890 has 1574764544 bytes free; but we expect reduce input to take 2289072484
2013-12-21 00:00:00,827 WARN org.apache.hadoop.mapred.JobInProgress: No room for reduce task. Node tracker_localhost.localdomain:localhost.localdomain/127.0.0.1:47890 has 1574764544 bytes free; but we expect reduce input to take 2289072484
[/shell]
It seems there is not enough local disk space for the reduce: the node reports about 1.5 GB free, while the reduce input is estimated at roughly 2.3 GB. Too bad.
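If you run into the same wall, it is worth checking which local directories the TaskTracker spills intermediate data to and how much space is left there. A quick sketch, assuming the usual CDH config location:
[shell]
# see which local directories are configured for intermediate (shuffle/reduce) data
# (the config path is an assumption; adjust for your installation)
[training@localhost ~]$ grep -A 1 'mapred.local.dir' /etc/hadoop/conf/mapred-site.xml
# then check free space on the filesystems that hold them
[training@localhost ~]$ df -h
[/shell]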
After redoing the test with a smaller dataset, it ran to completion without trouble.
[shell]
[training@localhost ~]$ hive -f /tmp/wordcount.q2
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
Hive history file=/tmp/training/hive_job_log_training_201312211518_1160884487.txt
OK
Time taken: 1.995 seconds
OK
Time taken: 0.022 seconds
OK
Time taken: 0.654 seconds
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201309171025_0027, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201309171025_0027
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201309171025_0027
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2013-12-21 15:18:21,486 Stage-1 map = 0%,  reduce = 0%
2013-12-21 15:18:25,530 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.24 sec
2013-12-21 15:18:26,547 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.24 sec
2013-12-21 15:18:27,559 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.24 sec
2013-12-21 15:18:28,566 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.93 sec
2013-12-21 15:18:29,572 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.93 sec
MapReduce Total cumulative CPU time: 3 seconds 930 msec
Ended Job = job_201309171025_0027
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201309171025_0028, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201309171025_0028
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201309171025_0028
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2013-12-21 15:18:32,848 Stage-2 map = 0%,  reduce = 0%
2013-12-21 15:18:34,857 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.27 sec
2013-12-21 15:18:35,869 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.27 sec
2013-12-21 15:18:36,885 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.27 sec
2013-12-21 15:18:37,900 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.83 sec
2013-12-21 15:18:38,906 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.83 sec
2013-12-21 15:18:39,913 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.83 sec
MapReduce Total cumulative CPU time: 2 seconds 830 msec
Ended Job = job_201309171025_0028
Moving data to: hdfs://0.0.0.0:8020/user/hive/warehouse/word_counts
108151 Rows loaded to hdfs://0.0.0.0:8020/tmp/hive-training/hive_2013-12-21_15-18-18_416_3347184409148469838/-ext-10000
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 3.93 sec   HDFS Read: 0 HDFS Write: 0 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 2.83 sec   HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 760 msec
OK
Time taken: 21.554 seconds
[training@localhost ~]$
[/shell]
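As a quick check that is not part of the original run, the result table can be queried directly; for example, the ten most frequent words:
[shell]
# `count` is backquoted because it was used as a column name in the CTAS above
[training@localhost ~]$ hive -e 'SELECT word, `count` FROM word_counts ORDER BY `count` DESC LIMIT 10;'
[/shell]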

Word count on an lzo file with Pig

Now for Pig. Unfortunately, Pig does not ship with an lzo-capable input format, so a custom loader is needed.
Fortunately, Twitter has released elephant-bird as open source, so we will use that: https://github.com/kevinweil/elephant-bird
However, the build is failing in my environment, so I will try it again separately; a rough sketch of what the script should look like follows.
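For the record, once elephant-bird (and hadoop-lzo) are built and on the classpath, the word count should look roughly like this. This is an untested sketch: the jar names are placeholders, and you should verify the loader class against the elephant-bird version you build.
[shell]
-- untested sketch: point REGISTER at whatever jars your elephant-bird build produces
REGISTER elephant-bird-core.jar;
REGISTER elephant-bird-pig.jar;

-- read the lzo text line by line through elephant-bird's loader
lines  = LOAD '/user/training/1Glzo/1G.lzo'
         USING com.twitter.elephantbird.pig.load.LzoTextLoader() AS (line:chararray);

-- split each line into words, group, and count
words  = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
grpd   = GROUP words BY word;
counts = FOREACH grpd GENERATE group AS word, COUNT(words) AS cnt;

STORE counts INTO '/user/training/1Glzo_wordcount_pig';
[/shell]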
