Hadoopのノード間通信(1)

Hadoop アドベントカレンダー 23日目

Hadoopのマスターノードとスレーブノード各デーモン間では定期的にハートビート通信を行っています。(例:NameNode-DataNode, JobTracker-TaskTracker)
この間隔はクラスタの規模に合わせてチューニングする必要がありますが、いったいどのような情報がやり取りされているのでしょうか?
今回はwiresharkを使用してパケットキャプチャーを行い、ソースコードと照らし合せてその内容を調べてみます。

【検証環境】

  • version: CDH4.1.2
  • Cloudera Managerを利用して仮想環境において4ノードをインストール

topology

【Namenode-DataNode間のHeartbeat】

DataNodeはNameNodeに一定間隔でハートビートを送出する。
DN heartbeat interval (default)

  • パラメータ:dfs.heartbeat.interval
  • デフォルト値:3 (sec)

【HDFSのハートビートのプロトコル(DN->NN)】

現状のプロトコルでは以下の情報を送っている(詳細は [1][2]を参照)

  • DN登録の際の情報: (bpRegistrationオブジェクト:DNがNNに登録する時に使用。NNがDNを識別するのに必要な全情報)
  • レポート(report):(容量、使用量、残り、BlockPoolの使用量(CDH3には含まれていない。Federation, HAのサポートに伴い追加されたか?))
  • このDNから他への転送数
  • Xceiverの数
  • 不正なボリューム数

WireSharkのログは以下の通り。WireShark用のプラグインを書こうかな。。
DN heartbeat

【HDFSのハートビートのプロトコル(応答:NN->DN】

NameNodeはハートビートを受け取り、その内容に応じて対処する(詳細は [3][4]を参照)
チェック項目:

  • DataNodeが登録されているかどうか
  • Check if this datanode should actually be shutdown instead.
  • Check lease recovery
  • check pending replication
  • check block invalidation
  • check for balancer bandwidth update

WireSharkのログ。
DN heartbeat response

【HDFSのブロックレポート (DN->NN)】

各DataNodeはNameNodeに対して、登録後、または一定間隔毎にDNが保持しているブロック情報を送出する。
パラメータ:fs.blockreport.intervalMsec
デフォルト値:21600000 (msec)
現状のプロトコルでは以下の情報を送っている(詳細は [5][6][7][8]を参照)

  • ストレージID
  • ステータス
  • ブロックのリスト(longの配列)

WireSharkのログ
DN blockReport
MapReduceのデーモン(JobTrackerとTaskTracker)の通信については、また別の機会に。

参考

[1] org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
[java]
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
* It also gives the NameNode a chance to return
* an array of "DatanodeCommand" objects in HeartbeatResponse.
* A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
* @param registration datanode registration information
* @param reports utilization report per storage
* @param xmitsInProgress number of transfers from this datanode to others
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @throws IOException on error
*/
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException;
[/java]
[2] org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
[java]
HeartbeatResponse sendHeartBeat() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat from service actor: " + this);
}
// reports number of failed volumes
StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
false,
dn.getFSDataset().getCapacity(),
dn.getFSDataset().getDfsUsed(),
dn.getFSDataset().getRemaining(),
dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
return bpNamenode.sendHeartbeat(bpRegistration, report,
dn.getXmitsInProgress(),
dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes());
}
[/java]
[3] org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
[java]
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
readLock();
try {
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
if (cmds == null || cmds.length == 0) {
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
if (cmd != null) {
cmds = new DatanodeCommand[] {cmd};
}
}
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
}
}
[/java]
[4]
[java] org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final String blockPoolId,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {

[/java]
[5] org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
[java]
/**
* blockReport() tells the NameNode about all the locally-stored blocks.
* The NameNode returns an array of Blocks that have become obsolete
* and should be deleted. This function is meant to upload *all*
* the locally-stored blocks. It's invoked upon startup and then
* infrequently afterwards.
* @param registration
* @param poolId - the block pool ID for the blocks
* @param reports - report of blocks per storage
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
*
* @return - the next command for DN to process.
* @throws IOException
*/
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException;
[/java]
[6] org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
[java]
DatanodeCommand blockReport() throws IOException {
// send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > dnConf.blockReportInterval) {
// Flush any block information that precedes the block report. Otherwise
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
reportReceivedDeletedBlocks();
// Create block report
long brCreateStartTime = now();
BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
bpos.getBlockPoolId());
// Send block report
long brSendStartTime = now();
StorageBlockReport[] report = { new StorageBlockReport(
new DatanodeStorage(bpRegistration.getStorageID()),
bReport.getBlockListAsLongs()) };
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);

[/java]
[7] org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
[8] org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java