hiveでcogroup

Programming Hiveのレビューをしています(2)

cogroupのサンプル、中途半端にhttps://cwiki.apache.org/Hive/tutorial.html#Tutorial-CoGroupsをコピーして修正しているようんなんだけど、何カ所か中途半端に修正してあって、そのままだと絶対に動かない。

Chapter 14 Calculating Cogroups (CDH4.1.2で検証)
サンプルデータ(cog.txt.1)

1 100 "2013-03-29"
2 102 "2013-03-15"
3 200 "2013-03-11"
4 213 "2013-02-29"
5 134 "2013-01-29"
6 189 "2013-03-30"

サンプルデータ(cog.txt.2)

1 3 "2013-03-19"
2 102 "2013-03-15"
3 200 "2013-03-11"
4 213 "2013-02-29"
5 8 "2013-01-29"
6 21 "2013-03-30"

実行例 (reduce_script は具体例がないので、暫定的に /bin/cat で良いかと。そうじゃないとスクリプトを分散キャッシュで配布しないといけないし)

hive> CREATE TABLE order_log (userid INT, orderid int, ts STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> CREATE TABLE clicks_log (userid INT, id INT, ts STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> CREATE TABLE log_analysis (uid INT, id int, reduced_val STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> load data local inpath 'cog.txt1' INTO TABLE order_log;
hive> load data local inpath 'cog.txt2' INTO TABLE clicks_log;

hive> FROM (
> FROM (
> FROM order_log ol
> -- User Id, order Id, and timestamp:
> SELECT ol.userid AS uid, ol.orderid AS id, ol.ts AS ts
>
> UNION ALL
>
> FROM clicks_log cl
> SELECT cl.userid AS uid, cl.id AS id, cl.ts AS ts
>) union_msgs
>SELECT union_msgs.uid, union_msgs.id, union_msgs.ts
>CLUSTER BY union_msgs.uid, union_msgs.ts) mapout
>INSERT OVERWRITE TABLE log_analysis
>SELECT TRANSFORM(mapout.uid, mapout.id, mapout.ts) USING 'reduce_script' AS (uid, id, >reduced_val);

Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2013-03-30 01:00:49,616 Stage-1 map = 0%, reduce = 0%
2013-03-30 01:00:53,626 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 0.74 sec
2013-03-30 01:00:54,631 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.5 sec
2013-03-30 01:00:55,635 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.5 sec
2013-03-30 01:00:56,638 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.36 sec
2013-03-30 01:00:57,644 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.36 sec
MapReduce Total cumulative CPU time: 2 seconds 360 msec
Ended Job = job_201303152339_0086
Loading data to table default.log_analysis
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /user/hive/warehouse/log_analysis
12 Rows loaded to log_analysis
MapReduce Jobs Launched:
Job 0: Map: 2 Reduce: 1 Cumulative CPU: 2.36 sec HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 360 msec
OK
Time taken: 10.534 seconds
hive> select * from log_analysis;
OK
1 3 "2013-03-19"
1 100 "2013-03-29"
2 102 "2013-03-15"
2 102 "2013-03-15"
3 200 "2013-03-11"
3 200 "2013-03-11"
4 213 "2013-02-29"
4 213 "2013-02-29"
5 8 "2013-01-29"
5 134 "2013-01-29"
6 189 "2013-03-30"
6 21 "2013-03-30"
Time taken: 0.065 seconds
hive>

hiveのGeneralMR

Programming Hiveのレビューをしています

O’ReillyのProgramming Hiveは良本だと思いますが、原書は複数の人が書いているためか、動かないサンプルが多いです。
その中で、GeneralMRではJavaのコードをストリーミングで呼び出すことができます。(通常はHadoop Streamingではスクリプトを使うことが多いでしょうね)
グーグル先生に聞いても情報が少なかったので、手元で動作したサンプルを貼っておきます。

Chapter 14 GenericMR Tools for Streaming to Java (CDH4.1.2で検証)
サンプルデータ(source.txt.3)

1 this
1 that
1 that
1 zxc
1 zxc

実行例

hive> create table source3 (key string, value string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
hive> load data local inpath "./source.txt.3" INTO TABLE source3;
hive> FROM (
> FROM source3
> MAP value, key
> USING 'java -cp /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.1.jar >org.apache.hadoop.hive.contrib.mr.example.IdentityMapper'
> AS k, v
> CLUSTER BY k) map_output
>REDUCE k, v
>USING 'java -cp /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.1.jar >org.apache.hadoop.hive.contrib.mr.example.WordCountReduce'
>AS k, v;

2013-03-29 23:45:29,437 Stage-1 map = 0%, reduce = 0%
2013-03-29 23:45:31,444 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.65 sec
2013-03-29 23:45:32,449 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.65 sec
2013-03-29 23:45:33,454 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 1.33 sec
2013-03-29 23:45:34,457 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 1.33 sec
2013-03-29 23:45:35,460 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 1.33 sec
MapReduce Total cumulative CPU time: 1 seconds 330 msec
Ended Job = job_201303152339_0081
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 1.33 sec HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 330 msec
OK
that 2
this 1
zxc 2
Time taken: 8.4 seconds
hive>

CDH4におけるHBaseのリージョンサイズ

CDH4ではHBASE-5574が適用されているため、HBaseのリージョンサイズは256MBではなく10GBに変更されています。

(追記)
JIRA
https://issues.apache.org/jira/browse/HBASE-4365
https://issues.apache.org/jira/browse/HBASE-5574

(追記2)2013-06-20
CDH4.2 (HBase 0.94.2) のchanges.logには記載されていないが、CDH4.2のソースに含まれているCHANGES.txtには以下の記載がある。CDH4.1.3には含まれないため、CDH4.2 (0.94以降)で対応ということになる。

[HBASE-4365] - Add a decent heuristic for region size
[HBASE-5574] - DEFAULT_MAX_FILE_SIZE defaults to a negative value

#研修のテキストが更新されていませんでした、、すみません。。。

HBase 0.94 での ResionServer Queue dump 機能

CDH4.2ではHBaseのバージョンが0.92.1->0.94.2へと変更になっています。主な変更点は
https://ccp.cloudera.com/display/CDH4DOC/New+Features+in+CDH4#NewFeaturesinCDH4-HBase420
を参照して下さい。今回はちょっとした機能についてご紹介します。

Resion Server Queue dump

リージョンサーバではコンパクションやスプリットが発生し、パフォーマンスに影響を及ぼします。
このときの「キュー」をダンブする機能が HBASE-2730 で実装されました。CDH4.2にも含まれています。

有効化するには hbase-site.xml に hbase.regionserver.servlet.show.queuedump
true
のように記述し、RegionServerを再起動します。

確認はブラウザから、
http://:60030/dump
にアクセスして下さい。一番下にキューの状態が表示されます。

RS Queue:
===========================================================
Compaction/Split Queue summary: compaction_queue=(0:0), split_queue=0
Compaction/Split Queue dump:
LargeCompation Queue:

SmallCompation Queue:

Split Queue:

Flush Queue summary: flush_queue=0
Flush Queue Queue dump:
Flush Queue:

このようなデバッグのための機能が強化されるのは嬉しいですね。

(参考)
手元の環境では全てゼロ、と分かりにくいので、JIRAに公開されていたスクリーンショットを貼っておきます
HBase dump queue
オリジナル画像はこちら