hbase客户端源码分析--deletetable

时间:2022-02-04 19:31:28

–hbase 删除表

HBaseAdmin admin = new HBaseAdmin(conf);

可以查看源码,其实低层也是调用创建 HConnectionImplementation 对象进行连接管理的

admin.disableTable(tableName);

然后失效一个表,
里面类似于delete操作一下,创建一个回调临时对象

DisableTableResponse response = executeCallable(
  new MasterCallable<DisableTableResponse>(getConnection()) {
    @Override
    public DisableTableResponse call(int callTimeout) throws ServiceException {
      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
      controller.setCallTimeout(callTimeout);
      controller.setPriority(tableName);

      LOG.info("Started disable of " + tableName);
      DisableTableRequest req =
          RequestConverter.buildDisableTableRequest(
            tableName, ng.getNonceGroup(), ng.newNonce());
      return master.disableTable(controller, req);
    }
  });

在MasterCallable 中的 prepare 方法中

 this.master = this.connection.getKeepAliveMasterService();

拿到hmaster的地址 ,创建如下对象,建立到hmaster的查找过程

 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
  try {
this.masterServiceState.stub = stubMaker.makeStub();

在makeStub方法中调用makeStubNoRetries() 方法
创建连接到 zk的对象

 keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);

然后通过 下面方法进行连接到zk当中

this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);

然后去请请求hmaster的地址
ServerName sn = MasterAddressTracker.getMasterAddress(zkw);

在hmaster拿到hmaster的地址

data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode());

最好就可以创建rpc的channel对象了

   BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
  stub = makeStub(channel);

其实是创建了 BlockingRpcChannelImplementation,对象,里面又调用了 rpcClient对象
然后每次创建 连接时,都会进行了 isMasterRunning(); 方法的调用进行测试master是否在运行当中
在hmaster的rpc服务中 MasterRpcServices.isMasterRunning 就可以接到请求的方法调用。

现在回到最开始的业务,就是 发送 DisableTableRequest 请求到hmaster中。
接着

admin.deleteTable(tableName);

过程和上面的过程一样,只是现在请求的对象是DeleteTableRequest 而已

  private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException {
DeleteTableResponse response = executeCallable(
  new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
public DeleteTableResponse call(int callTimeout) throws ServiceException {
  PayloadCarryingRpcController controller = rpcControllerFactory.newController();
  controller.setCallTimeout(callTimeout);
  controller.setPriority(tableName);
  DeleteTableRequest req =
  RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
  return master.deleteTable(controller,req);
}
  });
return new DeleteTableFuture(this, tableName, response);
  }

其中 deleteTable是 client 和hmaster的rpc接口方法

大家可能也注意到,像上面也用到了 rpcClient

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);

这个类是对服务端调用的公共rpc类。
所以在之前曾经说到的在发送RpcClientImpl.call之时,会建立到远程的连接

  final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
pcrc.getCallTimeout());
   final Connection connection = getConnection(ticket, call, addr);

在这里创建连接,在连接里会创建 ConnectionHeader header 类,在发起连接建立时,会上传客户端的用户信息的

connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());