2日目です
Apache Sparkは「インメモリで処理できる分散処理基盤」ですが、状況によってはディスクへのアクセスがあります。
- 明示的にディスクに永続化した場合 (persist())
- チェックポイントを取った場合(checkpoint())
- シャッフル時(暗黙的)
3がわかりにくいので、簡単にまとめてみます。
明示的に永続化する
Spark Programming Guideから抜粋しますが、例えば以下のようなコードを書くと、明示的にディスク(またはコードを変更すればメモリに)にRDDを永続化できます。永続化しておくことで、処理をもう一度実行する際、先頭からRDDのリネージを辿る必要がなくなるため、繰り返し処理などに効率的です。
当然ですが、ディスクに永続化するよりもメモリに永続化する方が高速ですが、その分多くのメモリを必要とします。キャッシュされた古い情報はLRUで捨てられます。また、StorageLevelの指定により、メモリ、ディスクのどちらに保存するか、複製数をいくつにするかなどが調整できます。
[code]
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
lineLengths.persist(StorageLevel.DISK_ONLY)
[/code]
ディスクに永続化すると、ワーカーのローカルディスクに書き込まれます。
チェックポイントを取る
リネージが深くなる場合、明示的にチェックポイントを取ることで、スタックオーバーフローを回避することができます。(Sparkストリーミングでは、暗黙的にチェックポイントの処理が行われます)。
[code]
sc.setCheckpointDir(directory)
:
rdd1.checkpoint()
:
[/code]
チェックポイントは、指定したHDFSのディレクトリに保存されます。
シャッフル時に暗黙的に取られる
Spark Programming Guideにも記載されていますが、Sparkは障害時に最初から再計算し直すことを避けるため、シャッフル時に自動的に(暗黙的に)中間データをディスクに書き出します。
Spark also automatically persists some intermediate data in shuffle operations (e.g.
reduceByKey
), even without users callingpersist
. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users callpersist
on the resulting RDD if they plan to reuse it.
このデータもワーカーのローカルディスクに書き込まれます。
ということで、全てがインメモリで処理されるわけではありません。reduceしないような処理(例: map(....).map(...).map(...))であれば当該ノードでインメモリで処理できるので、可能な限り不要なreduce関連処理は避けましょう。
コメント