Apache Sparkでのデータの永続化を確認してみる

Hadoop関連(全部俺) Advent Calendar 2014:18日目の記事です

Apache Sparkはインメモリで処理を行いますが、ReduceByKeyなどでシャッフルされるような場合はディスクに永続化されます。(参考情報:Apache Sparkとデータの永続化

明示的にディスクにpersistすることもできますが(StorageLevel.DISK_ONLYを設定するなどして)、実際にどのようにローカルディスクに書き出されるかを確認してみました。

元にしたpythonのコード

file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b, 1)

このコードをベースにして、いくつかの条件でテストします。

1. アクション実行時
ワードカウントを実行します。シャッフル時にディスクに永続化されています。

$ MASTER=spark://quickstart.cloudera:7077 pyspark
>>> file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b, 1)
counts.collect()
[root@quickstart ~]# find  /tmp/spark-local-20141215234016-41a3/
/tmp/spark-local-20141215234016-41a3/
/tmp/spark-local-20141215234016-41a3/0e
/tmp/spark-local-20141215234016-41a3/0c
/tmp/spark-local-20141215234016-41a3/0c/shuffle_0_0_0
/tmp/spark-local-20141215234016-41a3/11
/tmp/spark-local-20141215234016-41a3/13
/tmp/spark-local-20141215234016-41a3/0d
/tmp/spark-local-20141215234016-41a3/0d/shuffle_0_1_0

2. persist(StorageLevel.DISK_ONLY)時の挙動

$ MASTER=spark://quickstart.cloudera:7077 pyspark
>>> file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b, 1)
counts.persist(StorageLevel.DISK_ONLY)

変換操作はLazy(すぐに処理されない)ので、まだディスクに永続化されていません。

[root@quickstart ~]# find /tmp/spark-local-20141215235810-7b74/
/tmp/spark-local-20141216000205-36c7

続いてアクション(collect())を実行します。

counts.collect()
[root@quickstart ~]# find /tmp/spark-local-20141216000205-36c7/
/tmp/spark-local-20141216000205-36c7/
/tmp/spark-local-20141216000205-36c7/0e
/tmp/spark-local-20141216000205-36c7/1a
/tmp/spark-local-20141216000205-36c7/1a/rdd_6_0
/tmp/spark-local-20141216000205-36c7/0c
/tmp/spark-local-20141216000205-36c7/0c/shuffle_0_0_0
/tmp/spark-local-20141216000205-36c7/11
/tmp/spark-local-20141216000205-36c7/13
/tmp/spark-local-20141216000205-36c7/0d
/tmp/spark-local-20141216000205-36c7/0d/shuffle_0_1_0

rdd_6_0 が永続化されていますね。

ちなみに toDebugString() の結果は以下の通り。

'(1) PythonRDD[6] at RDD at PythonRDD.scala:43
 |  MappedRDD[5] at values at NativeMethodAccessorImpl.java:-2
 |  ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:-2
 +-(3) PairwiseRDD[3] at RDD at PythonRDD.scala:261
    |  PythonRDD[2] at RDD at PythonRDD.scala:43
    |  hdfs://quickstart.cloudera/user/cloudera/input MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
    |  hdfs://quickstart.cloudera/user/cloudera/input HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2'

3. 続いてunpersist()を行います。
※何回かテストをしているので、途中のデータが違っていることにご注意ください。

実行前

[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a/0e
/tmp/spark-local-20141216002036-d30a/0b
/tmp/spark-local-20141216002036-d30a/1a
/tmp/spark-local-20141216002036-d30a/1a/rdd_6_0
/tmp/spark-local-20141216002036-d30a/0c
/tmp/spark-local-20141216002036-d30a/0c/shuffle_0_0_0
/tmp/spark-local-20141216002036-d30a/11
/tmp/spark-local-20141216002036-d30a/13
/tmp/spark-local-20141216002036-d30a/0d
/tmp/spark-local-20141216002036-d30a/0d/shuffle_0_1_0
counts.unpersist()

すぐに消去されました。

[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a/0e
/tmp/spark-local-20141216002036-d30a/0b
/tmp/spark-local-20141216002036-d30a/1a
/tmp/spark-local-20141216002036-d30a/0c
/tmp/spark-local-20141216002036-d30a/0c/shuffle_0_0_0
/tmp/spark-local-20141216002036-d30a/11
/tmp/spark-local-20141216002036-d30a/13
/tmp/spark-local-20141216002036-d30a/0d
/tmp/spark-local-20141216002036-d30a/0d/shuffle_0_1_0
[root@quickstart ~]#

アプリケーション(pyspark)を終了しても削除されます。

>>> quit()
[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
find: `/tmp/spark-local-20141216002036-d30a': No such file or directory
[root@quickstart ~]#

チェックポイントやttlは別の機会があればそのときに。

Pocket

Leave a Reply

Your email address will not be published. Required fields are marked *

CAPTCHA


日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)