Apache Pulsar源码解析之Lookup机制-客户端实现原理

时间:2024-04-05 22:16:27

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                        ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
				....
        //这里进去就是创建跟Broker的网络连接
        grabCnx();
    }

    void grabCnx() {
      	//实际上是调用ConnectionHandler进行的
        this.connectionHandler.grabCnx();
    }

		protected void grabCnx(Optional<URI> hostURI) {
  		....
      //这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法
      cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
  		....
    }


    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
      	//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

    public LookupService getLookup(String serviceUrl) {
        return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
            try {
              	//忽略其他的,直接跟这里进去
                return createLookup(serviceUrl);
            } catch (PulsarClientException e) {
                log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
                throw new IllegalStateException("Failed to update url " + url);
            }
        });
    }

    public LookupService createLookup(String url) throws PulsarClientException {
      	//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询
        if (url.startsWith("http")) {
            return new HttpLookupService(conf, eventLoopGroup);
        } else {
            return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
                    externalExecutorProvider.getExecutor());
        }
    }

		
    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
            throws PulsarClientException {
      	//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数
        this.httpClient = new HttpClient(conf, eventLoopGroup);
        this.useTls = conf.isUseTls();
        this.listenerName = conf.getListenerName();
    }

    protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
			....
      //可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包
      httpClient = new DefaultAsyncHttpClient(config);
			....
    }

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

   public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        //判断访问哪个版本的接口
        String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
        String path = basePath + topicName.getLookupName();
        path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
        //获取要访问的Broker地址
        return httpClient.get(path, LookupData.class)
                .thenCompose(lookupData -> {
            URI uri = null;
            try {
              	//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                //HTTP通过Lookup方式访问服务端绝对不会走代理
                return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
                        false /* HTTP lookups never use the proxy */));
            } catch (Exception e) {
							....
            }
        });
    }

public class LookupTopicResult {
  	//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口
    private final InetSocketAddress logicalAddress;
    private final InetSocketAddress physicalAddress;
    private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节