–hbase 删除表
HBaseAdmin admin = new HBaseAdmin(conf);
可以查看源码,其实低层也是调用创建 HConnectionImplementation 对象进行连接管理的
admin.disableTable(tableName);
然后失效一个表,
里面类似于delete操作一下,创建一个回调临时对象
DisableTableResponse response = executeCallable(
new MasterCallable<DisableTableResponse>(getConnection()) {
@Override
public DisableTableResponse call(int callTimeout) throws ServiceException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
LOG.info("Started disable of " + tableName);
DisableTableRequest req =
RequestConverter.buildDisableTableRequest(
tableName, ng.getNonceGroup(), ng.newNonce());
return master.disableTable(controller, req);
}
});
在MasterCallable 中的 prepare 方法中
this.master = this.connection.getKeepAliveMasterService();
拿到hmaster的地址 ,创建如下对象,建立到hmaster的查找过程
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
try {
this.masterServiceState.stub = stubMaker.makeStub();
在makeStub方法中调用makeStubNoRetries() 方法
创建连接到 zk的对象
keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
然后通过 下面方法进行连接到zk当中
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
然后去请请求hmaster的地址
ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
在hmaster拿到hmaster的地址
data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode())
;
最好就可以创建rpc的channel对象了
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = makeStub(channel);
其实是创建了 BlockingRpcChannelImplementation,对象,里面又调用了 rpcClient对象
然后每次创建 连接时,都会进行了 isMasterRunning(); 方法的调用进行测试master是否在运行当中
在hmaster的rpc服务中 MasterRpcServices.isMasterRunning 就可以接到请求的方法调用。
现在回到最开始的业务,就是 发送 DisableTableRequest 请求到hmaster中。
接着
admin.deleteTable(tableName);
过程和上面的过程一样,只是现在请求的对象是DeleteTableRequest 而已
private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException {
DeleteTableResponse response = executeCallable(
new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
public DeleteTableResponse call(int callTimeout) throws ServiceException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(tableName);
DeleteTableRequest req =
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
return master.deleteTable(controller,req);
}
});
return new DeleteTableFuture(this, tableName, response);
}
其中 deleteTable是 client 和hmaster的rpc接口方法
大家可能也注意到,像上面也用到了 rpcClient
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
这个类是对服务端调用的公共rpc类。
所以在之前曾经说到的在发送RpcClientImpl.call之时,会建立到远程的连接
final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
pcrc.getCallTimeout());
final Connection connection = getConnection(ticket, call, addr);
在这里创建连接,在连接里会创建 ConnectionHeader header 类,在发起连接建立时,会上传客户端的用户信息的
connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());