Apache Sparkでのデータの永続化を確認してみる

Hadoop関連(全部俺) Advent Calendar 2014:18日目の記事です

Apache Sparkはインメモリで処理を行いますが、ReduceByKeyなどでシャッフルされるような場合はディスクに永続化されます。(参考情報:Apache Sparkとデータの永続化

明示的にディスクにpersistすることもできますが(StorageLevel.DISK_ONLYを設定するなどして)、実際にどのようにローカルディスクに書き出されるかを確認してみました。

元にしたpythonのコード
[java]
file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1)
[/java]
このコードをベースにして、いくつかの条件でテストします。
1. アクション実行時
ワードカウントを実行します。シャッフル時にディスクに永続化されています。
[java]
$ MASTER=spark://quickstart.cloudera:7077 pyspark
>>> file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1)
counts.collect()
[/java]
[shell]
[root@quickstart ~]# find  /tmp/spark-local-20141215234016-41a3/
/tmp/spark-local-20141215234016-41a3/
/tmp/spark-local-20141215234016-41a3/0e
/tmp/spark-local-20141215234016-41a3/0c
/tmp/spark-local-20141215234016-41a3/0c/shuffle_0_0_0
/tmp/spark-local-20141215234016-41a3/11
/tmp/spark-local-20141215234016-41a3/13
/tmp/spark-local-20141215234016-41a3/0d
/tmp/spark-local-20141215234016-41a3/0d/shuffle_0_1_0
[/shell]
2. persist(StorageLevel.DISK_ONLY)時の挙動
[java]
$ MASTER=spark://quickstart.cloudera:7077 pyspark
>>> file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1)
counts.persist(StorageLevel.DISK_ONLY)
[/java]
変換操作はLazy(すぐに処理されない)ので、まだディスクに永続化されていません。
[shell]
[root@quickstart ~]# find /tmp/spark-local-20141215235810-7b74/
/tmp/spark-local-20141216000205-36c7
[/shell]
続いてアクション(collect())を実行します。
[python]
counts.collect()
[/python]
[shell]
[root@quickstart ~]# find /tmp/spark-local-20141216000205-36c7/
/tmp/spark-local-20141216000205-36c7/
/tmp/spark-local-20141216000205-36c7/0e
/tmp/spark-local-20141216000205-36c7/1a
/tmp/spark-local-20141216000205-36c7/1a/rdd_6_0
/tmp/spark-local-20141216000205-36c7/0c
/tmp/spark-local-20141216000205-36c7/0c/shuffle_0_0_0
/tmp/spark-local-20141216000205-36c7/11
/tmp/spark-local-20141216000205-36c7/13
/tmp/spark-local-20141216000205-36c7/0d
/tmp/spark-local-20141216000205-36c7/0d/shuffle_0_1_0
[/shell]
rdd_6_0 が永続化されていますね。
ちなみに toDebugString() の結果は以下の通り。
[shell]
‘(1) PythonRDD[6] at RDD at PythonRDD.scala:43
| MappedRDD[5] at values at NativeMethodAccessorImpl.java:-2
| ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:-2
+-(3) PairwiseRDD[3] at RDD at PythonRDD.scala:261
| PythonRDD[2] at RDD at PythonRDD.scala:43
| hdfs://quickstart.cloudera/user/cloudera/input MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
| hdfs://quickstart.cloudera/user/cloudera/input HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2′
[/shell]
3. 続いてunpersist()を行います。
※何回かテストをしているので、途中のデータが違っていることにご注意ください。
実行前
[shell]
[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a/0e
/tmp/spark-local-20141216002036-d30a/0b
/tmp/spark-local-20141216002036-d30a/1a
/tmp/spark-local-20141216002036-d30a/1a/rdd_6_0
/tmp/spark-local-20141216002036-d30a/0c
/tmp/spark-local-20141216002036-d30a/0c/shuffle_0_0_0
/tmp/spark-local-20141216002036-d30a/11
/tmp/spark-local-20141216002036-d30a/13
/tmp/spark-local-20141216002036-d30a/0d
/tmp/spark-local-20141216002036-d30a/0d/shuffle_0_1_0
[/shell]
[python]
counts.unpersist()
[/python]
すぐに消去されました。
[shell]
[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a
/tmp/spark-local-20141216002036-d30a/0e
/tmp/spark-local-20141216002036-d30a/0b
/tmp/spark-local-20141216002036-d30a/1a
/tmp/spark-local-20141216002036-d30a/0c
/tmp/spark-local-20141216002036-d30a/0c/shuffle_0_0_0
/tmp/spark-local-20141216002036-d30a/11
/tmp/spark-local-20141216002036-d30a/13
/tmp/spark-local-20141216002036-d30a/0d
/tmp/spark-local-20141216002036-d30a/0d/shuffle_0_1_0
[root@quickstart ~]#
[/shell]
アプリケーション(pyspark)を終了しても削除されます。
[shell]
>>> quit()
[root@quickstart ~]# find /tmp/spark-local-20141216002036-d30a
find: `/tmp/spark-local-20141216002036-d30a’: No such file or directory
[root@quickstart ~]#
[/shell]
チェックポイントやttlは別の機会があればそのときに。

コメント