hadoop-logo
ようこそ Tech blogへ!
「これからHadoopを勉強しよう」という方はまず下記のページから

サイトの移行に伴って画像が表示されないなどの不具合が生じています

CSVデータをParquet形式に変換する

hive

昨日の続きです。読むのが面倒な方は「まとめ」をどうぞ。

OpenCSVSerDeによるCSVデータの扱い

昨日はHueでCSV形式のデータをインポートしましたが、このデータをHive以外から利用するには不便です。

Apache Impala から利用する

昨日も書いた通り、Hueからインポートする際に”csv”形式を選択すると、テーブル作成時にOpenCSVSerDeが利用されます。

CREATE TABLE `default`.`201402_trip_data`
(
 `Trip ID` bigint ,
 `Duration` bigint ,
 `Start Date` string ,
 `Start Station` string ,
 `Start Terminal` bigint ,
 `End Date` string ,
 `End Station` string ,
 `End Terminal` bigint ,
 `Bike ID` bigint ,
 `Subscription Type` string ,
 `Zip Code` string ) ROW FORMAT   SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
 WITH SERDEPROPERTIES ("separatorChar" = ",",
   "quoteChar"     = "\"",
   "escapeChar"    = "\\"
   )
 STORED AS TextFile TBLPROPERTIES("skip.header.line.count" = "1")

高速化のため、現在ImpalaはSerDe (Serializer/Deserializer)に対応していません。よって、このテーブルを Impalaから利用しようとするとエラーになってしまいます。

> SELECT * FROM `201402_trip_data` LIMIT 1;
AnalysisException: Failed to load metadata for table: '201402_trip_data' CAUSED BY: TableLoadingException: Failed to load metadata for table: default.201402_trip_data CAUSED BY: InvalidStorageDescriptorException: Impala does not support tables of this type. REASON: SerDe library 'org.apache.hadoop.hive.serde2.OpenCSVSerde' is not supported.

Impalaを用いてアドホックに分析したい場合には不便です。

Apache Spark から利用する

CDH6で利用できるSpark2を使用すれば、OpenCSVSerDeで作成したHiveテーブルにアクセスできそうです。

scala> spark.read.table("201402_trip_data").show(3)
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+
|trip id|duration|     start date|       start station|start terminal|       end date|         end station|end terminal|bike id|subscription type|zip code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal| Bike #|Subscription Type|Zip Code|
|   4576|      63|8/29/2013 14:13|South Van Ness at...|            66|8/29/2013 14:14|South Van Ness at...|          66|    520|       Subscriber|   94127|
|   4607|      70|8/29/2013 14:42|  San Jose City Hall|            10|8/29/2013 14:43|  San Jose City Hall|          10|    661|       Subscriber|   95138|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+
only showing top 3 rows

scala> 

一見うまくいっているように見えますが、よくよく見るとデータ型は全て String 型になっており、ヘッダ行の扱いなども正しく行われていません。

scala> spark.read.table("201402_trip_data").printSchema()
root
 |-- trip id: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- start date: string (nullable = true)
 |-- start station: string (nullable = true)
 |-- start terminal: string (nullable = true)
 |-- end date: string (nullable = true)
 |-- end station: string (nullable = true)
 |-- end terminal: string (nullable = true)
 |-- bike id: string (nullable = true)
 |-- subscription type: string (nullable = true)
 |-- zip code: string (nullable = true)

OpenCSVSerDe側で頑張って吸収してくれれば楽そうです。しかしImpala とは異なり、このデータを参照するだけなら何とかなるかもしれません。

しかし、テキスト形式 (CSVやJSON)のデータを分析するのは、パフォーマンス的に望ましくありません。また、列名に空白文字が含まれていると扱いにくく(後述)、分析ならカラムナ形式で保存しておく方が効果的です。よって、Parquet形式で保存しておきたいところです。

Apache Parquetは列指向のバイナリフォーマットです。Google の Dremel の論文を参考にTwitterとClouderaが共同で開発し、Apache に寄贈されています。

Parquet形式への変換

CSVからParquet形式に変換するETL処理は、何通りかの方法で行うことができます。

HiveによるParquet形式への変換

HiveのCTAS (Create Table as Select) を利用することで、Parquet形式への変換を行うことができます。しかし、このデータ(メタ情報)は列名に空白文字が入っているため、Parquetへの変換処理(MapReduce)で失敗します。

CREATE TABLE test_parquet STORED AS PARQUET AS SELECT * FROM `201402_trip_data`;

Task with the most failures(4): 
-----
Task ID:
  task_1545493272463_0011_m_000000

URL:
  http://nightly6x-unsecure-1.vpc.cloudera.com:8088/taskdetails.jsp?jobid=job_1545493272463_0011&tipid=task_1545493272463_0011_m_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"trip id":"4576","duration":"63","start date":"8/29/2013 14:13","start station":"South Van Ness at Market","start terminal":"66","end date":"8/29/2013 14:14","end station":"South Van Ness at Market","end terminal":"66","bike id":"520","subscription type":"Subscriber","zip code":"94127"}
	at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:169)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
なお、予め Parquet 形式のテーブルを作成しておけば、INSERT OVERWRITE TABLE でデータを流し込むことは可能でした。

SparkによるParquet形式への変換

DataFrameを利用すれば簡単にParquetに変換できます。が、先ほどのHiveと同じように、空白文字列を含んだ列名が原因でエラーになってしまいます。

trip id 列がエラー

scala> spark.read.table("201402_trip_data").write.saveAsTable("sparktest")
org.apache.spark.sql.AnalysisException: Attribute name "trip id" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
  at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:570)
(略)

全ての空白文字を含む列名を、 withColumnRenamed や alias のように指定すれば何とかなりそうですが、やはり面倒です。

trip id 列を tripid にリネームした。しかし start date 列がエラー

scala> spark.read.table("201402_trip_data").withColumnRenamed("trip id", "tripid").write.saveAsTable("sparktest")
org.apache.spark.sql.AnalysisException: Attribute name "start date" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
  at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:570)
(略)

結果、HueでCSVファイルをインポートする際、列名に空白を含まないように指定するのがベストのようですね。

再: HueでCSVデータをインポートする

昨日同様に Hue から CSV ファイルをインポートします。その際、列名を編集して空白文字を取り除きます。

Hueを用いたCSVファイルのインポート

手順の詳細は昨日のブログを参照してください。

HiveのCATSでParqetに変換

HiveのCTASを用いてParquet形式に変換します。

今度はエラーになりませんでした。Parquet形式に変換したテーブルはImpalaやSparkから利用可能です。

実はまだ落とし穴があって、OpenCSVSerDeを利用したCSVファイルからParquetファイルを作成すると、全ての列がstring形式になってしまうようです。バグ?仕様?

Impalaからクエリを実行

Sparkからテーブルを使用

Sparkからもアクセスしてみます。

$ pyspark --master local
Python 2.7.5 (default, Nov  6 2016, 00:28:07) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.x-SNAPSHOT
      /_/

Using Python version 2.7.5 (default, Nov  6 2016 00:28:07)
SparkSession available as 'spark'.
>>> spark.read.table("pq_201402_trip_data").show(5)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/parquet-format-2.3.1-cdh6.x-SNAPSHOT.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/hive/hive-exec-2.1.1-cdh6.x-20181222.095319-3.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [shaded.parquet.org.slf4j.helpers.NOPLoggerFactory]
+------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+
|tripid|duration|      startdate|        startstation|startterminal|        enddate|          endstation|endterminal|bikeid|subscriptiontype|zipcode|
+------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+
|  4576|      63|8/29/2013 14:13|South Van Ness at...|           66|8/29/2013 14:14|South Van Ness at...|         66|   520|      Subscriber|  94127|
|  4607|      70|8/29/2013 14:42|  San Jose City Hall|           10|8/29/2013 14:43|  San Jose City Hall|         10|   661|      Subscriber|  95138|
|  4130|      71|8/29/2013 10:16|Mountain View Cit...|           27|8/29/2013 10:17|Mountain View Cit...|         27|    48|      Subscriber|  97214|
|  4251|      77|8/29/2013 11:29|  San Jose City Hall|           10|8/29/2013 11:30|  San Jose City Hall|         10|    26|      Subscriber|  95060|
|  4299|      83|8/29/2013 12:02|South Van Ness at...|           66|8/29/2013 12:04|      Market at 10th|         67|   319|      Subscriber|  94103|
+------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+
only showing top 5 rows

まとめ

色々試した結果、分析がメインの目的で Parquet 形式にすることが決まっているなら、csv形式で取り込んで変換するのではなく、最初からParquet形式でインポートした方が良いということでした。

とはいえ、CSVやJSON形式のテーブルから変換することは日常茶飯事ですし、さまざまな方法を知っておくと役立つでしょう。

おまけ: HueでCSVファイルをParquet形式でインポート

コメント