HBase 客户端读取实现

时间:2022-12-14 08:33:26
HBase 以客户端角色来调用服务器端的RPC实现 
HBase 以客户端角色来调用服务器端的RPC实现 
1. HBase的客户端RPC实际上是在Hadoop客户端RPC做的修改,具体的类为: 
   HBase org.apache.hadoop.hbase.ipc.HBaseRPC 
   Hadoop org.apache.hadoop.ipc.RPC 
   
2. 大致流程 
   2.1 客户端(这里可以是HBase的HTable调用HMaster或者HRegionServer,HRegionServer调用HMaster,DFSClient 调用NameNode)在创建或者初始化时,通过java的Proxy.newProxyInstance(..)方法创建一些接口的代理.例如HTable 通过HConnectionManager创建HMasterInterface的代理. 
   2.2 代理的实现是通过创建Socket连接到服务器,将调用的方法标识及调用的方法的参数传递给服务器,服务器处理后返回结果,发送请求时参数要写入到DataOutput中,对于服务器的响应创建实例时,要用DataInput中读取数据,这就是为什么接口的参数大部分都实现了org.apache.hadoop.io.Writable接口,对于List,byte[]这些参数类型,是通过org.apache.hadoop.hbase.io.HbaseObjectWritable进行了封装. 
   
3. 以HTable 通过HMasterInterface与HMaster通信为例. 
   3.1 HTable-->HConnectionManager.getConnection(conf)取得了org.apache.hadoop.hbase.client.HConnection接口的实例,实际上是由org.apache.hadoop.hbase.client.HConnectionManager.TableServers来实现的.HConnection接口定义了public HMasterInterface getMaster()的方法,来看下TableServers是怎么实现的. 
   3.2 
            masterLocation = zk.readMasterAddressOrThrow();//这里是通过ZooKeeper来取得HMaster注册的ip地址和端口 

            HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( 
                HMasterInterface.class, HBaseRPCProtocolVersion.versionID, 
                masterLocation.getInetSocketAddress(), this.conf);//这里由HBaseRPC来生成代理 
   3.3 HBaseRPC 最后调用的方法是 
      public static VersionedProtocol getProxy(Class<?> protocol, 
            long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, 
            Configuration conf, SocketFactory factory) 
          ... 
          VersionedProtocol proxy = 
              (VersionedProtocol) Proxy.newProxyInstance( 
                  protocol.getClassLoader(), new Class[] { protocol }, 
                  new Invoker(addr, ticket, conf, factory)); 
          //这里的factory是由conf中的参数"hadoop.rpc.socket.factory.class.default"指定的,默认(core-default.xml)是org.apache.hadoop.net.StandardSocketFactory,创建socket时是SocketChannel.open().socket()创建的NIO的socket. 
          //由org.apache.hadoop.hbase.ipc.HBaseRPC.Invoker实现的InvocationHandler接口 
   3.4 Invoker.invoke(Object proxy, Method method, Object[] args)方法中的实现为 
        HbaseObjectWritable value = (HbaseObjectWritable) 
            client.call(new Invocation(method, args), address, ticket);       
        //HBaseRPC.Invocation封装了Method的调用的参数,可以看readFields(DataInput in)和write(DataOutput out)的实现看是怎么样把参数写入读出的. 
        //client为Invoker创建时由HBaseRPC.ClientCache.getClient(conf, factory)创建.实现类为org.apache.hadoop.hbase.ipc.HBaseClient,每个SocketFactory对应一个HBaseClient对象. 
   3.5 真正的调用在HBaseClient中的call方法(有些异常的处理忽略了) 
      public Writable call(Writable param, InetSocketAddress addr, 
                           UserGroupInformation ticket) 
                           throws IOException { 
        Call call = new Call(param);//创建调用的对象,call中的方法都是synchronized,因为call会在不同的线程中调用 
        Connection connection = getConnection(addr, ticket, call);//取与addr的连接,这个连接可能是已经创建好的,也可能是新创建的.Connection是一个extends Thread线程 
        connection.sendParam(call);                 // send the parameter 
        synchronized (call) {//要先获得call的锁 
          while (!call.done) {//如果call调用没有完成,就一直循环下去 
            try { 
              call.wait();                           // wait for the result,当前的调用线程挂起 
            } catch (InterruptedException ignored) {..} 
          } 
          ... 
          return call.value;//服务器成功返回信息. 
        } 
      }       
   3.6 getConnection操作 
    private Connection getConnection(InetSocketAddress addr, 
                                     UserGroupInformation ticket, 
                                     Call call) 
                                     throws IOException { 
      Connection connection; 
      /* we could avoid this allocation for each RPC by having a 
       * connectionsId object and with set() method. We need to manage the 
       * refs for keys in HashMap properly. For now its ok. 
       */ 
      ConnectionId remoteId = new ConnectionId(addr, ticket);//通过addr及ticket生成一个key. 
      do { 
        synchronized (connections) {//连接池 
          connection = connections.get(remoteId); 
          if (connection == null) { 
            connection = new Connection(remoteId);//创建新的连接 
            connections.put(remoteId, connection); 
          } 
        } 
      } while (!connection.addCall(call));//如果call调用不能加入到connetion中,就一直循环,这里的加入操作只是加入到connection的calls(Hashtable<Integer, Call>)中,然后调用notify()方法唤醒因为没有 
  
      //we don't invoke the method below inside "synchronized (connections)" 
      //block above. The reason for that is if the server happens to be slow, 
      //it will take longer to establish a connection and that will slow the 
      //entire system down. 
      connection.setupIOstreams();//这里是初始化的操作 
      return connection; 
    }   
   3.7 org.apache.hadoop.hbase.ipc.HBaseClient.Connection.setupIOstreams()方法 
    protected synchronized void setupIOstreams() throws IOException { 
      if (socket != null || shouldCloseConnection.get()) {//如果socket不为空说明连接已经建立过了 
        return; 
      } 

      short ioFailures = 0; 
      short timeoutFailures = 0; 
      try { 
        while (true) {//这里一真循环真到连接建立起,如果网络不好,应该会等较长时间 
          try { 
            this.socket = socketFactory.createSocket();//这里从创建新的socket然后设置socket的连接属性 
            this.socket.setTcpNoDelay(tcpNoDelay);//这些属性都是从conf文件中初始化的,具体的作用不清楚 
            this.socket.setKeepAlive(tcpKeepAlive); 
            // connection time out is 20s 
            NetUtils.connect(this.socket, remoteId.getAddress(), 20000);//这里设置每个socket的connect返回时间间隔最大不超过20秒, 
            //如果超出20秒,方法会抛出SocketTimeoutException, 
            this.socket.setSoTimeout(pingInterval); 
            break; 
          } catch (SocketTimeoutException toe) { 
            handleConnectionFailure(timeoutFailures++, maxRetries, toe); 
            //这里每次超时都会累加超时次数,直到达到maxRetries=conf.getInt("hbase.ipc.client.connect.max.retries", 0),重新抛出异常 
          } catch (IOException ie) { 
            handleConnectionFailure(ioFailures++, maxRetries, ie); 
          } 
        } 
        this.in = new DataInputStream(new BufferedInputStream 
            (new PingInputStream(NetUtils.getInputStream(socket))));//读入流 
        this.out = new DataOutputStream 
            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));//写入流 
        writeHeader();//头信息 

        // start the receiver thread after the socket connection has been set up 
        start();//当前线程开启 
      } catch (IOException e) { 
        .. 
      } 
    }     
   3.8 Connetion的run方法 
    public void run() { 
      try { 
        while (waitForWork()) {//wait here for work - read or close connection,如果还有提交的call没有回复,或者闲置的时间没有超出限制,那么就一直循环处理. 
          receiveResponse(); 
        } 
      } catch (Throwable t) { 
       .. 
      } 
      close();//从connection连接池中删除自己,关闭自己的in,out流. 
    }   
    3.9 判断是否退出的条件 
    private synchronized boolean waitForWork() { 
      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {//如果calls池中没有还没有返回的提交信息. 
        long timeout = maxIdleTime- 
              (System.currentTimeMillis()-lastActivity.get()); 
        if (timeout>0) {//如果闲置时间还没到,那么挂起 
          try { 
            wait(timeout); 
          } catch (InterruptedException ignored) {} 
        } 
      } 

      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {//如果提交信息还有没有返回的. 
        return true; 
      } else if (shouldCloseConnection.get()) { 
        return false; 
      } else if (calls.isEmpty()) { // idle connection closed or stopped 所有的提交信息都返回了 
        markClosed(null); 
        return false; 
      } else { // get stopped but there are still pending requests 
        markClosed((IOException)new IOException().initCause( 
            new InterruptedException())); 
        return false; 
      } 
    } 
    3.10 处理服务器的回复信息 
    private void receiveResponse() {  //因为这个方法是在run中顺序执行的,所以不用对in加锁. 
      ... 
      try { 
        int id = in.readInt();                    // try to read an id 每个call的标识 
        Call call = calls.get(id); 
        
        boolean isError = in.readBoolean();     // read if error 调用成功或者失败的标识,这里是客户端和服务器端约定好的 
        if (isError) { 
          //noinspection ThrowableInstanceNeverThrown 
          call.setException(new RemoteException( WritableUtils.readString(in), 
              WritableUtils.readString(in))); 
          calls.remove(id);//都要从calls队列中去掉call. 
        } else { 
          Writable value = ReflectionUtils.newInstance(valueClass, conf); 
          value.readFields(in);                 // read value,这里从流中创建返回的对象 
          call.setValue(value);//call的setValue会调用notify()方法,这样就唤醒了客户端调用call(..)方法挂起的线程. 
          calls.remove(id); 
        } 
      } catch (IOException e) { 
        markClosed(e); 
      } 
    }  
   3.11 call方法中与入call对象调用的Connetion sendParam(call)方法 
    protected void sendParam(Call call) { 
      .. 
      DataOutputBuffer d=null; 
      try { 
        //noinspection SynchronizeOnNonFinalField 
        synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC 对于out的操作是排它的如果写入的数据量非常大那么会不会导致长时间的等待,阻塞其它的线程. 

          //for serializing the 
          //data to be written 
          d = new DataOutputBuffer(); 
          d.writeInt(call.id);//先写入call的id标识,以便从服务器的返回信息中读回id 
          call.param.write(d); 
          byte[] data = d.getData(); 
          int dataLength = d.getLength(); 
          out.writeInt(dataLength);      //first put the data length 
          out.write(data, 0, dataLength);//write the data 
          out.flush();//这里调用了flush操作.确保了out的顺序写入 
        } 
      } catch(IOException e) { 
        markClosed(e); 
      } finally { 
        //the buffer is just an in-memory buffer, but it is still polite to 
        // close early 
        IOUtils.closeStream(d); 
      } 
    } 
  4. 总结 
    4.1 HBaseClient 与 SocketFactory实例是一一对应的.默认实现StandardSocketFactory的equals只要类型一致,那么就是相同的,也就是说一个jvm中只会有一个HBaseClient实例. 
    4.2 HBaseClient中创建的Connection 与address和用户的ticket(这里不清楚)一一对应(org.apache.hadoop.hbase.ipc.HBaseClient.ConnectionId),每个Connection只会对应一个 Socket,每个Connection在最大闲置时间内可以接受多个Call对象. 
    4.3 Connection中的in与out是由NetUtils.getInputStream(socket)和NetUtils.getOutputStream(socket)来封装的,使用了Selector.select()方法,
    4.4 每个调用RPC的线程都会等待call的结果,输出到out是线程安全的, 
 
    4.5 对HBaseClient的调用会是一个阻塞的过程. 
    4.6 Connection 创建时会初始化socket,socket的创建由SocketFactory创建,默认的socket它是blocking状态的,在NetUtils.connect(this.socket, remoteId.getAddress(), 20000);连接到服务器端时, 底层先设置SocketChannel为no-blocking,然后由Seletor来设置超时时间,如果超时时间到,connect没有返回,则抛出SocketTimeoutException异常. 
    4.7 socket connect返回后,会设置this.socket.setSoTimeout(pingInterval);值,这样read操作会最多阻塞pingInterval时间,如果超时,会抛出SocketTimeoutException 异常. 
    4.8 socket的read和write操作都是由NetUtils来提供的,底层都是由Seletor来支持针对时间的等待. 
    4.9 对于read的超时,通过PingInputStream的包装,每次都会发送一个Ping操作,sendPing(); 
    4.10 每个Connetion创建之初都会writeHeader().对应的HBaseServer创建的Connection都会读header. 
    4.11 每个Connetion 只要calls(Hashtable<Integer, Call>)中有发送的call,除了发生异常,是不会被关闭的,除非HMasterServer关闭请求的连接. 
  5. 参考链接 
    5.1 Hadoop源代码分析(七) http://caibinbupt.iteye.com/blog/281281  
    5.2 Improve the Scalability and Robustness of IPC https://issues.apache.org/jira/browse/HADOOP-2864 
    5.3 HBase源码阅读-4-HMaster与HRegionServer的RPC http://run-xiao.iteye.com/blog/756455