I previously walked through the basic flow of the HDFS client, but to understand it more deeply, this time I will trace it through the source code.
First, prepare the demo code and set a breakpoint at fs.copyFromLocalFile().
public class HdfsClientDemo {

    FileSystem fs = null;
    Configuration conf = null;

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        // the URI and the user identity can be passed in directly
        fs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");
    }

    /**
     * Upload a file
     * @throws Exception
     */
    @Test
    public void testUpload() throws Exception {
        if (fs.exists(new Path("/apollo_update.log"))) {
            fs.delete(new Path("/apollo_update.log"), true);
        }
        fs.copyFromLocalFile(new Path("c:/apollo_update.log"), new Path("/apollo_update.log"));
        fs.close();
    }
}
FileUtil.copy() copies files between file systems; as the code below shows, if the source is a directory, the copy is done recursively.
in = srcFS.open(src);
out = dstFS.create(dst, overwrite);
Here a file is opened from the LocalFileSystem, and an FSDataOutputStream instance is obtained by calling create() on the destination FileSystem, which in our case is a DistributedFileSystem; data is written to the DataNodes through that stream. DistributedFileSystem also holds a member variable dfs of type DFSClient, and dfs communicates with the NameNode over RPC.
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                           FileSystem dstFS, Path dst,
                           boolean deleteSource,
                           boolean overwrite,
                           Configuration conf) throws IOException {
  Path src = srcStatus.getPath();
  dst = checkDest(src.getName(), dstFS, dst, overwrite);
  if (srcStatus.isDirectory()) {
    checkDependencies(srcFS, src, dstFS, dst);
    if (!dstFS.mkdirs(dst)) {
      return false;
    }
    FileStatus contents[] = srcFS.listStatus(src);
    for (int i = 0; i < contents.length; i++) {
      copy(srcFS, contents[i], dstFS,
           new Path(dst, contents[i].getPath().getName()),
           deleteSource, overwrite, conf);
    }
  } else {
    InputStream in = null;
    OutputStream out = null;
    try {
      in = srcFS.open(src);
      out = dstFS.create(dst, overwrite);
      IOUtils.copyBytes(in, out, conf, true);
    } catch (IOException e) {
      IOUtils.closeStream(out);
      IOUtils.closeStream(in);
      throw e;
    }
  }
  if (deleteSource) {
    return srcFS.delete(src, true);
  } else {
    return true;
  }
}
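For a single file, what FileUtil.copy() does above boils down to open/create/copyBytes, which can also be written out by hand. A minimal sketch, reusing the URI, user and paths from the demo (everything here is standard FileSystem/IOUtils API):

// Minimal sketch of copying one local file into HDFS "by hand",
// mirroring the open/create/copyBytes sequence used by FileUtil.copy().
Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);                                      // source: local file system
FileSystem dfs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");   // destination: HDFS
try (InputStream in = localFs.open(new Path("c:/apollo_update.log"));
     OutputStream out = dfs.create(new Path("/apollo_update.log"), true)) {          // overwrite = true
    IOUtils.copyBytes(in, out, conf, false);  // close = false; streams are closed by try-with-resources
}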
The code that constructs the DFSOutputStream instance (DFSClient.create()) is shown below.
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if(LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  String[] favoredNodeStrs = null;
  if (favoredNodes != null) {
    favoredNodeStrs = new String[favoredNodes.length];
    for (int i = 0; i < favoredNodes.length; i++) {
      favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":"
                         + favoredNodes[i].getPort();
    }
  }
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      favoredNodeStrs);
  beginFileLease(result.getFileId(), result);
  return result;
}
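The replication and blockSize arguments seen here ultimately come from the public FileSystem.create() overloads, so a client can choose them per file. A small sketch using the fs from the demo above (the 4096 / 3 / 128 MB values are only illustrative, not defaults read from the cluster):

// Creating a file with an explicit buffer size, replication factor and block size.
// These values flow down into the DFSClient.create() call shown above.
FSDataOutputStream out = fs.create(
        new Path("/apollo_update.log"),
        true,                     // overwrite
        4096,                     // io buffer size
        (short) 3,                // replication factor
        128 * 1024 * 1024L);      // block size: 128 MB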
dfsClient.namenode.create() communicates with the NameNode over an RPC call to request the creation of the file, i.e. to ask permission to upload it.
DFSOutputStream has a member variable streamer of type DataStreamer, which is responsible for running a thread that streams the file data over the network to HDFS. After the stream instance is obtained, its start() method is called, which starts the write pipeline.
The streamer in turn holds a member variable response of type ResponseProcessor, which runs a thread that processes the ack packets returned by the DataNodes.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  HdfsFileStatus stat = null;

  // Retry the create if we get a RetryStartFileException up to a maximum
  // number of times
  boolean shouldRetry = true;
  int retryCount = CREATE_RETRY_COUNT;
  while (shouldRetry) {
    shouldRetry = false;
    try {
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize, SUPPORTED_CRYPTO_VERSIONS);
      break;
    } catch (RemoteException re) {
      IOException e = re.unwrapRemoteException(
          AccessControlException.class,
          DSQuotaExceededException.class,
          FileAlreadyExistsException.class,
          FileNotFoundException.class,
          ParentNotDirectoryException.class,
          NSQuotaExceededException.class,
          RetryStartFileException.class,
          SafeModeException.class,
          UnresolvedPathException.class,
          SnapshotAccessControlException.class,
          UnknownCryptoProtocolVersionException.class);
      if (e instanceof RetryStartFileException) {
        if (retryCount > 0) {
          shouldRetry = true;
          retryCount--;
        } else {
          throw new IOException("Too many retries because of encryption" +
              " zone operations", e);
        }
      } else {
        throw e;
      }
    }
  }
  Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  out.start();
  return out;
}

Next, step into IOUtils.copyBytes(in, out, conf, true). Here we see the parameter io.file.buffer.size, the buffer size used to read the local file, which defaults to 4096 bytes. That feels rather fine-grained; would it be better to set it closer to the size of a packet?
public static void copyBytes(InputStream in, OutputStream out,
    Configuration conf, boolean close) throws IOException {
  copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
}
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
  PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
  byte buf[] = new byte[buffSize];
  int bytesRead = in.read(buf);
  while (bytesRead >= 0) {
    out.write(buf, 0, bytesRead);
    if ((ps != null) && ps.checkError()) {
      throw new IOException("Unable to write to output stream.");
    }
    bytesRead = in.read(buf);
  }
}
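If you want to try the larger buffer speculated about above, io.file.buffer.size can simply be raised in the client Configuration. A minimal sketch, assuming the same hadoop001:9000 cluster as the demo; note this only changes how many bytes are read from the local file per read() call, not the HDFS packet or chunk size:

// Raise the local read buffer used by IOUtils.copyBytes() from 4 KB to 64 KB.
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 64 * 1024);
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");
fs.copyFromLocalFile(new Path("c:/apollo_update.log"), new Path("/apollo_update.log"));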
DFSOutputStream also has a buffer of its own (inherited from FSOutputSummer); when that buffer fills up, writeChecksumChunks() is called.
private int write1(byte b[], int off, int len) throws IOException {
  if(count==0 && len>=buf.length) {
    // local buffer is empty and user buffer size >= local buffer size, so
    // simply checksum the user buffer and send it directly to the underlying
    // stream
    final int length = buf.length;
    writeChecksumChunks(b, off, length);
    return length;
  }

  // copy user data to local buffer
  int bytesToCopy = buf.length-count;
  bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
  System.arraycopy(b, off, buf, count, bytesToCopy);
  count += bytesToCopy;
  if (count == buf.length) {
    // local buffer is full
    flushBuffer();
  }
  return bytesToCopy;
}
Inside, it then calls a writeChunk() method.
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}

Because this runs through a great many iterations, to avoid getting stuck stepping through the copy loop in the debugger it is easier to set the breakpoint at out.close(); the close path also ends up calling writeChunk().
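To make the loop concrete, here is a small, purely illustrative calculation of how a 4096-byte buffer is split into chunks, assuming the default 512 bytes per checksum and a 4-byte CRC checksum (the real values come from the DataChecksum configured on the stream):

// Hypothetical illustration of the writeChecksumChunks() loop, not HDFS code.
int len = 4096;                 // bytes handed to writeChecksumChunks()
int bytesPerChecksum = 512;     // default chunk size
int checksumSize = 4;           // a CRC32/CRC32C checksum is 4 bytes
for (int i = 0; i < len; i += bytesPerChecksum) {
    int chunkLen = Math.min(bytesPerChecksum, len - i);
    int ckOffset = i / bytesPerChecksum * checksumSize;
    System.out.println("chunk at data offset " + i
            + ", length " + chunkLen + ", checksum offset " + ckOffset);
}
// Prints 8 chunks: data offsets 0, 512, ..., 3584, each 512 bytes with a 4-byte checksum.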
Inside writeChunk() there is an important method, waitAndQueueCurrentPacket().
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();

  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len +
                          " is larger than supported bytesPerChecksum " +
                          bytesPerChecksum);
  }
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be " +
                          getChecksumSize() + " but found to be " + cklen);
  }

  if (currentPacket == null) {
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
          currentPacket.seqno +
          ", src=" + src +
          ", packetSize=" + packetSize +
          ", chunksPerPacket=" + chunksPerPacket +
          ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.numChunks++;
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  //
  if (currentPacket.numChunks == currentPacket.maxChunks ||
      bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
          currentPacket.seqno +
          ", src=" + src +
          ", bytesCurBlock=" + bytesCurBlock +
          ", blockSize=" + blockSize +
          ", appendChunk=" + appendChunk);
    }
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      int psize = Math.min((int)(blockSize-bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    if (bytesCurBlock == blockSize) {
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
      currentPacket.lastPacketInBlock = true;
      currentPacket.syncBlock = shouldSyncBlock;
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
Here a packet that has already been filled is appended to the queue, ready to be sent, and the waiting thread is notified; that thread should be the transfer thread that DFSOutputStream created via DataStreamer.
private void queueCurrentPacket() {
  synchronized (dataQueue) {
    if (currentPacket == null) return;
    dataQueue.addLast(currentPacket);
    lastQueuedSeqno = currentPacket.seqno;
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
    }
    currentPacket = null;
    dataQueue.notifyAll();
  }
}
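The dataQueue handling above is a classic wait/notifyAll producer-consumer hand-off: writeChunk() enqueues packets and notifies, while DataStreamer.run() waits on the same monitor. A stripped-down generic sketch of the pattern (plain Java, not HDFS code):

// Generic sketch of the dataQueue hand-off between writeChunk() and DataStreamer.run().
import java.util.LinkedList;

class PacketQueueSketch {
    private final LinkedList<byte[]> dataQueue = new LinkedList<>();

    // Called from the writer thread, like queueCurrentPacket().
    void queuePacket(byte[] packet) {
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            dataQueue.notifyAll();          // wake up the streamer thread
        }
    }

    // Called from the streamer thread, like the wait loop in DataStreamer.run().
    byte[] takePacket() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait(1000);       // sleep until a packet is queued (or timeout)
            }
            return dataQueue.removeFirst();
        }
    }
}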
Find the run() method of the DataStreamer class and set a breakpoint there to verify; sure enough, execution hits it.
Step into nextBlockOutputStream() and you will find that it calls locateFollowingBlock().
public void run() {
  long lastPacket = Time.now();
  TraceScope traceScope = null;
  if (traceSpan != null) {
    traceScope = Trace.continueSpan(traceSpan);
  }
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    Packet one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.now();
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
             stage == BlockConstructionStage.DATA_STREAMING &&
             now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
          long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
              timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.now();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          one = dataQueue.getFirst(); // regular data packet
        }
      }
      assert one != null;

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if(DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if(DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize +
            " is smaller than data size. " +
            " Offset of packet in block " + lastByteOffsetInBlock +
            " Aborting file " + src);
      }

      if (one.lastPacketInBlock) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block +
            " sending packet " + one);
      }

      // write out data to remote datanode
      try {
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      }
      lastPacket = Time.now();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.lastPacketInBlock) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000);// wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }

        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException)e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ",e));
      }
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    }
  }
  if (traceScope != null) {
    traceScope.close();
  }
  closeInternal();
}
Finally we find this line of code: dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);
This is an RPC call to the NameNode that allocates a block and returns the DataNode locations. Once we have that information, the data can be sent out over the network.
private LocatedBlock locateFollowingBlock(long start,
    DatanodeInfo[] excludedNodes) throws IOException {
  int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
  long sleeptime = 400;
  while (true) {
    long localstart = Time.now();
    while (true) {
      try {
        return dfsClient.namenode.addBlock(src, dfsClient.clientName,
            block, excludedNodes, fileId, favoredNodes);
      } catch (RemoteException e) {
        IOException ue =
          e.unwrapRemoteException(FileNotFoundException.class,
                                  AccessControlException.class,
                                  NSQuotaExceededException.class,
                                  DSQuotaExceededException.class,
                                  UnresolvedPathException.class);
        if (ue != e) {
          throw ue; // no need to retry these exceptions
        }

        if (NotReplicatedYetException.class.getName().
            equals(e.getClassName())) {
          if (retries == 0) {
            throw e;
          } else {
            --retries;
            DFSClient.LOG.info("Exception while adding a block", e);
            if (Time.now() - localstart > 5000) {
              DFSClient.LOG.info("Waiting for replication for "
                  + (Time.now() - localstart) / 1000 + " seconds");
            }
            try {
              DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
                  + " retries left " + retries);
              Thread.sleep(sleeptime);
              sleeptime *= 2;
            } catch (InterruptedException ie) {
              DFSClient.LOG.warn("Caught exception ", ie);
            }
          }
        } else {
          throw e;
        }
      }
    }
  }
}

ExtendedBlock getBlock() {
  return block;
}

DatanodeInfo[] getNodes() {
  return nodes;
}

Token<BlockTokenIdentifier> getBlockToken() {
  return accessToken;
}

private void setLastException(IOException e) {
  lastException.compareAndSet(null, e);
}
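The dfsClient.namenode used here is just an RPC proxy for the ClientProtocol interface, created when the DFSClient is constructed. As a rough sketch of how such a proxy is obtained in Hadoop 2.x (NameNodeProxies is an internal, private-audience class, so treat this as illustration rather than a supported public API):

// Illustrative only: roughly how DFSClient ends up with a ClientProtocol RPC proxy.
Configuration conf = new Configuration();
URI nnUri = URI.create("hdfs://hadoop001:9000");
ClientProtocol namenode =
        NameNodeProxies.createProxy(conf, nnUri, ClientProtocol.class).getProxy();
// From here on, namenode.create(...), namenode.addBlock(...) and friends are blocking
// RPC calls to the NameNode, marshalled via protobuf under the hood.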
Going further down, we eventually find that a SocketOutputStream is constructed, and the data is transferred to the DataNode via NIO.
DataStreamer has a member blockStream of type DataOutputStream, which underneath wraps an out member whose type is SocketOutputStream.
It then calls a doIO() method, and the data is sent to HDFS over the socket using NIO.
void waitForIO(int ops) throws IOException {
  if (selector.select(channel, ops, timeout) == 0) {
    throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, ops));
  }
}
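waitForIO() is the heart of SocketOutputStream.doIO(): try a non-blocking write on the SocketChannel, and when the channel is not writable, block on a selector until it is, or time out. A generic sketch of that pattern in plain Java NIO (my own simplification, not the Hadoop implementation, which pools selectors in SocketIOWithTimeout):

// Generic NIO write loop in the spirit of SocketOutputStream.doIO()/waitForIO();
// assumes the channel is already connected. Simplified: one Selector per call.
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class NioWriteSketch {
    static void writeFully(SocketChannel channel, ByteBuffer buf, long timeoutMs) throws Exception {
        channel.configureBlocking(false);
        try (Selector selector = Selector.open()) {
            channel.register(selector, SelectionKey.OP_WRITE);
            while (buf.hasRemaining()) {
                if (channel.write(buf) > 0) {
                    continue;                         // made progress, keep writing
                }
                // channel not writable right now: wait for OP_WRITE, like waitForIO(ops)
                if (selector.select(timeoutMs) == 0) {
                    throw new SocketTimeoutException("write timed out after " + timeoutMs + " ms");
                }
                selector.selectedKeys().clear();      // consume the readiness notification
            }
        }
    }
}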
Final summary
1. Uploading a file to HDFS is implemented mainly through the DFSOutputStream obtained from DistributedFileSystem.
2. The most important member variables in DFSOutputStream are:
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();  // queue of packets waiting to be written out
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();   // queue of packets waiting for DataNode acks
private DataStreamer streamer;  // runs the thread that ships the packets to HDFS via NIO

3. Next, look at the important member variables inside DataStreamer:
private DataOutputStream blockStream;       // wraps a SocketOutputStream underneath and sends the packets
private DataInputStream blockReplyStream;   // receives the ack packets returned by the DataNodes
private ResponseProcessor response = null;  // thread that deserializes and validates the DataNode acks
4. On the first communication with the NameNode, the DataNode list is not obtained right away; the NameNode is only notified that a file is about to be uploaded. The client writes file data into the DFSOutputStream buffer; once a packet is filled, the DataStreamer thread is woken up, contacts the NameNode again to obtain the DataNode list, and then sends the file data to the DataNodes. Meanwhile the DataNodes send back ack packets, which wake the ResponseProcessor thread that analyses them. So reading the data, uploading it, and processing the acks each have a dedicated thread.
5. File data is transferred in units of Packets; a Packet defaults to 64 KB and contains multiple chunks. A chunk is the unit of checksumming and defaults to 512 bytes.
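A quick back-of-envelope check of point 5, assuming the default dfs.client-write-packet-size of 64 KB, 512-byte chunks and 4-byte checksums (the real computePacketChunkSize() also reserves room for the packet header, so the true count is slightly lower):

// Rough arithmetic: how many 512-byte chunks fit into one 64 KB packet.
int packetSize = 64 * 1024;          // dfs.client-write-packet-size default
int bytesPerChecksum = 512;          // chunk size
int checksumSize = 4;                // CRC checksum per chunk
int chunkWithChecksum = bytesPerChecksum + checksumSize;   // 516 bytes on the wire
int chunksPerPacket = packetSize / chunkWithChecksum;      // ≈ 127, fewer once the header is counted
System.out.println("approx. chunks per packet = " + chunksPerPacket);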
6. By HDFS's design, block data is written in a pipeline: the data is split into packets, and if three replicas are required, written to DataNode 1, 2 and 3, the process runs as follows (a small sketch after this list illustrates how the packets overlap in time):
- First, packet 1 is written to DataNode 1
- Then DataNode 1 forwards packet 1 to DataNode 2, while the client can already write packet 2 to DataNode 1
- Then DataNode 2 forwards packet 1 to DataNode 3, while the client writes packet 3 to DataNode 1 and DataNode 1 forwards packet 2 to DataNode 2
- In this way the packets are relayed down the line one after another until all the data has been written and replicated.
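To visualise the overlap described in point 6, here is a toy simulation (my own sketch, not HDFS code) that advances every in-flight packet one hop per tick down a 3-DataNode pipeline while the client injects the next packet at the same time; ack traffic flowing back upstream is ignored:

// Hypothetical sketch: how packets overlap in a 3-DataNode write pipeline.
public class PipelineSketch {
    public static void main(String[] args) {
        int packets = 4;                      // number of packets to send
        int nodes = 3;                        // replication factor / pipeline length
        int[] reached = new int[packets];     // reached[p] = highest DataNode that has packet p (0 = not sent)
        boolean done = false;
        for (int tick = 1; !done; tick++) {
            // packets already in the pipeline move one DataNode further
            for (int p = 0; p < packets; p++) {
                if (reached[p] > 0 && reached[p] < nodes) {
                    reached[p]++;
                }
            }
            // the client writes the next unsent packet to DataNode 1
            for (int p = 0; p < packets; p++) {
                if (reached[p] == 0) {
                    reached[p] = 1;
                    break;
                }
            }
            System.out.println("tick " + tick + ": " + java.util.Arrays.toString(reached));
            done = true;
            for (int r : reached) {
                if (r < nodes) { done = false; }
            }
        }
        // Output shows e.g. tick 3: [3, 2, 1, 0] -- packet 1 at DN3, packet 2 at DN2,
        // packet 3 at DN1, matching the steps listed above.
    }
}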
[From @若泽大数据]