Apache Sparkとデータの永続化

2日目です
Apache Sparkは「インメモリで処理できる分散処理基盤」ですが、状況によってはディスクへのアクセスがあります。

  1. 明示的にディスクに永続化した場合 (persist())
  2. チェックポイントを取った場合(checkpoint())
  3. シャッフル時(暗黙的)

3がわかりにくいので、簡単にまとめてみます。

明示的に永続化する

Spark Programming Guideから抜粋しますが、例えば以下のようなコードを書くと、明示的にディスク(またはコードを変更すればメモリに)にRDDを永続化できます。永続化しておくことで、処理をもう一度実行する際、先頭からRDDのリネージを辿る必要がなくなるため、繰り返し処理などに効率的です。
当然ですが、ディスクに永続化するよりもメモリに永続化する方が高速ですが、その分多くのメモリを必要とします。キャッシュされた古い情報はLRUで捨てられます。また、StorageLevelの指定により、メモリ、ディスクのどちらに保存するか、複製数をいくつにするかなどが調整できます。
[code]
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
lineLengths.persist(StorageLevel.DISK_ONLY)
[/code]
ディスクに永続化すると、ワーカーのローカルディスクに書き込まれます。

チェックポイントを取る

リネージが深くなる場合、明示的にチェックポイントを取ることで、スタックオーバーフローを回避することができます。(Sparkストリーミングでは、暗黙的にチェックポイントの処理が行われます)。
[code]
sc.setCheckpointDir(directory)
:
rdd1.checkpoint()
:
[/code]
チェックポイントは、指定したHDFSのディレクトリに保存されます。

シャッフル時に暗黙的に取られる

Spark Programming Guideにも記載されていますが、Sparkは障害時に最初から再計算し直すことを避けるため、シャッフル時に自動的に(暗黙的に)中間データをディスクに書き出します。

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

このデータもワーカーのローカルディスクに書き込まれます。
ということで、全てがインメモリで処理されるわけではありません。reduceしないような処理(例: map(….).map(…).map(…))であれば当該ノードでインメモリで処理できるので、可能な限り不要なreduce関連処理は避けましょう。
 

コメント