hadoop源码 - HDFS的文件操作流 写操作(客户端)

时间:2021-08-09 08:33:12
在前面的博文中我主要从客户端的角度讲述了HDFS文件写操作的工作流程,但是关于客户端是如何把数据块传送到数据节点,同时数据节点又是如何来接受来自客户端的数据块呢?这就是本文将要讨论的。

    在客户端负责数据写入处理的核心类是DFSOutputStream,它的内部主要有数据包发送器DataStream数据包确认处理器ResponseProcessor和数据包封装器Packet,其整体设计架构图如下:

hadoop源码 - HDFS的文件操作流 写操作(客户端)

1.数据校验器FSOutputSummer

[java] view plaincopy
  1. abstract public class FSOutputSummer extends OutputStream {  
  2.   private Checksum sum;             //数据校验器   
  3.     
  4.   private byte buf[];               //数据缓存  
  5.   private byte checksum[];          //校验数据缓存  
  6.     
  7.   private int count;                    //数据缓存的当前写入位置  
  8.     
  9.   protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {  
  10.     this.sum = sum;  
  11.     this.buf = new byte[maxChunkSize];  
  12.     this.checksum = new byte[checksumSize];  
  13.     this.count = 0;  
  14.   }  
  15.     
  16.   /*  
  17.    * 写入数据及其检验数据 
  18.    */  
  19.   protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException;  
  20.   
  21.     
  22.   /** Write one byte */  
  23.   public synchronized void write(int b) throws IOException {  
  24.     sum.update(b);  
  25.     buf[count++] = (byte)b;  
  26.     if(count == buf.length) {  
  27.       flushBuffer();  
  28.     }  
  29.   }  
  30.   
  31.   /** 
  32.    *  
  33.    */  
  34.   public synchronized void write(byte b[], int off, int len) throws IOException {  
  35.     if (off < 0 || len < 0 || off > b.length - len) {  
  36.       throw new ArrayIndexOutOfBoundsException();  
  37.     }  
  38.   
  39.     for (int n=0; n<len; n+=write1(b, off+n, len-n)) {  
  40.     }  
  41.   }  
  42.     
  43.   /** 
  44.    * Write a portion of an array, flushing to the underlying 
  45.    * stream at most once if necessary. 
  46.    */  
  47.   private int write1(byte b[], int off, int len) throws IOException {  
  48.     if(count==0 && len>=buf.length) {  
  49.       // local buffer is empty and user data has one chunk  
  50.       // checksum and output data  
  51.       final int length = buf.length;  
  52.       sum.update(b, off, length);  
  53.       writeChecksumChunk(b, off, length, false);  
  54.         
  55.       return length;  
  56.     }  
  57.       
  58.     // copy user data to local buffer  
  59.     int bytesToCopy = buf.length-count;  
  60.     bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;  
  61.     sum.update(b, off, bytesToCopy);  
  62.     System.arraycopy(b, off, buf, count, bytesToCopy);  
  63.     count += bytesToCopy;  
  64.     if (count == buf.length) {  
  65.       // local buffer is full  
  66.       flushBuffer();  
  67.     }   
  68.       
  69.     return bytesToCopy;  
  70.   }  
  71.   
  72.   /** Generate checksum for the data chunk and output data chunk & checksum 
  73.    * to the underlying output stream. If keep is true then keep the 
  74.    * current checksum intact, do not reset it. 
  75.    */  
  76.   private void writeChecksumChunk(byte b[], int off, int len, boolean keep) throws IOException {  
  77.     int tempChecksum = (int)sum.getValue();  
  78.     if (!keep) {  
  79.       sum.reset();  
  80.     }  
  81.     int2byte(tempChecksum, checksum);  
  82.     writeChunk(b, off, len, checksum);  
  83.   }  
  84. }  

2.DFSOutputStream

   上一次在HDFS的文件操作流(1)——写操作(客户端概述)一文中粗略的提到了DataStreamer线程,那么现在我们就来具体的看看客户端是如何传输数据的。先来看看底层文件写入流DFSOutputSream的核心代码:

[java] view plaincopy
  1. class DFSOutputStream extends FSOutputSummer implements Syncable {  
  2.    private Socket s;                                //客户端与副本复制流水线中第一个存储节点的连接  
  3.    boolean closed = false;                          //写入操作是否关闭  
  4.    
  5.    private String src;                              //文件路径  
  6.    private DataOutputStream blockStream;            //客户端向副本复制流水线中第一个存储节点写入数据的IO流  
  7.    private DataInputStream blockReplyStream;        //客户端从副本复制流水线中第一个存储节点读取数据包确认的IO流  
  8.    private Block block;                             //当前正在写入的数据块  
  9.    final private long blockSize;                    //数据块大小  
  10.    private DataChecksum checksum;                   //保存数据校验和计算器的类型信息  
  11.    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();     //等待发送的数据包队列  
  12.    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();      //等待确认的数据包队列  
  13.    private Packet currentPacket = null;  
  14.    private int maxPackets = 80;                     //客户端写入时可缓存的数据包最大数量  
  15.    // private int maxPackets = 1000; // each packet 64K, total 64MB  
  16.    private DataStreamer streamer = new DataStreamer();                      //后台工作线程,负责数据包的发送  
  17.    private ResponseProcessor response = null;                               //后台工作线程,负责确认包接受与处理,一个数据块对应一个确认处理线程  
  18.    private long currentSeqno = 0;                   //下一个数据包的序号  
  19.    private long bytesCurBlock = 0;              //当前数据块已写入数据的大小  
  20.    private int packetSize = 0;                  //一个数据包的实际大小  
  21.    private int chunksPerPacket = 0;             //一个数据包最多包含多少个校验块  
  22.    private DatanodeInfo[] nodes = null;             //当前数据块副本的存储节点  
  23.    private volatile boolean hasError = false;   //数据节点接收数据包时是否出错  
  24.    private volatile int errorIndex = 0;         //出错的数据节点  
  25.    private volatile IOException lastException = null;  
  26.    private long artificialSlowdown = 0;  
  27.    private long lastFlushOffset = -1// offset when flush was invoked  
  28.    private boolean persistBlocks = false;       // persist blocks on namenode  
  29.    private int recoveryErrorCount = 0;          // number of times block recovery failed  
  30.    private int maxRecoveryErrorCount = 5;       //数据写入出错时,数据块恢复可尝试的最大次数  
  31.    private volatile boolean appendChunk = false;   //当前数据块中最后一个校验块是否未满(文件追加)  
  32.    private long initialFileSize = 0;                //文件追加写开始时,文件的初始大小  
  33.   
  34.     ...  
  35.      
  36.    /** 
  37.     *  写入数据及其检验数据 
  38.     */  
  39.    protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException {  
  40.      checkOpen();  
  41.      isClosed();  
  42.    
  43.      int cklen = checksum.length;  
  44.      int bytesPerChecksum = this.checksum.getBytesPerChecksum();   
  45.      if (len > bytesPerChecksum) {       //如果待写入的数据长度大于检验块长度,则出现错误状态,抛出异常  
  46.        throw new IOException("writeChunk() buffer size is " + len + " is larger than supported  bytesPerChecksum " + bytesPerChecksum);  
  47.      }  
  48.      if (checksum.length != this.checksum.getChecksumSize()) {  
  49.        throw new IOException("writeChunk() checksum size is supposed to be " + this.checksum.getChecksumSize() + " but found to be " + checksum.length);  
  50.      }  
  51.   
  52.      synchronized (dataQueue) {  
  53.    
  54.         //如果待写入数据包数量与待确认数据包数量之和大于一个阈值,则当前写线程开始阻塞  
  55.        while (!closed && dataQueue.size() + ackQueue.size()  > maxPackets) {  
  56.          try {  
  57.            dataQueue.wait();  
  58.          } catch (InterruptedException  e) {  
  59.          }  
  60.        }  
  61.        isClosed();  
  62.    
  63.        if(currentPacket == null) {  
  64.          currentPacket = new Packet(packetSize, chunksPerPacket, bytesCurBlock);  
  65.          //if (LOG.isDebugEnabled()) {  
  66.            LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + ", src=" + src + ", packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket + ", bytesCurBlock=" + bytesCurBlock);  
  67.          //}  
  68.        }  
  69.   
  70.         //将校验数据转存到数据包中  
  71.        currentPacket.writeChecksum(checksum, 0, cklen);  
  72.         //将数据转存到数据包中  
  73.        currentPacket.writeData(b, offset, len);  
  74.         //记录当前数据包中已写入的校验块数量  
  75.        currentPacket.numChunks++;  
  76.         //记录当前数据块已写入数据的长度  
  77.        bytesCurBlock += len;  
  78.   
  79.         //如果数据包已写满,或者当前数据块已写满  
  80.        if(currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) {  
  81.           
  82.          if(LOG.isDebugEnabled()) {  
  83.            LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.seqno + ", src=" + src + ", bytesCurBlock=" + bytesCurBlock + ", blockSize=" + blockSize + ", appendChunk=" + appendChunk);  
  84.          }  
  85.            
  86.          //如果当前数据块已写满,则标记当前数据包为最后一个数据包,数据块已写数据的计数器置0  
  87.          if(bytesCurBlock == blockSize) {  
  88.            currentPacket.lastPacketInBlock = true;  
  89.            bytesCurBlock = 0;  
  90.            lastFlushOffset = -1;  
  91.          }  
  92.            
  93.          LOG.debug("the packet is full,so put it to dataQueue");  
  94.          dataQueue.addLast(currentPacket);  
  95.          dataQueue.notifyAll();  
  96.           //将当前数据包置空,以使得下次写入时重新创建数据包对象  
  97.          currentPacket = null;  
  98.   
  99.          // If this was the first write after reopening a file, then the above  
  100.          // write filled up any partial chunk. Tell the summer to generate full   
  101.          // crc chunks from now on.  
  102.          if(appendChunk) {  
  103.            appendChunk = false;  
  104.            resetChecksumChunk(bytesPerChecksum);  
  105.          }  
  106.            
  107.           //根据当前数据块的剩余空间从新计算数据包的实际大小  
  108.          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);  
  109.          computePacketChunkSize(psize, bytesPerChecksum);  
  110.        }  
  111.          
  112.      }  
  113.      //LOG.debug("DFSClient writeChunk done length " + len + " checksum length " + cklen);  
  114.    }  


3.数据包装器Packet

    客户端向第一个存储节点发送的数据是按照数据包的形式来组织的,以此来提高网络IO的效

[java] view plaincopy
  1. private class Packet {  
  2.       ByteBuffer buffer;             
  3.       byte[]  buf;                  // 数据缓存区  
  4.       long    seqno;               // 数据包在数据块中的序列号  
  5.       long    offsetInBlock;       // 数据包在数据块中的偏移位置  
  6.       boolean lastPacketInBlock;   // 是否是数据块的最后一个数据包  
  7.       int     numChunks;           // 数据包当前存放了多少个校验块  
  8.       int     maxChunks;           // 数据包最多可有多少个校验块  
  9.       int     dataStart;                // 数据在该数据包中的开始位置  
  10.       int     dataPos;              // 当前的数据写入位置  
  11.       int     checksumStart;        // 校验数据在该数据包中的开始位置  
  12.       int     checksumPos;          // 当前的校验数据写入位置  
  13.     
  14.          
  15.       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {  
  16.         this.lastPacketInBlock = false;  
  17.         this.numChunks = 0;  
  18.         this.offsetInBlock = offsetInBlock;  
  19.         this.seqno = currentSeqno;  
  20.         currentSeqno++;  
  21.           
  22.         buffer = null;  
  23.         buf = new byte[pktSize];  
  24.           
  25.          //计算校验数据在数据包中的开始位置  
  26.         checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;  
  27.         checksumPos = checksumStart;  
  28.            
  29.          //计算数据在数据包中的开始位置  
  30.         dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();  
  31.         dataPos = dataStart;  
  32.           
  33.         maxChunks = chunksPerPkt;  
  34.       }  
  35.   
  36.       //写入数据  
  37.       void writeData(byte[] inarray, int off, int len) {  
  38.         if ( dataPos + len > buf.length) {  
  39.           throw new BufferOverflowException();  
  40.         }  
  41.         System.arraycopy(inarray, off, buf, dataPos, len);  
  42.         dataPos += len;  
  43.       }  
  44.     
  45.       //写入检验数据  
  46.       void  writeChecksum(byte[] inarray, int off, int len) {  
  47.         if (checksumPos + len > dataStart) {  
  48.           throw new BufferOverflowException();  
  49.         }  
  50.         System.arraycopy(inarray, off, buf, checksumPos, len);  
  51.         checksumPos += len;  
  52.       }  
  53.         
  54.       /** 
  55.        * 获取整个数据包的数据(数据包头部+校验数据+数据) 
  56.        *    数据包头部: 
  57.        *        1).数据包的长度(4+校验数据长度+数据长度)(4) 
  58.        *        2).数据包的数据在数据块中的偏移位置(8) 
  59.        *        3).数据包在数据块中的序列号(8) 
  60.        *        4).是否是数据块中的最后一个数据包(1) 
  61.        */  
  62.       ByteBuffer getBuffer() {  
  63.         /* Once this is called, no more data can be added to the packet. 
  64.          * setting 'buf' to null ensures that. 
  65.          * This is called only when the packet is ready to be sent. 
  66.          */  
  67.         if (buffer != null) {  
  68.           return buffer;  
  69.         }  
  70.           
  71.         int dataLen = dataPos - dataStart;                  //数据长度  
  72.         int checksumLen = checksumPos - checksumStart;      //校验数据长度  
  73.           
  74.         // 如果检验数据和数据不是连续的,则将校验数据往后移动,使得检验数据和数据是连续的.  
  75.         // 之所以移动检验数据而不移动数据主要是应为检验数据一般要比数据少得多,这样移动的开销也就小  
  76.         if (checksumPos != dataStart) {  
  77.           System.arraycopy(buf, checksumStart, buf, dataStart - checksumLen , checksumLen);   
  78.         }  
  79.           
  80.         int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;  
  81.           
  82.          //开始写入数据包的头部,同时也使得头部数据和校验数据是连续的  
  83.         buffer = ByteBuffer.wrap(buf, dataStart - checksumPos, DataNode.PKT_HEADER_LEN + pktLen);  
  84.         buf = null;  
  85.         buffer.mark();  
  86.           
  87.         buffer.putInt(pktLen);                                  // 数据包的长度(4+校验数据长度+数据长度)(4)  
  88.         buffer.putLong(offsetInBlock);                      // 数据包的数据在数据块中的偏移位置(8)  
  89.         buffer.putLong(seqno);                                  // 数据包在数据块中的序列号(8)  
  90.         buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));   // 是否是数据块中的最后一个数据包(1)  
  91.         //end of pkt header  
  92.         buffer.putInt(dataLen);                                 // 数据长度  
  93.           
  94.         buffer.reset();  
  95.         return buffer;  
  96.       }  
  97.     }  

4.数据包发送器DataStreamer

   从DFSOutputSream的核心函数writeChunk()我们可以看出,DFSOutputSream先把写入的数据缓存到packet中,当packet满了,或者是当前Block满了,则把packet放入队列dataQueue,等待其它的工作者把该packet发送到目标数据节点上。其实,这个工作者就是DataStreamer,它是DFSOutputSream的一个内部线程类,下面就来看看DataStreamer是如何工作的吧!

[java] view plaincopy
  1. private class DataStreamer extends Daemon {  
  2.   
  3.       private volatile boolean closed = false;  
  4.     
  5.       public void run() {  
  6.             
  7.         while (!closed && clientRunning) {  
  8.   
  9.            //如果接收到了数据节点返回的错误确认包,则关闭确认处理器  
  10.           if (hasError && response != null) {  
  11.             try {  
  12.               response.close();  
  13.               response.join();  
  14.               response = null;  
  15.             } catch (InterruptedException  e) {  
  16.             }  
  17.           }  
  18.   
  19.           Packet one = null;  
  20.           synchronized (dataQueue) {  
  21.   
  22.             // process IO errors if any  
  23.             boolean doSleep = processDatanodeError(hasError, false);  
  24.   
  25.             // wait for a packet to be sent.  
  26.             while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {  
  27.               try {  
  28.                 dataQueue.wait(1000);  
  29.               } catch (InterruptedException  e) {  
  30.               }  
  31.               doSleep = false;  
  32.             }  
  33.               
  34.             if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {  
  35.               continue;  
  36.             }  
  37.   
  38.             try {  
  39.               // get packet to be sent.  
  40.               one = dataQueue.getFirst();  
  41.               long offsetInBlock = one.offsetInBlock;  
  42.     
  43.                 //向NameNode节点申请一个新的数据块,并创建与第一个存储节点的网络I/O流  
  44.               if (blockStream == null) {  
  45.                 LOG.debug("Allocating new block");  
  46.                 nodes = nextBlockOutputStream(src);   
  47.                 this.setName("DataStreamer for file " + src +  " block " + block);  
  48.                 response = new ResponseProcessor(nodes);  
  49.                 response.start();  
  50.                 lastTime = System.currentTimeMillis();  
  51.               }  
  52.   
  53.                //如果将要写入的数据在当前数据块中的起始位置超过了该数据块的大小,则抛出异常  
  54.              if (offsetInBlock >= blockSize) {  
  55.                 throw new IOException("BlockSize " + blockSize + " is smaller than data size. Offset of packet in block " + offsetInBlock + " Aborting file " + src);  
  56.               }  
  57.   
  58.               ByteBuffer buf = one.getBuffer();  
  59.                 
  60.               // move packet from dataQueue to ackQueue  
  61.               dataQueue.removeFirst();  
  62.               dataQueue.notifyAll();  
  63.               synchronized (ackQueue) {  
  64.                 ackQueue.addLast(one);  
  65.                 ackQueue.notifyAll();  
  66.               }   
  67.                 
  68.               // write out data to remote datanode  
  69.               blockStream.write(buf.array(), buf.position(), buf.remaining());  
  70.                 
  71.               if (one.lastPacketInBlock) {  
  72.                 blockStream.writeInt(0); // indicate end-of-block   
  73.               }  
  74.               blockStream.flush();  
  75.                 
  76.               if (LOG.isDebugEnabled()) {  
  77.                 LOG.debug("DataStreamer block " + block + " wrote packet seqno:" + one.seqno + " size:" + buf.remaining() + " offsetInBlock:" + one.offsetInBlock + " lastPacketInBlock:" + one.lastPacketInBlock);  
  78.               }  
  79.                 
  80.             } catch (Throwable e) {  
  81.               LOG.warn("DataStreamer Exception: " + StringUtils.stringifyException(e));  
  82.               if (e instanceof IOException) {  
  83.                 setLastException((IOException)e);  
  84.               }  
  85.               hasError = true;  
  86.             }  
  87.           }  
  88.   
  89.           if (closed || hasError || !clientRunning) {  
  90.             continue;  
  91.           }  
  92.   
  93.            //当前数据块已经写满,在收到该数据块的所有确认包之后,关闭相应的网络I/O流,确认包处理线程  
  94.           if (one.lastPacketInBlock) {  
  95.             synchronized (ackQueue) {  
  96.               while (!hasError && ackQueue.size() != 0 && clientRunning) {  
  97.                 try {  
  98.                   ackQueue.wait();   // wait for acks to arrive from datanodes  
  99.                 } catch (InterruptedException  e) {  
  100.                 }  
  101.               }  
  102.             }  
  103.             LOG.debug("Closing old block " + block);  
  104.             this.setName("DataStreamer for file " + src);  
  105.   
  106.             response.close();        // ignore all errors in Response  
  107.             try {  
  108.               response.join();  
  109.               response = null;  
  110.             } catch (InterruptedException  e) {  
  111.             }  
  112.   
  113.             if (closed || hasError || !clientRunning) {  
  114.               continue;  
  115.             }  
  116.   
  117.             synchronized (dataQueue) {  
  118.               try {  
  119.                   LOG.debug("start to close wrtiter stream to DataNode["+s.getInetAddress()+"].");  
  120.                 blockStream.close();  
  121.                   
  122.                 LOG.debug("start to close reader stream from DataNode["+s.getInetAddress()+"].");  
  123.                 blockReplyStream.close();  
  124.                   
  125.                 System.out.println("send a block takes "+(System.currentTimeMillis()-lastTime)+" ms");  
  126.                   
  127.               } catch (IOException e) {  
  128.               }  
  129.               nodes = null;  
  130.               response = null;  
  131.               blockStream = null;  
  132.               blockReplyStream = null;  
  133.             }  
  134.           }  
  135.             
  136.           if (progress != null) { progress.progress(); }  
  137.   
  138.           // This is used by unit test to trigger race conditions.  
  139.           if (artificialSlowdown != 0 && clientRunning) {  
  140.             try {   
  141.               Thread.sleep(artificialSlowdown);   
  142.             } catch (InterruptedException e) {}  
  143.           }  
  144.         }  
  145.           
  146.       }  
  147.   
  148.       // shutdown thread  
  149.       void close() {  
  150.         closed = true;  
  151.         synchronized (dataQueue) {  
  152.           dataQueue.notifyAll();  
  153.         }  
  154.         synchronized (ackQueue) {  
  155.           ackQueue.notifyAll();  
  156.         }  
  157.         this.interrupt();  
  158.       }  
  159.     }  

     上面的代码值得让我们注意的是,在Hadoop的官网上有关于介绍HDFS的一句话:A client request to create a file does not reach the NameNode immediately. In fact, initially the HDFS client caches the file data into a temporary local file. Application writes are transparently redirected to this temporary local file. When the local file accumulates data worth over one HDFS block size, the client contacts the NameNode. 翻译这句话,我就在这里不献丑了。很多分析过源代码的朋友都认为这句话说得有问题,但是我想说的,这就话在本质上是没有问题的,因为DataStreamer总是一个数据块接着一个数据块向目标数据节点发送,也就是对于已经向某一个数据节点发送了一个Block后,DataStreamer并不是马上发送下一个Block,而是要等到packet得到确认后才发送下一个Block,假设当一个用户调用HDFS的API写入了2个Block的数据。此时DataStreamer还在等待第一个Block的所有packet的ack,那么用户的第2个Block的数据还缓存在dataQueue中,同时DataStreamer也没有向NameNode申请第二个Block。那么现在大家再来体会一下刚才那句话。是不是还有点意思呢?另外,用户不能一味的发送数据,否则缓存扛不住,所以就有一个限制了,也就是总的缓存数据不能超过maxPackets个packet,这个值视运行环境而定,目前默认是80或者是1000。ok,再来看看nextBlockOutputStream函数到底为数据块向数据节点的传送到底干了那些工作。

[java] view plaincopy
  1. /** 
  2.   * 申请一个新的数据块来存储用户写入的数据,并且与第一个存储节点建立网络连接 
  3.   */  
  4.  private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {  
  5.    LocatedBlock lb = null;  
  6.    boolean retry = false;  
  7.    DatanodeInfo[] nodes;  
  8.    int count = conf.getInt("dfs.client.block.write.retries"3);  
  9.    boolean success;  
  10.    do {  
  11.      hasError = false;  
  12.      lastException = null;  
  13.      errorIndex = 0;  
  14.      retry = false;  
  15.      nodes = null;  
  16.      success = false;  
  17.                
  18.      long startTime = System.currentTimeMillis();  
  19.      lb = locateFollowingBlock(startTime);  
  20.      block = lb.getBlock();  
  21.      nodes = lb.getLocations();  
  22.        
  23.      LOG.debug("locate a block["+block.getBlockId()+"] for file["+src+"]: "+nodes);  
  24.   
  25.       //与第一个存储节点建立网络连接  
  26.      success = createBlockOutputStream(nodes, clientName, false);  
  27.   
  28.      if(!success) {  
  29.        LOG.info("Abandoning block " + block);  
  30.         //与第一个存储节点建立网络连接失败,所以放弃该数据块  
  31.        namenode.abandonBlock(block, src, clientName);  
  32.   
  33.        // Connection failed. Let's wait a little bit and retry  
  34.        retry = true;  
  35.        try {  
  36.          if (System.currentTimeMillis() - startTime > 5000) {  
  37.            LOG.info("Waiting to find target node: " + nodes[0].getName());  
  38.          }  
  39.          Thread.sleep(6000);  
  40.        } catch (InterruptedException iex) {  
  41.        }  
  42.      }  
  43.    } while (retry && --count >= 0);  
  44.   
  45.    if (!success) {  
  46.      throw new IOException("Unable to create new block.");  
  47.    }  
  48.    return nodes;  
  49.  }  
  50.   
  51.   /* 
  52.    * 与第一个存储节点建立网络连接,并写入数据块传输的头部信息: 
  53.    *    1).数据传输协议版本号 
  54.    *    2).数据传输类型 
  55.    *    3).数据块基本信息 
  56.    *        a).数据块id 
  57.    *        b).数据块时间戳 
  58.    *        c).数据块的副本数 
  59.    *    4).数据块恢复标记 
  60.    *    5).客户端名称 
  61.    *    6).源节点是否是数据节点 
  62.    *    7).复制流水线中剩余的存储节点数量 
  63.    *    8).剩余存储节点信息 
  64.    *    9).数据校验器信息 
  65.    */  
  66.  private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client, boolean recoveryFlag) {  
  67.    String firstBadLink = "";  
  68.    if (LOG.isDebugEnabled()) {  
  69.      for (int i = 0; i < nodes.length; i++) {  
  70.        LOG.debug("pipeline = " + nodes[i].getName());  
  71.      }  
  72.    }  
  73.   
  74.    // persist blocks on namenode on next flush  
  75.    persistBlocks = true;  
  76.   
  77.    try {  
  78.      LOG.debug("Connecting to " + nodes[0].getName());  
  79.      InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());  
  80.      s = socketFactory.createSocket();  
  81.      int timeoutValue = 3000 * nodes.length + socketTimeout;  
  82.      NetUtils.connect(s, target, timeoutValue);  
  83.      s.setSoTimeout(timeoutValue);  
  84.      s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);  
  85.      LOG.debug("Send buf size " + s.getSendBufferSize());  
  86.        
  87.      long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length + datanodeWriteTimeout;  
  88.   
  89.      LOG.debug("create a write stream to a datanode["+target+"]");  
  90.       //与第一个存储节点建立网络写入流  
  91.      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE));  
  92.        
  93.      LOG.debug("create a read stream from a datanode["+target+"]");  
  94.       //与第一个存储节点建立网络读取流  
  95.      blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));  
  96.   
  97.      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);            //写入数据传输协议版本号  
  98.      out.write(DataTransferProtocol.OP_WRITE_BLOCK);                            //写入数据传输类型  
  99.      out.writeLong(block.getBlockId());                                     //写入数据块id  
  100.      out.writeLong(block.getGenerationStamp());                             //写入数据块时间戳  
  101.      out.writeInt(nodes.length);                                                //写入数据块的副本数  
  102.      out.writeBoolean(recoveryFlag);                                        //写入数据块恢复标记  
  103.      Text.writeString(out, client);                                             //写入客户端名称  
  104.      out.writeBoolean(false);                                                   //写入源节点是否是数据节点  
  105.      out.writeInt(nodes.length - 1);                                            //写入复制流水线中剩余的存储节点数量  
  106.      for (int i = 1; i < nodes.length; i++) {                                    //写入剩余存储节点信息  
  107.        nodes[i].write(out);  
  108.      }  
  109.      checksum.writeHeader(out);                                                 //写入数据校验器信息  
  110.      out.flush();  
  111.   
  112.       //从接收端读取确认恢复包  
  113.      firstBadLink = Text.readString(blockReplyStream);  
  114.      if (firstBadLink.length() != 0) {  
  115.        throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);  
  116.      }  
  117.   
  118.      blockStream = out;  
  119.      return true;     // success  
  120.   
  121.    } catch (IOException ie) {  
  122.   
  123.      LOG.info("Exception in createBlockOutputStream " + ie);  
  124.   
  125.      // find the datanode that matches  
  126.      if(firstBadLink.length() != 0) {  
  127.        for (int i = 0; i < nodes.length; i++) {  
  128.          if (nodes[i].getName().equals(firstBadLink)) {  
  129.            errorIndex = i;  
  130.            break;  
  131.          }  
  132.        }  
  133.      }  
  134.      hasError = true;  
  135.      setLastException(ie);  
  136.      blockReplyStream = null;  
  137.      return false;  //error  
  138.    }  
  139.  }  
  140.   
  141.  /** 
  142.   * 为文件申请一个新的数据块,放回数据块的存放位置 
  143.   */  
  144.  private LocatedBlock locateFollowingBlock(long start) throws IOException {       
  145.    int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries"5);  
  146.    long sleeptime = 400;  
  147.    while (true) {  
  148.      long localstart = System.currentTimeMillis();  
  149.      while (true) {  
  150.        try {  
  151.          return namenode.addBlock(src, clientName);  
  152.        } catch (RemoteException e) {  
  153.          IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class);  
  154.          if (ue != e) {   
  155.            throw ue; // no need to retry these exceptions  
  156.          }  
  157.            
  158.          if(NotReplicatedYetException.class.getName().equals(e.getClassName())) {  
  159.   
  160.              if (retries == 0) {   
  161.                throw e;  
  162.              } else {  
  163.                --retries;  
  164.                LOG.info(StringUtils.stringifyException(e));  
  165.                if (System.currentTimeMillis() - localstart > 5000) {  
  166.                  LOG.info("Waiting for replication for " + (System.currentTimeMillis() - localstart) / 1000 + " seconds");  
  167.                }  
  168.                try {  
  169.                  LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries);  
  170.                  Thread.sleep(sleeptime);  
  171.                  sleeptime *= 2;  
  172.                } catch (InterruptedException ie) {  
  173.                }  
  174.              }  
  175.          } else {  
  176.            throw e;  
  177.          }  
  178.        }  
  179.      }  
  180.    }//while   
  181.  }  

5.确认包处理器

   对于客户端向存储节点发送的每一个数据包,客户端都要确认每一个数据包是否被所有的存储节点所真确接受了,如果有一个存储节点没有真确接受,则客户端就需要立即回复当前数据块。

[java] view plaincopy
  1. private class ResponseProcessor extends Thread {  
  2.   
  3.       private volatile boolean closed = false;              // 响应处理器是否被关闭  
  4.       private DatanodeInfo[] targets = null;                    // 当前数据块副本的存储位置  
  5.       private boolean lastPacketInBlock = false;                // 当前处理其响应的数据包是否是数据块中最后一个数据包  
  6.   
  7.       ResponseProcessor (DatanodeInfo[] targets) {  
  8.         this.targets = targets;  
  9.       }  
  10.   
  11.       public void run() {  
  12.   
  13.         this.setName("ResponseProcessor[" + block + "]");  
  14.           
  15.         PipelineAck ack = new PipelineAck();  
  16.     
  17.         while (!closed && clientRunning && !lastPacketInBlock) {  
  18.           // process responses from datanodes.  
  19.           try {  
  20.             // read an ack from the pipeline  
  21.               LOG.debug("Start to read a ack for a data packet from datanode["+s.getInetAddress()+"]");  
  22.             ack.readFields(blockReplyStream, targets.length);  
  23.               
  24.             LOG.debug("Receive an Ack[" + ack + "]");  
  25.   
  26.             long seqno = ack.getSeqno();  
  27.             if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {  
  28.               continue;  
  29.             } else if (seqno == -2) {  
  30.               // This signifies that some pipeline node failed to read downstream  
  31.               // and therefore has no idea what sequence number the message corresponds  
  32.               // to. So, we don't try to match it up with an ack.  
  33.               assert ! ack.isSuccess();  
  34.             } else {  
  35.               Packet one = null;  
  36.               synchronized (ackQueue) {  
  37.                 one = ackQueue.getFirst();  
  38.               }  
  39.               if (one.seqno != seqno) {  
  40.                 throw new IOException("Responseprocessor: Expecting seqno for block " + block + one.seqno + " but received " + seqno);  
  41.               }  
  42.               lastPacketInBlock = one.lastPacketInBlock;  
  43.             }  
  44.   
  45.               //处理来自所有副本存放节点的响应,只要有一个出错,就抛出异常  
  46.             for (int i = 0; i < targets.length && clientRunning; i++) {  
  47.               short reply = ack.getReply(i);  
  48.               if(reply != DataTransferProtocol.OP_STATUS_SUCCESS) {  
  49.                 errorIndex = i; // first bad datanode  
  50.                 throw new IOException("Bad response " + reply + " for block " + block + " from datanode " +  targets[i].getName());  
  51.               }  
  52.             }  
  53.   
  54.             synchronized (ackQueue) {  
  55.               ackQueue.removeFirst();  
  56.               ackQueue.notifyAll();  
  57.             }  
  58.               
  59.           } catch (Exception e) {  
  60.             if (!closed) {  
  61.               hasError = true;  
  62.               if (e instanceof IOException) {  
  63.                 setLastException((IOException)e);  
  64.               }  
  65.               LOG.warn("DFSOutputStream ResponseProcessor exception for block " + block + StringUtils.stringifyException(e));  
  66.               closed = true;  
  67.             }  
  68.           }  
  69.   
  70.           synchronized (dataQueue) {  
  71.             dataQueue.notifyAll();  
  72.           }  
  73.           synchronized (ackQueue) {  
  74.             ackQueue.notifyAll();  
  75.           }  
  76.         }  
  77.                 
  78.       }  
  79.   
  80.       void close() {  
  81.         closed = true;  
  82.         this.interrupt();  
  83.       }  
  84.     }  

    对于客户端向数据节点传送过程中,难免会发生错误,这些错误包括,客户端向第一个数据节点写数据时发生网络错误,数据节点向数据节点写数据时发生错误,从数据节点获取packet的确认信息是发生错误等,它们都统一交给DFSOutputSream中的函数processDatanodeError来处理的。

[java] view plaincopy
  1. /* 
  2.  * 处理在传输当前数据块的过程中发生的错误 
  3.  */  
  4. private boolean processDatanodeError(boolean hasError, boolean isAppend) {  
  5.   if(!hasError) {  
  6.     return false;  
  7.   }  
  8.     
  9.   if (response != null) {  
  10.     LOG.info("Error Recovery for block " + block + " waiting for responder to exit. ");  
  11.     return true;  
  12.   }  
  13.     
  14.   if (errorIndex >= 0) {  
  15.     LOG.warn("Error Recovery for block " + block + " bad datanode[" + errorIndex + "] " + (nodes == null"nodes == null": nodes[errorIndex].getName()));  
  16.   }  
  17.   
  18.   if (blockStream != null) {  
  19.     try {  
  20.         LOG.debug("start to close wrtiter stream to DataNode["+s.getInetAddress()+"].");  
  21.       blockStream.close();  
  22.         
  23.       LOG.debug("start to close reader stream from DataNode["+s.getInetAddress()+"]");  
  24.       blockReplyStream.close();  
  25.     } catch (IOException e) {  
  26.     }  
  27.   }  
  28.   blockStream = null;  
  29.   blockReplyStream = null;  
  30.   
  31.   // move packets from ack queue to front of the data queue  
  32.   synchronized (ackQueue) {  
  33.     dataQueue.addAll(0, ackQueue);  
  34.     ackQueue.clear();  
  35.   }  
  36.   
  37.   boolean success = false;  
  38.   while (!success && clientRunning) {  
  39.     DatanodeInfo[] newnodes = null;  
  40.     if (nodes == null) {  
  41.       String msg = "Could not get block locations. " + "Source file \"" + src + "\" - Aborting...";  
  42.       LOG.warn(msg);  
  43.       setLastException(new IOException(msg));  
  44.       closed = true;  
  45.       if (streamer != null) streamer.close();  
  46.       return false;  
  47.     }  
  48.       
  49.     StringBuilder pipelineMsg = new StringBuilder();  
  50.     for (int j = 0; j < nodes.length; j++) {  
  51.       pipelineMsg.append(nodes[j].getName());  
  52.       if (j < nodes.length - 1) {  
  53.         pipelineMsg.append(", ");  
  54.       }  
  55.     }  
  56.       
  57.      //从当前数据块的存储节点集合中删除出错的数据节点  
  58.     if (errorIndex < 0) {  
  59.       newnodes = nodes;  
  60.     } else {  
  61.       if (nodes.length <= 1) {  
  62.         lastException = new IOException("All datanodes " + pipelineMsg + " are bad. Aborting...");  
  63.         closed = true;  
  64.         if (streamer != null) streamer.close();  
  65.         return false;  
  66.       }  
  67.       LOG.warn("Error Recovery for block " + block + " in pipeline " + pipelineMsg + ": bad datanode " + nodes[errorIndex].getName());  
  68.       newnodes =  new DatanodeInfo[nodes.length-1];  
  69.       System.arraycopy(nodes, 0, newnodes, 0, errorIndex);  
  70.       System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, newnodes.length-errorIndex);  
  71.     }  
  72.   
  73.     // Tell the primary datanode to do error recovery by stamping appropriate generation stamps.  
  74.     LocatedBlock newBlock = null;  
  75.     ClientDatanodeProtocol primary =  null;  
  76.     DatanodeInfo primaryNode = null;  
  77.     try {  
  78.         //从当前有效的存储节点中选取一个节点作为数据块恢复的主节点  
  79.       primaryNode = Collections.min(Arrays.asList(newnodes));  
  80.       primary = createClientDatanodeProtocolProxy(primaryNode, conf);  
  81.        //过RPC让主存储节点恢复数据块  
  82.       newBlock = primary.recoverBlock(block, isAppend, newnodes);  
  83.     } catch (IOException e) {  
  84.       recoveryErrorCount++;  
  85.       if (recoveryErrorCount > maxRecoveryErrorCount) {  
  86.         if (nodes.length > 1) {  
  87.           // if the primary datanode failed, remove it from the list.  
  88.           // The original bad datanode is left in the list because it is  
  89.           // conservative to remove only one datanode in one iteration.  
  90.           for (int j = 0; j < nodes.length; j++) {  
  91.             if (nodes[j].equals(primaryNode)) {  
  92.               errorIndex = j; // forget original bad node.  
  93.             }  
  94.           }  
  95.             //主存储节点恢复数据块失败多次,从存储节点集合中删除  
  96.           newnodes =  new DatanodeInfo[nodes.length-1];  
  97.           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);  
  98.           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, newnodes.length-errorIndex);  
  99.           nodes = newnodes;  
  100.             
  101.           LOG.warn("Error Recovery for block " + block + " failed " + " because recovery from primary datanode " + primaryNode + " failed " + recoveryErrorCount + " times. Pipeline was " + pipelineMsg + ". Marking primary datanode as bad.");  
  102.           recoveryErrorCount = 0;   
  103.           errorIndex = -1;  
  104.           return true;          // sleep when we return from here  
  105.         }  
  106.         String emsg = "Error Recovery for block " + block + " failed because recovery from primary datanode " + primaryNode + " failed " + recoveryErrorCount + " times. Pipeline was " + pipelineMsg + ". Aborting...";  
  107.         LOG.warn(emsg);  
  108.         lastException = new IOException(emsg);  
  109.           //已无可用的存储节点来恢复当前数据块,所以抛出异常终止写入  
  110.         closed = true;  
  111.         if (streamer != null) streamer.close();  
  112.         return false;       // abort with IOexception  
  113.       }  
  114.         
  115.       LOG.warn("Error Recovery for block " + block + " failed because recovery from primary datanode " + primaryNode + " failed " + recoveryErrorCount + " times. "  + " Pipeline was " + pipelineMsg + ". Will retry...");  
  116.       return true;          // sleep when we return from here  
  117.     } finally {  
  118.       RPC.stopProxy(primary);  
  119.     }  
  120.       
  121.     recoveryErrorCount = 0;  
  122.     if (newBlock != null) {  
  123.       block = newBlock.getBlock();  
  124.       nodes = newBlock.getLocations();  
  125.     }  
  126.   
  127.      //当前数据块恢复成功,获取该数据块新的存储位置  
  128.     this.hasError = false;  
  129.     lastException = null;  
  130.     errorIndex = 0;  
  131.       
  132.     success = createBlockOutputStream(nodes, clientName, true);  
  133.   }//while  
  134.   
  135.   response = new ResponseProcessor(nodes);  
  136.   response.start();  
  137.     
  138.   return false// do not sleep, continue processing  
  139. }  
    上面的代码可能有些复杂,我还是简单的描述一下关于processDatanodeError函数处理I/O过程中的错误:当有错误发生时。肯定是某一个数据节点发生了问题,那么首先会把这个有问题的数据节点删除掉。然后从剩余的可用的数据节点中选取一个,让它来恢复当前的这个Block,如果成功了ok,而失败,则删除再删除这个出问题的节点,则继续选节点来恢复Block,直到成功,如果最后没有可用的数据节点来恢复Block,则宣告这个Block写入失败,将关闭DFSOutputSream流,当用户再次写入时抛出异常。
   从HDFS对OutputStream的实现来看,还存在一定的缺陷,它将数据流切包发送以降低风险及异常恢复的时间,这种设计本身是极好的,但对于每一次的封包操作,它在实现上都是重新new一个Packet对象来存储的,这样带来的影响就是可能会频繁的触发Minor GC,如果你Young Gen中的Eden区大小配置小于一个数据块的长度的话,很有可能触发Full GC;此外,对于每一个数据块都会开启一个数据包确认线程,如果是写大文件的话,也会频繁的创建线程而造成不小的资源开销。所以,如果能够复用Packet对象和 数据包确认线程的话,将有可能提高客户端写的性能。