Understanding the HDFS Client File-Write Flow Through the Source Code

Date: 2021-08-09 08:33:36

I previously walked through the HDFS client flow at a high level, but to understand it more deeply, this time I trace it through the source code.

First, prepare the demo code and set a breakpoint at fs.copyFromLocalFile().

public class HdfsClientDemo {

    FileSystem fs = null;
    Configuration conf = null;

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        // The URI and the user identity can be passed in directly
        fs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");
    }

    /**
     * Upload a file
     * @throws Exception
     */
    @Test
    public void testUpload() throws Exception {
        if (fs.exists(new Path("/apollo_update.log"))) {
            fs.delete(new Path("/apollo_update.log"), true);
        }
        fs.copyFromLocalFile(new Path("c:/apollo_update.log"), new Path("/apollo_update.log"));
        fs.close();
    }
}

This ends up in FileUtil.copy(), which copies files between file systems. You can see in the code below that if the source is a directory, the copy recurses over its contents.

        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);

Here a file is opened from the LocalFileSystem, and the create() method of dstFS (a DistributedFileSystem in this case) returns an FSDataOutputStream instance, through which data is written to the DataNodes. The DistributedFileSystem also holds a member variable dfs of type DFSClient, and dfs communicates with the NameNode via RPC.
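Before going deeper, a quick way to confirm this wrapping relationship at the breakpoint is to inspect the stream at runtime. Below is a small sketch of an extra test that could be added to the HdfsClientDemo class above (the probe path is made up for illustration):

    @Test
    public void testWrappedStream() throws Exception {
        // fs comes from init() above; on a DistributedFileSystem the returned
        // FSDataOutputStream is expected to wrap a DFSOutputStream.
        FSDataOutputStream out = fs.create(new Path("/wrapped_stream_probe.txt"));
        System.out.println(out.getWrappedStream().getClass().getName());
        out.close();
        fs.delete(new Path("/wrapped_stream_probe.txt"), true);
    }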

  public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                             FileSystem dstFS, Path dst,
                             boolean deleteSource,
                             boolean overwrite,
                             Configuration conf) throws IOException {
    Path src = srcStatus.getPath();
    dst = checkDest(src.getName(), dstFS, dst, overwrite);
    if (srcStatus.isDirectory()) {
      checkDependencies(srcFS, src, dstFS, dst);
      if (!dstFS.mkdirs(dst)) {
        return false;
      }
      FileStatus contents[] = srcFS.listStatus(src);
      for (int i = 0; i < contents.length; i++) {
        copy(srcFS, contents[i], dstFS,
             new Path(dst, contents[i].getPath().getName()),
             deleteSource, overwrite, conf);
      }
    } else {
      InputStream in = null;
      OutputStream out = null;
      try {
        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);
        IOUtils.copyBytes(in, out, conf, true);
      } catch (IOException e) {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        throw e;
      }
    }
    if (deleteSource) {
      return srcFS.delete(src, true);
    } else {
      return true;
    }
  }

The code that constructs the DFSOutputStream instance (DFSClient.create()) is shown below.

  public DFSOutputStream create(String src,
                             FsPermission permission,
                             EnumSet<CreateFlag> flag,
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] =
            favoredNodes[i].getHostName() + ":"
                         + favoredNodes[i].getPort();
      }
    }
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        favoredNodeStrs);
    beginFileLease(result.getFileId(), result);
    return result;
  }

dfsClient.namenode.create() is an RPC call to the NameNode, asking it to create the file for upload.

DFSOutputStream holds a member variable streamer of type DataStreamer, which is mainly responsible for running a thread that sends the file over the network to HDFS. After the stream instance is obtained, its start() method is called, which launches the thread that drives the write pipeline.

The streamer in turn holds a member variable response of type ResponseProcessor; it starts a thread responsible for handling the ack packets returned by the DataNodes.

  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();
    return out;
  }
Then step into IOUtils.copyBytes(in, out, conf, true). You can see a parameter, io.file.buffer.size, which is the buffer size used when reading the file and defaults to 4096 bytes. That feels rather fine-grained; would it be better to set it to the same size as a packet?

  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
      throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
  }

  public static void copyBytes(InputStream in, OutputStream out, int buffSize)
      throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }
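As an aside, io.file.buffer.size can be overridden in the client Configuration before the FileSystem is created. A minimal sketch, mirroring init() from the demo above (the 64 KB value is only an illustration, chosen to match one packet's worth of data):

    Configuration conf = new Configuration();
    // Raise the copy buffer from the 4096-byte default to 64 KB.
    conf.setInt("io.file.buffer.size", 64 * 1024);
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");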

DFSOutputStream also has its own buffer; when that buffer fills up, the writeChecksumChunks() method is called.

  private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;
      writeChecksumChunks(b, off, length);
      return length;
    }

    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    }
    return bytesToCopy;
  }

Inside, it in turn calls a writeChunk() method.

  private void writeChecksumChunks(byte b[], int off, int len)
      throws IOException {
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }
Since the copy loop iterates so many times, to avoid being dragged through the while loop while debugging, it is easier to set a breakpoint at out.close(), which also ends up calling writeChunk().

Inside writeChunk() there is a fairly important function: waitAndQueueCurrentPacket().

  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket,
          bytesCurBlock, currentSeqno++);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.numChunks++;
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    //
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }

Here the packet that has just been filled is added to the queue, ready to be sent, and the waiting thread is woken up, which should be the transfer thread that DFSOutputStream created via DataStreamer.


  private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.seqno;
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
      }
      currentPacket = null;
      dataQueue.notifyAll();
    }
  }
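The hand-off between writeChunk() and the DataStreamer thread is the classic wait/notifyAll producer-consumer pattern on dataQueue. A self-contained toy sketch of the same idea (plain Java, not Hadoop code):

    import java.util.LinkedList;

    // The writer thread queues packets and calls notifyAll(); the streamer thread
    // waits on the same lock and drains them, mirroring how DFSOutputStream and
    // DataStreamer coordinate on dataQueue.
    public class QueueHandoffSketch {
        private final LinkedList<String> dataQueue = new LinkedList<>();

        void queuePacket(String packet) {
            synchronized (dataQueue) {
                dataQueue.addLast(packet);
                dataQueue.notifyAll();        // wake the streamer thread
            }
        }

        void streamLoop() throws InterruptedException {
            while (true) {
                String one;
                synchronized (dataQueue) {
                    while (dataQueue.isEmpty()) {
                        dataQueue.wait(1000); // sleep until a packet is queued
                    }
                    one = dataQueue.removeFirst();
                }
                System.out.println("sending " + one); // stands in for one.writeTo(blockStream)
            }
        }

        public static void main(String[] args) {
            QueueHandoffSketch s = new QueueHandoffSketch();
            new Thread(() -> {
                try { s.streamLoop(); } catch (InterruptedException ignored) { }
            }).start();
            for (int i = 0; i < 3; i++) {
                s.queuePacket("packet-" + i);
            }
        }
    }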

Find the run() method of the DataStreamer class and try setting a breakpoint in it to verify; sure enough, execution stops there.

Step into nextBlockOutputStream() and you will find that it calls locateFollowingBlock().

    public void run() {
      long lastPacket = Time.now();
      TraceScope traceScope = null;
      if (traceSpan != null) {
        traceScope = Trace.continueSpan(traceSpan);
      }
      while (!streamerClosed && dfsClient.clientRunning) {

        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException  e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
        }

        Packet one;
        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
            doSleep = processDatanodeError();
          }

          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning
                && dataQueue.size() == 0 &&
                (stage != BlockConstructionStage.DATA_STREAMING ||
                 stage == BlockConstructionStage.DATA_STREAMING &&
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
              doSleep = false;
              now = Time.now();
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = createHeartbeatPacket();
            } else {
              one = dataQueue.getFirst(); // regular data packet
            }
          }
          assert one != null;

          // get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            setPipeline(nextBlockOutputStream());
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " + lastByteOffsetInBlock +
                " Aborting file " + src);
          }

          if (one.lastPacketInBlock) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException  e) {
                  DFSClient.LOG.warn("Caught exception ", e);
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }

          // send the packet
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          try {
            one.writeTo(blockStream);
            blockStream.flush();
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          }
          lastPacket = Time.now();

          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          if (one.lastPacketInBlock) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown);
          }
        } catch (Throwable e) {
          // Log warning if there was a real error.
          if (restartingNodeIndex == -1) {
            DFSClient.LOG.warn("DataStreamer Exception", e);
          }
          if (e instanceof IOException) {
            setLastException((IOException)e);
          } else {
            setLastException(new IOException("DataStreamer Exception: ",e));
          }
          hasError = true;
          if (errorIndex == -1 && restartingNodeIndex == -1) {
            // Not a datanode issue
            streamerClosed = true;
          }
        }
      }
      if (traceScope != null) {
        traceScope.close();
      }
      closeInternal();
    }

At last, we find the key line: dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);

This is an RPC call to the NameNode that returns the DataNode information for the next block. With that information in hand, the data can be sent out over the network.

    private LocatedBlock locateFollowingBlock(long start,
        DatanodeInfo[] excludedNodes) throws IOException {
      int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
      long sleeptime = 400;
      while (true) {
        long localstart = Time.now();
        while (true) {
          try {
            return dfsClient.namenode.addBlock(src, dfsClient.clientName,
                block, excludedNodes, fileId, favoredNodes);
          } catch (RemoteException e) {
            IOException ue =
              e.unwrapRemoteException(FileNotFoundException.class,
                                      AccessControlException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      UnresolvedPathException.class);
            if (ue != e) {
              throw ue; // no need to retry these exceptions
            }

            if (NotReplicatedYetException.class.getName().
                equals(e.getClassName())) {
              if (retries == 0) {
                throw e;
              } else {
                --retries;
                DFSClient.LOG.info("Exception while adding a block", e);
                if (Time.now() - localstart > 5000) {
                  DFSClient.LOG.info("Waiting for replication for "
                      + (Time.now() - localstart) / 1000
                      + " seconds");
                }
                try {
                  DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
                      + " retries left " + retries);
                  Thread.sleep(sleeptime);
                  sleeptime *= 2;
                } catch (InterruptedException ie) {
                  DFSClient.LOG.warn("Caught exception ", ie);
                }
              }
            } else {
              throw e;
            }
          }
        }
      }
    }

    ExtendedBlock getBlock() {
      return block;
    }

    DatanodeInfo[] getNodes() {
      return nodes;
    }

    Token<BlockTokenIdentifier> getBlockToken() {
      return accessToken;
    }

    private void setLastException(IOException e) {
      lastException.compareAndSet(null, e);
    }
  }

Drilling further down, we eventually find that a SocketOutputStream is constructed, and the data is transferred to the DataNode via NIO.

DataStreamer has a member blockStream of type DataOutputStream; underneath, it wraps an out member whose concrete type is SocketOutputStream.

A doIO() method is then called, and the data is sent to HDFS through socket NIO.

  void waitForIO(int ops) throws IOException {
    if (selector.select(channel, ops, timeout) == 0) {
      throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
                                                              ops));
    }
  }



Summary

1. Uploading a file to HDFS is implemented mainly by the DFSOutputStream inside DistributedFileSystem.

2. The more important member variables in DFSOutputStream are:

  private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();  // queue of packets waiting to be sent
  private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();   // queue for acks returned by the DataNodes
  private DataStreamer streamer;                                          // starts the thread that transfers packets to HDFS via NIO
3. Now look at the important member variables in DataStreamer:

    private DataOutputStream blockStream;      // wraps a SocketOutputStream underneath; responsible for sending packets
    private DataInputStream blockReplyStream;  // receives the ack packets returned by the DataNodes
    private ResponseProcessor response = null; // starts a thread that deserializes and validates the acks returned by the DataNodes

4. On the first communication with the NameNode, the client does not immediately get the DataNode list; it only notifies the NameNode that it is about to upload a file. The client writes the file data into DFSOutputStream's buffer; once the buffer is full, the DataStreamer thread is woken up, communicates with the NameNode again to obtain the DataNode list, and then sends the file data to the DataNodes. Meanwhile the DataNodes return ack packets, which wake up the ResponseProcessor thread to analyze them. So reading the data, uploading the data, and analyzing the acks each have their own dedicated thread.

5. File data is transferred in units of packets. A packet defaults to 64 KB and contains multiple chunks; a chunk is the unit of checksum verification and defaults to 512 bytes.
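To make the numbers in point 5 concrete, here is rough back-of-the-envelope arithmetic for the defaults (a sketch that ignores the packet header; the 4-byte figure assumes the default CRC32 checksum):

    int bytesPerChecksum = 512;          // chunk payload (dfs.bytes-per-checksum default)
    int checksumSize = 4;                // one CRC32 checksum per chunk
    int writePacketSize = 64 * 1024;     // packet size (dfs.client-write-packet-size default)

    int chunkSize = bytesPerChecksum + checksumSize;                // 516 bytes per chunk on the wire
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1); // ~127 chunks per packet
    int packetDataBytes = chunksPerPacket * bytesPerChecksum;       // ~65,024 bytes of file data per packet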

6. By HDFS's design, block data is written in a pipelined fashion: the data is split into packets, and if it needs to be replicated three times, written to DataNode 1, 2 and 3 respectively, the process runs as follows (a toy sketch of the overlap follows the list):

  • First, packet 1 is written to DataNode 1.
  • Then DataNode 1 forwards packet 1 to DataNode 2, while the client writes packet 2 to DataNode 1.
  • Then DataNode 2 forwards packet 1 to DataNode 3, while the client writes packet 3 to DataNode 1 and DataNode 1 forwards packet 2 to DataNode 2.
  • In this way the packets are passed down the pipeline one after another until all the data has been written and fully replicated.
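To visualize the overlap described in the list above, here is a toy, time-stepped simulation (plain Java, not Hadoop code; the packet and DataNode counts are arbitrary):

    // At each tick the client pushes the next packet to DataNode 1 while every
    // DataNode forwards the packet it received in the previous tick downstream.
    public class PipelineSketch {
        public static void main(String[] args) {
            int packets = 4;      // arbitrary number of packets to "write"
            int dataNodes = 3;    // replication factor 3 => DataNode 1, 2, 3
            int[] slots = new int[dataNodes]; // slots[d] = packet currently at DataNode d+1 (0 = empty)

            for (int tick = 1; tick <= packets + dataNodes - 1; tick++) {
                // forward downstream, last DataNode first, so nothing is overwritten
                for (int d = dataNodes - 1; d > 0; d--) {
                    slots[d] = slots[d - 1];
                }
                // the client sends the next packet (if any) to DataNode 1
                slots[0] = (tick <= packets) ? tick : 0;

                StringBuilder line = new StringBuilder("tick " + tick + ": ");
                for (int d = 0; d < dataNodes; d++) {
                    line.append("DN").append(d + 1).append("=")
                        .append(slots[d] == 0 ? "-" : "packet " + slots[d]).append("  ");
                }
                System.out.println(line);
            }
        }
    }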



[From @若泽大数据]