Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
....
//这里进去就是创建跟Broker的网络连接
grabCnx();
}
void grabCnx() {
//实际上是调用ConnectionHandler进行的
this.connectionHandler.grabCnx();
}
protected void grabCnx(Optional<URI> hostURI) {
....
//这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法
cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
....
}
public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释
return getLookup(url).getBroker(topicName)
.thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
}
public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
try {
//忽略其他的,直接跟这里进去
return createLookup(serviceUrl);
} catch (PulsarClientException e) {
log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
throw new IllegalStateException("Failed to update url " + url);
}
});
}
public LookupService createLookup(String url) throws PulsarClientException {
//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询
if (url.startsWith("http")) {
return new HttpLookupService(conf, eventLoopGroup);
} else {
return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
}
public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
}
protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
....
//可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包
httpClient = new DefaultAsyncHttpClient(config);
....
}
通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息
public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
return getLookup(url).getBroker(topicName)
.thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
}
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
//判断访问哪个版本的接口
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
//获取要访问的Broker地址
return httpClient.get(path, LookupData.class)
.thenCompose(lookupData -> {
URI uri = null;
try {
//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口
InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
//HTTP通过Lookup方式访问服务端绝对不会走代理
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
false /* HTTP lookups never use the proxy */));
} catch (Exception e) {
....
}
});
}
public class LookupTopicResult {
//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口
private final InetSocketAddress logicalAddress;
private final InetSocketAddress physicalAddress;
private final boolean isUseProxy;
}
客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节