昨日の続きです。読むのが面倒な方は「まとめ」をどうぞ。
OpenCSVSerDeによるCSVデータの扱い
昨日はHueでCSV形式のデータをインポートしましたが、このデータをHive以外から利用するには不便です。
Apache Impala から利用する
昨日も書いた通り、Hueからインポートする際に"csv"形式を選択すると、テーブル作成時にOpenCSVSerDeが利用されます。
CREATE TABLE `default`.`201402_trip_data` ( `Trip ID` bigint , `Duration` bigint , `Start Date` string , `Start Station` string , `Start Terminal` bigint , `End Date` string , `End Station` string , `End Terminal` bigint , `Bike ID` bigint , `Subscription Type` string , `Zip Code` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ("separatorChar" = ",", "quoteChar" = "\"", "escapeChar" = "\\" ) STORED AS TextFile TBLPROPERTIES("skip.header.line.count" = "1")
高速化のため、現在ImpalaはSerDe (Serializer/Deserializer)に対応していません。よって、このテーブルを Impalaから利用しようとするとエラーになってしまいます。
> SELECT * FROM `201402_trip_data` LIMIT 1; AnalysisException: Failed to load metadata for table: '201402_trip_data' CAUSED BY: TableLoadingException: Failed to load metadata for table: default.201402_trip_data CAUSED BY: InvalidStorageDescriptorException: Impala does not support tables of this type. REASON: SerDe library 'org.apache.hadoop.hive.serde2.OpenCSVSerde' is not supported.
Impalaを用いてアドホックに分析したい場合には不便です。
Apache Spark から利用する
CDH6で利用できるSpark2を使用すれば、OpenCSVSerDeで作成したHiveテーブルにアクセスできそうです。
scala> spark.read.table("201402_trip_data").show(3) +-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+ |trip id|duration| start date| start station|start terminal| end date| end station|end terminal|bike id|subscription type|zip code| +-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+ |Trip ID|Duration| Start Date| Start Station|Start Terminal| End Date| End Station|End Terminal| Bike #|Subscription Type|Zip Code| | 4576| 63|8/29/2013 14:13|South Van Ness at...| 66|8/29/2013 14:14|South Van Ness at...| 66| 520| Subscriber| 94127| | 4607| 70|8/29/2013 14:42| San Jose City Hall| 10|8/29/2013 14:43| San Jose City Hall| 10| 661| Subscriber| 95138| +-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+-------+-----------------+--------+ only showing top 3 rows scala>
一見うまくいっているように見えますが、よくよく見るとデータ型は全て String 型になっており、ヘッダ行の扱いなども正しく行われていません。
scala> spark.read.table("201402_trip_data").printSchema() root |-- trip id: string (nullable = true) |-- duration: string (nullable = true) |-- start date: string (nullable = true) |-- start station: string (nullable = true) |-- start terminal: string (nullable = true) |-- end date: string (nullable = true) |-- end station: string (nullable = true) |-- end terminal: string (nullable = true) |-- bike id: string (nullable = true) |-- subscription type: string (nullable = true) |-- zip code: string (nullable = true)
OpenCSVSerDe側で頑張って吸収してくれれば楽そうです。しかしImpala とは異なり、このデータを参照するだけなら何とかなるかもしれません。
しかし、テキスト形式 (CSVやJSON)のデータを分析するのは、パフォーマンス的に望ましくありません。また、列名に空白文字が含まれていると扱いにくく(後述)、分析ならカラムナ形式で保存しておく方が効果的です。よって、Parquet形式で保存しておきたいところです。
Parquet形式への変換
CSVからParquet形式に変換するETL処理は、何通りかの方法で行うことができます。
HiveによるParquet形式への変換
HiveのCTAS (Create Table as Select) を利用することで、Parquet形式への変換を行うことができます。しかし、このデータ(メタ情報)は列名に空白文字が入っているため、Parquetへの変換処理(MapReduce)で失敗します。
CREATE TABLE test_parquet STORED AS PARQUET AS SELECT * FROM `201402_trip_data`; Task with the most failures(4): ----- Task ID: task_1545493272463_0011_m_000000 URL: http://nightly6x-unsecure-1.vpc.cloudera.com:8088/taskdetails.jsp?jobid=job_1545493272463_0011&tipid=task_1545493272463_0011_m_000000 ----- Diagnostic Messages for this Task: Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"trip id":"4576","duration":"63","start date":"8/29/2013 14:13","start station":"South Van Ness at Market","start terminal":"66","end date":"8/29/2013 14:14","end station":"South Van Ness at Market","end terminal":"66","bike id":"520","subscription type":"Subscriber","zip code":"94127"} at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:169) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
SparkによるParquet形式への変換
DataFrameを利用すれば簡単にParquetに変換できます。が、先ほどのHiveと同じように、空白文字列を含んだ列名が原因でエラーになってしまいます。
trip id 列がエラー
scala> spark.read.table("201402_trip_data").write.saveAsTable("sparktest") org.apache.spark.sql.AnalysisException: Attribute name "trip id" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.; at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583) at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:570) (略)
全ての空白文字を含む列名を、 withColumnRenamed や alias のように指定すれば何とかなりそうですが、やはり面倒です。
trip id 列を tripid にリネームした。しかし start date 列がエラー
scala> spark.read.table("201402_trip_data").withColumnRenamed("trip id", "tripid").write.saveAsTable("sparktest") org.apache.spark.sql.AnalysisException: Attribute name "start date" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.; at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583) at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:570) (略)
結果、HueでCSVファイルをインポートする際、列名に空白を含まないように指定するのがベストのようですね。
再: HueでCSVデータをインポートする
昨日同様に Hue から CSV ファイルをインポートします。その際、列名を編集して空白文字を取り除きます。
Hueを用いたCSVファイルのインポート
手順の詳細は昨日のブログを参照してください。
HiveのCATSでParqetに変換
HiveのCTASを用いてParquet形式に変換します。
今度はエラーになりませんでした。Parquet形式に変換したテーブルはImpalaやSparkから利用可能です。
Impalaからクエリを実行
Sparkからテーブルを使用
Sparkからもアクセスしてみます。
$ pyspark --master local Python 2.7.5 (default, Nov 6 2016, 00:28:07) [GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux2 Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.0-cdh6.x-SNAPSHOT /_/ Using Python version 2.7.5 (default, Nov 6 2016 00:28:07) SparkSession available as 'spark'. >>> spark.read.table("pq_201402_trip_data").show(5) SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/parquet-format-2.3.1-cdh6.x-SNAPSHOT.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/spark/hive/hive-exec-2.1.1-cdh6.x-20181222.095319-3.jar!/shaded/parquet/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [shaded.parquet.org.slf4j.helpers.NOPLoggerFactory] +------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+ |tripid|duration| startdate| startstation|startterminal| enddate| endstation|endterminal|bikeid|subscriptiontype|zipcode| +------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+ | 4576| 63|8/29/2013 14:13|South Van Ness at...| 66|8/29/2013 14:14|South Van Ness at...| 66| 520| Subscriber| 94127| | 4607| 70|8/29/2013 14:42| San Jose City Hall| 10|8/29/2013 14:43| San Jose City Hall| 10| 661| Subscriber| 95138| | 4130| 71|8/29/2013 10:16|Mountain View Cit...| 27|8/29/2013 10:17|Mountain View Cit...| 27| 48| Subscriber| 97214| | 4251| 77|8/29/2013 11:29| San Jose City Hall| 10|8/29/2013 11:30| San Jose City Hall| 10| 26| Subscriber| 95060| | 4299| 83|8/29/2013 12:02|South Van Ness at...| 66|8/29/2013 12:04| Market at 10th| 67| 319| Subscriber| 94103| +------+--------+---------------+--------------------+-------------+---------------+--------------------+-----------+------+----------------+-------+ only showing top 5 rows
まとめ
色々試した結果、分析がメインの目的で Parquet 形式にすることが決まっているなら、csv形式で取り込んで変換するのではなく、最初からParquet形式でインポートした方が良いということでした。
とはいえ、CSVやJSON形式のテーブルから変換することは日常茶飯事ですし、さまざまな方法を知っておくと役立つでしょう。
コメント