
时间:2024-11-15 21:49:12


hadoop集群中,一个datanode执行启动操作后,会先在namenode中进行节点注册,之后namenode与这个新注册的datanode通过心跳的形式进行信息传输:一方面datanode会汇报自身block块的情况,另一方面namenode接收到这些块信息后,会进行一番分析,然后向datanode返回相应的反馈命令.同时,心跳也被用来判断节点是否已经处于dead状态.但这只是宏观层面的过程描述,仅了解这点背景知识其实远远不够.一旦HDFS中出现了block块异常的情况,比如在某个时间点underReplicated blocks突然变多了,或者pendingDeletion blocks变多了,这个时候该怎么办?你需要了解这些块是如何被添加到这些操作对应的block列表里的,只有了解了hdfs中这些细节的处理,你才能有根据地找出原因.本篇博文给大家分享的是namenode对于datanode上报块的处理过程,里面的很多东西还是非常有必要留意的.



  1. /**
  2. * Main loop for each BP thread. Run until shutdown,
  3. * forever calling remote NameNode functions.
  4. */
  5. private void offerService() throws Exception {
  6. ...
  7. //
  8. // Now loop for a long time....
  9. //
  10. while (shouldRun()) {
  11. try {
  12. ...
  13. List<DatanodeCommand> cmds = blockReport();
  14. processCommand(cmds == null ? null : (new DatanodeCommand[()]));
  15. ...

  1. private Collection<Block> processReport(
  2. final DatanodeStorageInfo storageInfo,
  3. final BlockListAsLongs report) throws IOException {
  4. // Normal case:
  5. // Modify the (block-->datanode) map, according to the difference
  6. // between the old and new block report.
  7. //
  8. ...
  9. }

  1. private Collection<Block> processReport(
  2. final DatanodeStorageInfo storageInfo,
  3. final BlockListAsLongs report) throws IOException {
  4. // Normal case:
  5. // Modify the (block-->datanode) map, according to the difference
  6. // between the old and new block report.
  7. //
  8. // 新添加的块
  9. Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
  10. // 待移除的块
  11. Collection<Block> toRemove = new TreeSet<Block>();
  12. // 无效的块
  13. Collection<Block> toInvalidate = new LinkedList<Block>();
  14. // 损坏的块
  15. Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
  16. // 正在复制中的块
  17. Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
  18. ...

ToAdd: 新添加的块


  1. private Collection<Block> processReport(
  2. final DatanodeStorageInfo storageInfo,
  3. final BlockListAsLongs report) throws IOException {
  4. // Normal case:
  5. // Modify the (block-->datanode) map, according to the difference
  6. // between the old and new block report.
  7. //
  8. Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
  9. Collection<Block> toRemove = new TreeSet<Block>();
  10. Collection<Block> toInvalidate = new LinkedList<Block>();
  11. Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
  12. Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
  13. reportDiff(storageInfo, report,
  14. toAdd, toRemove, toInvalidate, toCorrupt, toUC);
  15. ...
reportDiff方法有点像git diff命令,是"找不同"的意思.这些列表都以参数的方式传入该方法,reportDiff方法结束后,这些列表都将被填充好.进入reportDiff方法:

  1. private void reportDiff(DatanodeStorageInfo storageInfo,
  2. BlockListAsLongs newReport,
  3. Collection<BlockInfoContiguous> toAdd, // add to DatanodeDescriptor
  4. Collection<Block> toRemove, // remove from DatanodeDescriptor
  5. Collection<Block> toInvalidate, // should be removed from DN
  6. Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
  7. Collection<StatefulBlockInfo> toUC) { // add to under-construction list
  8. ...
  9. // scan the report and process newly reported blocks
  10. for (BlockReportReplica iblk : newReport) {
  11. ReplicaState iState = ();
  12. BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
  13. iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
  14. // move block to the head of the list
  15. if (storedBlock != null &&
  16. (curIndex = (storageInfo)) >= 0) {
  17. headIndex = (storedBlock, curIndex, headIndex);
  18. }
  19. }
  20. ...


  1. if (shouldPostponeBlocksFromFuture &&
  2. (block)) {
  3. queueReportedBlock(storageInfo, block, reportedState,
  5. return null;
  6. }

  1. private Collection<Block> processReport(
  2. final DatanodeStorageInfo storageInfo,
  3. final BlockListAsLongs report) throws IOException {
  4. // Normal case:
  5. // Modify the (block-->datanode) map, according to the difference
  6. // between the old and new block report.
  7. //
  8. Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
  9. Collection<Block> toRemove = new TreeSet<Block>();
  10. Collection<Block> toInvalidate = new LinkedList<Block>();
  11. Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
  12. Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
  13. reportDiff(storageInfo, report,
  14. toAdd, toRemove, toInvalidate, toCorrupt, toUC);
  15. ...
  16. for (BlockInfoContiguous b : toAdd) {
  17. addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
  18. numBlocksLogged++;
  19. }
  20. ...
  21. }

  1. /**
  2. * Modify (block-->datanode) map. Remove block from set of
  3. * needed replications if this takes care of the problem.
  4. * @return the block that is stored in blockMap.
  5. */
  6. private Block addStoredBlock(final BlockInfoContiguous block,
  7. DatanodeStorageInfo storageInfo,
  8. DatanodeDescriptor delNodeHint,
  9. boolean logEveryBlock)
  10. throws IOException {
  11. ...
  12. // handle underReplication/overReplication
  13. short fileReplication = ();
  14. if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
  15. (storedBlock, numCurrentReplica,
  16. (), fileReplication);
  17. }
  18. ...


ToRemove: 待移除的块


  1. // collect blocks that have not been reported
  2. // all of them are next to the delimiter

  1. // place a delimiter in the list which separates blocks
  2. // that have been reported from those that have not





3.于是最后,本轮没有汇报上来的块就全都在分隔符块的右侧了,也就是代码注释里所说的next to the delimiter.


  1. // place a delimiter in the list which separates blocks
  2. // that have been reported from those that have not
  3. BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
  4. AddBlockResult result = (delimiter);
  5. ...
  6. // scan the report and process newly reported blocks
  7. for (BlockReportReplica iblk : newReport) {
  8. ...
  9. // move block to the head of the list
  10. if (storedBlock != null &&
  11. (curIndex = (storageInfo)) >= 0) {
  12. headIndex = (storedBlock, curIndex, headIndex);
  13. }
  14. }
  15. // collect blocks that have not been reported
  16. // all of them are next to the delimiter
  17. Iterator<BlockInfoContiguous> it =
  18. BlockIterator((0));
  19. while(())
  20. (());
  21. (delimiter);

  1. for (Block b : toRemove) {
  2. removeStoredBlock(b, node);
  3. }

  1. /**
  2. * Modify (block-->datanode) map. Possibly generate replication tasks, if the
  3. * removed block is still valid.
  4. */
  5. public void removeStoredBlock(Block block, DatanodeDescriptor node) {
  6. ("BLOCK* removeStoredBlock: {} from {}", block, node);
  7. ...
  8. if (!(block, node)) {
  9. ("BLOCK* removeStoredBlock: {} has already been" +
  10. " removed from node {}", block, node);
  11. return;
  12. }
  13. ...

ToInvalidate: 无效的块


  1. // find block by blockId
  2. BlockInfoContiguous storedBlock = (block);
  3. if(storedBlock == null) {
  4. // If blocksMap does not contain reported block id,
  5. // the replica should be removed from the data-node.
  6. (new Block(block));
  7. return null;
  8. }










  1. for (Block b : toInvalidate) {
  2. addToInvalidates(b, node);
  3. }
  1. /**
  2. * Adds block to list of blocks which will be invalidated on specified
  3. * datanode and log the operation
  4. */
  5. void addToInvalidates(final Block block, final DatanodeInfo datanode) {
  6. if (!()) {
  7. return;
  8. }
  9. (block, datanode, true);
  10. }


  1. /**
  2. * Periodically calls computeReplicationWork().
  3. */
  4. private class ReplicationMonitor implements Runnable {
  5. @Override
  6. public void run() {
  7. while (()) {
  8. try {
  9. // Process replication work only when active NN is out of safe mode.
  10. if (()) {
  11. computeDatanodeWork();
  12. processPendingReplications();
  13. rescanPostponedMisreplicatedBlocks();
  14. }
  15. (replicationRecheckInterval);
  16. ...

  1. /**
  2. * Compute block replication and block invalidation work that can be scheduled
  3. * on data-nodes. The datanode will be informed of this work at the next
  4. * heartbeat.
  5. *
  6. * @return number of blocks scheduled for replication or removal.
  7. */
  8. int computeDatanodeWork() {

  1. int blockCnt = 0;
  2. for (DatanodeInfo dnInfo : nodes) {
  3. int blocks = invalidateWorkForOneNode(dnInfo);
  4. if (blocks > 0) {
  5. blockCnt += blocks;
  6. if (--nodesToProcess == 0) {
  7. break;
  8. }
  9. }
  10. }



  1. @Override
  2. @Metric
  3. public long getPendingDeletionBlocks() {
  4. return ();
  5. }
  1. /** Used by metrics */
  2. public long getPendingDeletionBlocksCount() {
  3. return ();
  4. }


ToCorrupt: 损坏的块



  1. ...
  2. BlockToMarkCorrupt c = checkReplicaCorrupt(
  3. block, reportedState, storedBlock, ucState, dn);
  4. if (c != null) {
  5. if (shouldPostponeBlocksFromFuture) {
  6. // If the block is an out-of-date generation stamp or state,
  7. // but we're the standby, we shouldn't treat it as corrupt,
  8. // but instead just queue it for later processing.
  9. // TODO: Pretty confident this should be s/storedBlock/block below,
  10. // since we should be postponing the info of the reported block, not
  11. // the stored block. See HDFS-6289 for more context.
  12. queueReportedBlock(storageInfo, storedBlock, reportedState,
  14. } else {
  15. (c);
  16. }
  17. return storedBlock;
  18. }
  19. ...

  1. private BlockToMarkCorrupt checkReplicaCorrupt(
  2. Block reported, ReplicaState reportedState,
  3. BlockInfoContiguous storedBlock, BlockUCState ucState,
  4. DatanodeDescriptor dn) {
  5. switch(reportedState) {
  6. case FINALIZED:
  7. switch(ucState) {
  8. case COMPLETE:
  9. case COMMITTED:
  10. if (() != ()) {
  11. final long reportedGS = ();
  12. return new BlockToMarkCorrupt(storedBlock, reportedGS,
  13. "block is " + ucState + " and reported genstamp " + reportedGS
  14. + " does not match genstamp in block map "
  15. + (), Reason.GENSTAMP_MISMATCH);
  16. } else if (() != ()) {
  17. return new BlockToMarkCorrupt(storedBlock,
  18. "block is " + ucState + " and reported length " +
  19. () + " does not match " +
  20. "length in block map " + (),
  21. Reason.SIZE_MISMATCH);
  22. } else {
  23. return null; // not corrupt
  24. }
  25. ...

  1. private void markBlockAsCorrupt(BlockToMarkCorrupt b,
  2. DatanodeStorageInfo storageInfo,
  3. DatanodeDescriptor node) throws IOException {
  4. ...
  5. // Add this replica to corruptReplicas Map
  6. (, node, ,
  7. );
  8. ...


  1. /** Returns number of blocks with corrupt replicas */
  2. @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
  3. public long getCorruptReplicaBlocks() {
  4. return ();
  5. }
  1. void updateState() {
  2. ...
  3. corruptReplicaBlocksCount = ();
  4. }

ToUC: 正在构建中(under construction)的块


  1. if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
  2. (new StatefulBlockInfo(
  3. (BlockInfoContiguousUnderConstruction) storedBlock,
  4. new Block(block), reportedState));
  5. return storedBlock;
  6. }
  1. private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
  2. BlockUCState ucState, ReplicaState reportedState) {
  3. switch(reportedState) {
  4. case FINALIZED:
  5. switch(ucState) {
  8. return true;
  9. default:
  10. return false;
  11. }
  12. case RBW:
  13. case RWR:
  14. return (!());
  15. case RUR: // should not be reported
  16. case TEMPORARY: // should not be reported
  17. default:
  18. return false;
  19. }
  20. }

  1. // Process the blocks on each queue
  2. for (StatefulBlockInfo b : toUC) {
  3. addStoredBlockUnderConstruction(b, storageInfo);
  4. }
  1. void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
  2. DatanodeStorageInfo storageInfo) throws IOException {
  3. BlockInfoContiguousUnderConstruction block = ;
  4. (
  5. storageInfo, , );
  6. ...

  1. void addReplicaIfNotPresent(DatanodeStorageInfo storage,
  2. Block block,
  3. ReplicaState rState) {
  4. Iterator<ReplicaUnderConstruction> it = ();
  5. ...
  6. (new ReplicaUnderConstruction(block, storage, rState));
  7. }

  1. /**
  2. * Block replicas as assigned when the block was allocated.
  3. * This defines the pipeline order.
  4. */
  5. private List<ReplicaUnderConstruction> replicas;


