【Hadoop】文件删除过程以及Namenode RPC处理请求优化之IBR

时间:2024-11-16 08:41:46

文章目录

  • 前言
  • 一、文件删除过程
    • 1.提交删除任务
    • 2.文件删除总流程
    • 3.删除命名空间
    • 4.心跳生成删除命令
    • 5.块的删除
  • 二、Namenode的增量块机制
    • (增量块汇报)的延时批处理
  • 后记


前言

前文我们提到线上Hadoop集群因为频繁大批量删除文件导致集群卡顿,后期使用了Namenode的异步增量汇报解决了该故障。本文就简单介绍下文件删除的过程以及Namenode的增量块机制。


一、文件删除过程

该章节着重从源码层面介绍Hadoop删除文件的具体步骤和执行逻辑。

1.提交删除任务

- 提交删除命令:

public boolean delete(String src, boolean recursive) throws IOException {
   checkNNStartup();
   if (stateChangeLog.isDebugEnabled()) {
     stateChangeLog.debug("*DIR* : src=" + src
         + ", recursive=" + recursive);
   }
   namesystem.checkOperation(OperationCategory.WRITE);
   CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
   if (cacheEntry != null && cacheEntry.isSuccess()) {
     return true; // Return previous response
   }

   boolean ret = false;
   try {
     ret = namesystem.delete(src, recursive, cacheEntry != null);
   } finally {
     RetryCache.setState(cacheEntry, ret);
   }
   if (ret) 
     metrics.incrDeleteFileOps();
   return ret;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.文件删除总流程

- 删除命名空间等:

第一步:快照、INODE删除等
第二步:心跳生成删除命令/心跳处理删除命令
第三步:返回增量删除的块列表
第四步:更新editlog
第五步:增量进行快删除

boolean delete(String src, boolean recursive, boolean logRetryCache)
     throws IOException {
   final String operationName = "delete";
   BlocksMapUpdateInfo toRemovedBlocks = null;
   checkOperation(OperationCategory.WRITE);
   final FSPermissionChecker pc = getPermissionChecker();
   //加上写锁
   writeLock();
   boolean ret = false;
   try {
     checkOperation(OperationCategory.WRITE);
     //安全模式禁止操作
     checkNameNodeSafeMode("Cannot delete " + src);
     // Remove a file/directory from the namespace. ---//从命名空间删除相应的文件
     toRemovedBlocks = FSDirDeleteOp.delete(
         this, pc, src, recursive, logRetryCache);
     ret = toRemovedBlocks != null;
   } catch (AccessControlException e) {
     logAuditEvent(false, operationName, src);
     throw e;
   } finally {
     writeUnlock(operationName);
   }
   // 记录editlog
   getEditLog().logSync();
   logAuditEvent(true, operationName, src);
   删除数据块操作,如果删除的块比较大的话,会进行增量删除
   if (toRemovedBlocks != null) {
     removeBlocks(toRemovedBlocks); // Incremental deletion of blocks。 ---4.增量删除
   }
   return ret;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

3.删除命名空间

-从namespace删除file/directory:unprotectedDelete()

最终通过FSDirDeleteOp类的unprotectedDelete(FSDirectory, INodesInPath, BlocksMapUpdateInfo, List, long)方法来执行删除操作.之所以叫做unprotectedDelet,是因为这个时候删除只是将该文件从命名空间中删除,并没有真正的写入editlog.

private static boolean unprotectedDelete(FSDirectory fsd, INodesInPath iip,
     ReclaimContext reclaimContext, long mtime) {
   assert fsd.hasWriteLock();

   // check if target node exists
   INode targetNode = iip.getLastINode();
   if (targetNode == null) {
     return false;
   }

   // record modification 快照
   final int latestSnapshot = iip.getLatestSnapshotId();
   targetNode.recordModification(latestSnapshot);

   // Remove the node from the namespace Inode
   long removed = fsd.removeLastINode(iip);
   if (removed == -1) {
     return false;
   }

   // set the parent's modification time
   final INodeDirectory parent = targetNode.getParent();
   parent.updateModificationTime(mtime, latestSnapshot);

   // collect block and update quota
   if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
     targetNode.destroyAndCollectBlocks(reclaimContext);
   } else {
     targetNode.cleanSubtree(reclaimContext, CURRENT_STATE_ID, latestSnapshot);
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

4.心跳生成删除命令

-心跳生成删除命令/心跳处理删除命令:()

具体生成删除相关命令的代码在以下方法中,(DatanodeRegistration, StorageReport[], String, long, long, int, int, int, VolumeFailureSummary).

//check block invalidation
       Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
       if (blks != null) {
         cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
             blockPoolId, blks));
       }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

5.块的删除

-块的删除:

 static BlocksMapUpdateInfo delete(
      FSNamesystem fsn, FSPermissionChecker pc, String src, boolean recursive,
      boolean logRetryCache) throws IOException {
    FSDirectory fsd = fsn.getFSDirectory();

    if (FSDirectory.isExactReservedName(src)) {
      throw new InvalidPathException(src);
    }

    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
    if (fsd.isPermissionEnabled()) {
      fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
                          FsAction.ALL, true);
    }
    if (fsd.isNonEmptyDirectory(iip)) {
      if (!recursive) {
        throw new PathIsNotEmptyDirectoryException(
            iip.getPath() + " is non empty");
      }
      checkProtectedDescendants(fsd, iip);
    }

    return deleteInternal(fsn, iip, logRetryCache); ----返回要删除的收集的块信息
  }

//
   public void incrFilesDeleted(long delta) {
    filesDeleted.incr(delta);
  }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

-返回收集的要删除的块信息包括快照、Inode等:BlocksMapUpdateInfo()

static BlocksMapUpdateInfo deleteInternal(
      FSNamesystem fsn, INodesInPath iip, boolean logRetryCache)
      throws IOException {
    assert fsn.hasWriteLock();
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* : " + iip.getPath());
    }

    FSDirectory fsd = fsn.getFSDirectory();
    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
    List<INode> removedINodes = new ChunkedArrayList<>();
    List<Long> removedUCFiles = new ChunkedArrayList<>();

    long mtime = now();
    // Unlink the target directory from directory tree
    long filesRemoved = delete(
        fsd, iip, collectedBlocks, removedINodes, removedUCFiles, mtime);
    if (filesRemoved < 0) {
      return null;
    }
    fsd.getEditLog().logDelete(iip.getPath(), mtime, logRetryCache);
    incrDeletedFileCount(filesRemoved);

    fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, true);

    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug(
          "DIR* : " + iip.getPath() +" is removed");
    }
    return collectedBlocks;
  }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

-增量删除块:removeBlocks()

blockDeletionIncrement 限制的目的是从blockManager中逐步删除块。并且每次到 blockDeletionIncrement 时,writeLock()会释放然后重新获取,确保其他服务能够进来

  void removeBlocks(BlocksMapUpdateInfo blocks) {
    List<BlockInfo> toDeleteList = blocks.getToDeleteList();
    Iterator<BlockInfo> iter = toDeleteList.iterator();
    while (iter.hasNext()) {
    // 写锁
      writeLock();
      try {
        for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
          blockManager.removeBlock(iter.next()); ---5.块的真正删除
        }
      } finally {
      //释放锁
        writeUnlock("removeBlocks");
      }
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

数据块的真正删除操作如下:

public void removeBlock(BlockInfo block) {
    assert namesystem.hasWriteLock();
    // No need to ACK blocks that are being removed entirely
    // from the namespace, since the removal of the associated
    // file already removes them from the block map below.
    //设置这个块的字节为LONG最大
    block.setNumBytes(BlockCommand.NO_ACK);
    //添加到invalidates集合中
    addToInvalidates(block);
    //从BlocksMap中删除
    removeBlockFromMap(block);
    // Remove the block from pendingReconstruction and neededReconstruction
    PendingBlockInfo remove = pendingReconstruction.remove(block);
    if (remove != null) {
      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
    }
    neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
    postponedMisreplicatedBlocks.remove(block);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

二、Namenode的增量块机制

(增量块汇报)的延时批处理

该特性是从Hadoop2.以后的版本新引入的,简单可以理解为DataNode向Namenode进行块的汇报变成了异步且增量的模式,将同步即时处理方转变为了延时批处理的方式,以此带来系统性能上的提升,这样可以大大降低Namenode的RPC请求负载。

众所周知,HDFS NameNode内部的单一锁设计,使得这个锁显得极为的“重“。这里的重不仅仅说它很重要,而是说持有这个锁需要付出的代价很高。每个请求需要拿到这个锁,然后让NN 去处理这个请求,这里面就包含了很激烈的锁竞争。因此一旦说NN的这个锁被一个大的写操作(比如大目录的删除)持有很长时间的话,其它用户的任务将会马上收到影响。当然删大目录这样的行为并不是经常会发生的,这里笔者想表达的意思是我们应该尽量减少不必要的高密集的写锁持有操作,来减轻其对用户请求正常处理的影响。本文笔者将要阐述的这样的操作是HDFS内部增量块(IBR)的处理操作,现有IBR行为到底会对系统产生多大的影响呢?我们有什么办法可以优化其行为方式呢?这里不再赘述,参考一篇优秀博文,如下:

原文链接:/androidlushangderen/article/details/101643921

后记

本文笔者自己梳理,若有误请大牛指出