Hadoop入門

Featured

hadoop-logoようこそ Tech blogへ!これからHadoopを勉強しよう、という方の訪問が多いようなので、そのような方はまず下記のページからどうぞ。

HDFSの新しい機能3つ

HDFSは分散ファイルシステムとして確固たる地位を築きましたが、NFSのサポートやスナップショットなど、多くの機能が追加されています。

その中で、今後 Hadoop 3 で重要となる「HDFSのイレイジャーコーディング」を含めたClouderaのHDFSに関するブログ記事を日本語に翻訳しました。

全3回にわたって掲載する予定なので、興味があればご覧ください。

第1弾:Apache HadoopのHDFS Erasure Codingの紹介

第2弾:How-to: Apache Hadoopで新しいHDFS DataNodeディスク間バランサを使用する

第3弾:HDFSのメンテナンスステート

Apache SparkでApache Kuduを利用する

Apache Kudu 1.3

最近Apache Kuduのリードとライトの流れについてのブログが公開され、昨日は日本語でもウェビナーが開催されたりと盛り上がってきた「分散ストレージエンジン」のApache Kuduですが、SQLでのアクセスにはApache Impala (Incubating) との組み合わせが便利です。他方、Apache Sparkからも簡単に利用できます。

Kudu Quickstart VMを利用して、まずはチュートリアルにしたがって Impala からデータを投入し、クエリした後にSparkのDataFrameからアクセスしてみました。以下の手順は仮想マシン上で試せるほど簡単なので、プレミアムフライデーで帰る前に試せるぐらい簡単ですね。

1. Kudu Quickstart VMのチュートリアルを実行する

チュートリアルに従ってデータを作成しました。

なお、チュートリアルと結果が違うのは、データ投入中にDELETE の実験をしたからです。

[demo@quickstart ~]$ impala-shell 
Starting Impala Shell without Kerberos authentication
Connected to quickstart.cloudera:21000
Server version: impalad version 2.8.0-cdh5.11.0 RELEASE (build e09660de6b503a15f07e84b99b63e8e745854c34)
***********************************************************************************
Welcome to the Impala shell.
(Impala Shell v2.8.0-cdh5.11.0 (e09660d) built on Wed Apr  5 19:51:24 PDT 2017)

When pretty-printing is disabled, you can use the '--output_delimiter' flag to set
the delimiter for fields in the same row. The default is ','.
***********************************************************************************
[quickstart.cloudera:21000] > SELECT * FROM sfmta ORDER BY speed DESC LIMIT 1;
Query: select * FROM sfmta ORDER BY speed DESC LIMIT 1
Query submitted at: 2017-04-27 22:31:57 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=14bb0d30cdfc1d4:59d3e08a00000000
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| report_time | vehicle_tag | longitude          | latitude          | speed             | heading |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| 1357022016  | 8522        | -122.4538803100586 | 37.74539184570312 | 65.27799987792969 | 24      |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
Fetched 1 row(s) in 0.42s
[quickstart.cloudera:21000] > SELECT COUNT(*) FROM sfmta;
Query: select COUNT(*) FROM sfmta
Query submitted at: 2017-04-27 22:33:03 (Coordinator: http://quickstart.cloudera:25000)
Query progress can be monitored at: http://quickstart.cloudera:25000/query_plan?query_id=e4812eb162470dd:8d087b0200000000
+----------+
| count(*) |
+----------+
| 842279   |
+----------+
Fetched 1 row(s) in 0.20s
[quickstart.cloudera:21000] > 


2. Apache Spark からアクセスする

Spark統合の参考ページはこちら

なお、テーブル名は Kudu の webui で調べました。
http://quickstart.cloudera:8051/

$ spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0
(略)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
17/04/27 22:52:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc (master = local[*], app id = local-1493358775875).
17/04/27 22:52:57 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.

scala> import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.kudu._

scala> val df = sqlContext.read.options(Map("kudu.master=" -> "quickstart.cloudera:7051","kudu.table" -> "impala::default.sfmta")).kudu
df: org.apache.spark.sql.DataFrame = [report_time: bigint, vehicle_tag: int, longitude: float, latitude: float, speed: float, heading: float]

scala> df.count
res0: Long = 842279                                                             

scala> df.show(5)
+-----------+-----------+----------+--------+------+-------+
|report_time|vehicle_tag| longitude|latitude| speed|heading|
+-----------+-----------+----------+--------+------+-------+
| 1356998401|       1479|-122.44695| 37.7222|   0.0|   51.0|
| 1356998401|       1525|-122.39674|37.79291|   0.0| 226.05|
| 1356998401|       1532|-122.45711|37.75034|14.305| 218.33|
| 1356998401|       1549|-122.43752| 37.7611|   0.0|  49.24|
| 1356998401|       5453|-122.42894|37.76705|   0.0|    0.0|
+-----------+-----------+----------+--------+------+-------+
only showing top 5 rows


scala> df.sort($"speed".desc).select("longitude","latitude","speed").show(3)
+----------+--------+------+
| longitude|latitude| speed|
+----------+--------+------+
|-122.45388|37.74539|65.278|
|-122.48861| 37.9074|  62.5|
|-122.48026|37.89245|55.833|
+----------+--------+------+
only showing top 3 rows

まとめ

SQLでアクセスしたい場合はImpalaから行うと便利ですが、データエンジニアリングを行いたい場合など、SQL以外でのアクセスにはSparkも便利ですね。

そうそう、Apache Sparkのパフォーマンスのベンチマーク結果を翻訳して公開しました。興味がある方はこちらもどうぞ。

Apache Impala、同時実行時の速度において従来の分析データベースやSQL on Hadoopに圧勝

 

Cloudera 5.11リリース

Cloudera 5.11が出たので久しぶりの更新です。

Hadoopも11年目を迎え、日本でも幅広い業種で使われるようになりました。数年前と比較すると圧倒的に使いやすくなっているので、過去に評価してそのままになっている方、既存の基盤で困っている方、そろそろ改めて検証しなおしてみてはいかがでしょうか?

例えば数年前と比べると、 (続きを読む)

CERNで実施したベンチマークが公開されています

CERNが公開した、

「Apache Hadoopエコシステムにおける、異なるファイル形式とストレージエンジンのパフォーマンス比較」

を日本語化して公開しました。

Apache Avro, Apache Parquet, Apache HBase, Apache Kuduそれぞれの特性が興味深いです。取り込み重視なのか分析重視なのか、長期保存目的か、はたまた折衷なのかによって何を選択するべきかの参考になります。

これは必読です!

 

 

SparkのWeb UIを調べてみた

SparkのWeb UIに記載されている項目の意味について(日本語で)まとまっている情報がなかったのでまとめてみました。(Spark 1.6ベース)

Spark 2.xへの対応と、SparkSQL、SparkStreamingは別途記載する予定。

間違いを見つけたらコメントお願いします。m(__)m

下記のスクリーンショットは原則としてDFSReadWriteTestサンプルを用いて実行し、網羅できない範囲はサンプルコードを用いて実行しています。スクリンショットをクリックするとポップアップします。

目次

1. メイン画面
2. ジョブの詳細
3. ステージの詳細
4. 永続化に関する情報 (Storage)
5. 環境 (Environment)
6. エグゼキュタ (Executors)
7. 参考資料

HUEについてのまとめ 2016年版

2016年現在、HadoopやHadoopエコシステムを使っているほとんどの方はHue(ヒュー)をご存知でしょう。しかし、過去にHadoopを使っていた方、あるいはこれからHadoopを使おうという方の中にはご存知ない方がいらっしゃるかもしれません。この記事ではHueについて改めて簡単に紹介します。

(続きを読む)

RDBMSからSqoopを用いてParquet形式でデータを取り込む

Hadoop上、または SQL on Hadoopで分析を行う場合、テキストファイル(CSVやJSONなど)をそのまま使うとパフォーマンス的に不利になる場合が多いです。(ファイルから改行文字や区切り文字を探して都度都度処理をすれば、当然遅くなりますよね、、、)

Hadoopでは複数のバイナリ形式のファイルフォーマット(SequenceFile, Avroなど)が利用できるようになっていますが、対象が分析用途なら、カラムナ(列指向)フォーマットのフォーマットを利用すると効率的です。これは、バイナリ形式で保存するだけでなく、データを列単位で保存することで、必要な列に効率良くアクセスできるためです。

Apache Parquet

カラムナフォーマットとして広く使われているApache Parquet(パーケィ、と発音)は、Hadoopだけではなく、Spark、Impala等多くのエコシステムで利用できるフォーマットです。(Apache Kuduも列指向ですが、ファイルではなく、独自の形式でデータを扱います)

#Apache Parquetの説明は別の機会に

RDBMSからHadoopにデータを取り込み場合、Sqoopを使ってデータを取り込むことが一般的ですが、この際にParquetファイルとして取り込むことができます。多くのエコシステムがParquetに対応している一例ですね。

Quickstart VMを使ってHadoopに(というかHDFSに)取り込んでみましょう。



sqoop import-all-tables     -m 1     --connect jdbc:mysql://quickstart:3306/retail_db     --username=retail_dba     --password=cloudera     --compression-codec=snappy     --as-parquetfile     --warehouse-dir=/user/hive/warehouse     --hive-import

 

これだけです。

これは、RDBMS(MySQL)のretail_dbデータベースにある全てのテーブルを、 Parquet のファイルとして、HDFSの /user/hive/warehouse にインポートします。(さらに、Hiveのメタストアに登録までを行います)。-m 1 というのは並列数です。今回「1」にしているのは、Quickstart VMが1ノードの疑似分散環境だからです。

Sqoopはインポート時にMapReduceを使用し、複数のノードから分散して並列でインポートを行います。MapReduceを起動しておいてください。(また、今回はHiveのメタストアに登録も行うため、Hiveも起動しておく必要があります)

以下実行結果です。Hiveのbeelineシェルで確認してみました。

<br data-mce-bogus="1">

[cloudera@quickstart ~]$ beeline -u jdbc:hive2://quickstart:10000 -n cloudera

(略)
0: jdbc:hive2://quickstart:10000> SHOW TABLES;
INFO  : OK
+--------------+--+
|   tab_name   |
+--------------+--+
| categories   |
| customers    |
| departments  |
| order_items  |
| orders       |
| products     |
+--------------+--+
6 rows selected (0.139 seconds)
0: jdbc:hive2://quickstart:10000> SELECT COUNT(*) FROM categories;
(略)
+------+--+
| _c0  |
+------+--+
| 58   |
+------+--+
1 row selected (19.011 seconds)


Parquetファイルとして保存されていることを確認してみます。

0: jdbc:hive2://quickstart:10000>; DESCRIBE FORMATTED ORDERS;
INFO : Compiling command(queryId=hive_20161211152525_107ba790-aab1-414c-a5d3-d4a1d1b499d1): DESCRIBE FORMATTED ORDERS
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=hive_20161211152525_107ba790-aab1-414c-a5d3-d4a1d1b499d1); Time taken: 0.037 seconds
INFO : Executing command(queryId=hive_20161211152525_107ba790-aab1-414c-a5d3-d4a1d1b499d1): DESCRIBE FORMATTED ORDERS
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20161211152525_107ba790-aab1-414c-a5d3-d4a1d1b499d1); Time taken: 0.016 seconds
INFO : OK
+-------------------------------+-----------------------------------------------------------------+--------------------------------------------------------------------------------------+--+
| col_name | data_type | comment |
+-------------------------------+-----------------------------------------------------------------+--------------------------------------------------------------------------------------+--+
| # col_name | data_type | comment |
| | NULL | NULL |
| order_id | int | |
| order_date | bigint | |
| order_customer_id | int | |
| order_status | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | null | NULL |
| CreateTime: | Sun Dec 11 15:19:11 PST 2016 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Protect Mode: | None | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://quickstart.cloudera:8020/user/hive/warehouse/orders | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | false |
| | avro.schema.url | hdfs://quickstart.cloudera:8020/user/hive/warehouse/orders/.metadata/schemas/1.avsc |
| | kite.compression.type | snappy |
| | numFiles | 0 |
| | numRows | -1 |
| | rawDataSize | -1 |
| | totalSize | 0 |
| | transient_lastDdlTime | 1481498351 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | NULL |
| InputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
+-------------------------------+-----------------------------------------------------------------+--------------------------------------------------------------------------------------+--+
34 rows selected (0.091 seconds)
0: jdbc:hive2://quickstart:10000>


 

 

参考情報
https://parquet.apache.org/documentation/latest/

Introducing Parquet: Efficient Columnar Storage for Apache Hadoop
http://blog.cloudera.com/blog/2013/03/introducing-parquet-columnar-storage-for-apache-hadoop/

Choosing an HDFS data storage format- Avro vs. Parquet and more – StampedeCon

Dremel made simple with Parquet
https://blog.twitter.com/2013/dremel-made-simple-with-parquet

HueからRDBMSをクエリする

Hueには、SQL on Hadoopとして一般的な、HiveやImpalaにクエリを行うためのクエリエディタが用意されています。最新のHueではクエリエディタが改善され、とても使いやすくなりました。一押しです。

hue1が、今回はクエリエディタ機能の説明ではありません。別のブログネタを仕込む途中に必要が生じたので、HueのDBクエリの設定を行います。

(続きを読む)