SOFA 源码分析 — 连接管理器

时间:2022-05-21 09:24:27

SOFA 源码分析 — 连接管理器

前言

RPC 框架需要维护客户端和服务端的连接,通常是一个客户端对应多个服务端,而客户端看到的是接口,并不是服务端的地址,服务端地址对于客户端来讲是透明的。

那么,如何实现这样一个 RPC 框架的网络连接呢?

我们从 SOFA 中寻找答案。

连接管理器介绍

先从一个小 demo 开始看:

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setProtocol("bolt") // 指定协议
    .setDirectUrl("bolt://127.0.0.1:9696"); // 指定直连地址

HelloService helloService = consumerConfig.refer();

while (true) {
  System.out.println(helloService.sayHello("world"));
  try {
    Thread.sleep(2000);
  } catch (Exception e) {
  }
}

上面的代码中,一个 ConsumerConfig 对应一个接口服务,并指定了直连地址。

然后调用 ref 方法。每个 ConsumerConfig 绑定了一个 ConsumerBootstrap,这是一个非单例的类。

而每个 ConsumerBootstrap 又绑定了一个 Cluster,这是真正的客户端。该类包含了一个客户端所有的关键信息,例如:

  1. Router 路由链
  2. loadBalance 负载均衡
  3. addressHolder 地址管理器
  4. connectionHolder 连接管理器
  5. filterChain 过滤器链

这 5 个实例是 Cluster 的核心。一个客户端的正常使用绝对离不开这 5 个元素。

我们之前分析了 5 个中的 4 个,今天分析最后一个 —— 连接管理器。

他可以说是 RPC 网络通信的核心。

地址管理器代表的是:一个客户端可以拥有多个接口。
连接管理器代表的是:一个客户端可以拥有多个 TCP 连接。

很明显,地址管理器的数据肯定比连接管理器要多。因为通常一个 TCP 连接(Server 端)可以含有多个接口。

那么 SOFA 是如何实现连接管理器的呢?

从 AbstractCluster 的 init 方法中,我们知道,该方法初始化了 Cluster。同时也初始化了 connectionHolder。

具体代码如下:

// 连接管理器
connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);

使用了 SPI 的方式进行的初始化。目前 RPC 框架的具体实现类只有一个 AllConnectConnectionHolder。即长连接管理器。

该类需要一个 ConsumerConfig 才能初始化。

该类中包含很多和连接相关的属性,有 4 个 Map,未初始化的 Map,存活的节点列表,存活但亚健康的列表,失败待重试的列表。这些 Map 的元素都会随着服务的网络变化而变化。

而这些 Map 中的元素则是:ConcurrentHashMap ,>

即每个服务者的信息对应一个客户端传输。那么这个 ClientTransport 是什么呢?看过之前文章的都知道,这个一个 RPC 和 Bolt 的胶水类。该类的默认实现 BoltClientTransport 包含了一个 RpcClient 属性,注意,该属性是个静态的。也就是说,是所有实例公用的。并且,BoltClientTransport 包含一个 ProviderInfo 属性。还有一个 Url 属性,Connection 属性(网络连接)。

我们理一下:一个 ConsumerConfig 绑定一个 Cluster,一个 Cluster 绑定一个 connectionHolder,一个 connectionHolder 绑定多个 ProviderInfo 和 ClientTransport。

因为一个客户端可以和多个服务进行通信。

代码如何实现?

在 Cluster 中,会对 connectionHolder 进行初始化,在 Cluster 从注册中心得到服务端列表后,会建立长连接。

从这里开始,地址管理器开始运作。

Cluster 的 updateAllProviders 方法是源头。该方法会将服务列表添加到 connectionHolder 中。即调用 connectionHolder.updateAllProviders(providerGroups) 方法。该方法会全量更新服务端列表。

如果更新的时候,发现有新的服务,便会建立长连接。具体代码如下:

if (!needAdd.isEmpty()) {
    addNode(needAdd);
}

addNode 方法就是添加新的节点。该方法会多线程建立 TCP 连接。

首先会根据 ProviderInfo 信息创建一个 ClientTransport,然后向线程池提交一个任务,任务内容是 initClientTransport(),即初始化客户端传输。

该方法代码如下(精简过了):

private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {
        transport.connect();
        if (doubleCheck(interfaceId, providerInfo, transport)) {
            printSuccess(interfaceId, providerInfo, transport);
            addAlive(providerInfo, transport);
        } else {
            printFailure(interfaceId, providerInfo, transport);
            addRetry(providerInfo, transport);
        }
}

其中关键是调用 transport 的 connect 方法建立连接。

该方法的默认实现在 BoltClientTransport 中,符合我们的预期。我们知道, BoltClientTransport 有一个 RpcClient 的静态实例。这个实例在类加载的时候,就会在静态块中初始化。初始化内容则是初始化他的一些属性,例如地址解析器,连接管理器,连接监控等等。

我们再看 BoltClientTransport 的 connect 方法,该方法主要逻辑是初始化连接。方式则是通过 RpcClient 的 getConnection 方法来获取,具体代码如下:

 connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());

传入一个 URL 和超时时间。 RpcClient 则是调用连接管理器的 getAndCreateIfAbsent 方法获取,同样传入 Url,这个方法的名字很好,根据 URL 获取连接,如果没有,就创建一个。

有必要看看具体代码:

public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
    // get and create a connection pool with initialized connections.
    ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
        new ConnectionPoolCall(url));
    if (null != pool) {
        return pool.get();
    } else {
        logger.error("[NOTIFYME] bug detected! pool here must not be null!");
        return null;
    }
}

该方法会继续调用自身的 getConnectionPoolAndCreateIfAbsent 方法,传入 URL 的唯一标识,和一个 ConnectionPoolCall 对象(实现了 Callable)。

然后阻塞等待返回连接。

我们看看这个 ConnectionPoolCall 的 call 方法实现。该方法调用了连接管理器的 doCreate 方法。传入了 URL 和一个连接池。然后 call 方法返回连接池。

doCreate 方法中,重点就是 create 方法,传入了一个 url,返回一个 Connection,并放入连接池。默认池中只有一个长连接。

而 create 方法则是调用连接工厂的 createConnection 方法。然后调用 doCreateConnection 方法。该方法内部给了我们明确的答案:调用 Netty 的 Bootstrap 的 connect 方法。

代码如下:

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

熟悉 Netty 的同学一眼便看出来了。这是一个连接服务端的操作。而这个 BootStrap 的初始化则是在 RpcClient 初始化的时候进行的。注意:BootStrap 是可以共享的。

可以看到, ConnectionPoolCall 的 call 方法就是用来创建 Netty 连接的。回到 getAndCreateIfAbsent 方法里,继续看 getConnectionPoolAndCreateIfAbsent 方法的实现。

该方法内部将 Callable 包装成一个 FutureTask,目的应该是为了以后的异步运行吧,总之,最后还是同步调用了 run 方法。然后调用 get 方法阻塞等待,等待刚刚 call 方法返回的连接池。然后返回。

得到连接池,连接池调用 get 方法,从池中根据策略选取一个连接返回。目前只有一个随机选取的策略。

这个 Connection 连接实例会保存在 BoltClientTransport 中。

在客户端进行调用的时候, RpcClient 会根据 URL 找到对应的连接,然后,获取这个连接对应的 Channel ,向服务端发送数据。具体代码如下:

conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (!f.isSuccess()) {
            conn.removeInvokeFuture(request.getId());
            future.putResponse(commandFactory.createSendFailedResponse(
                conn.getRemoteAddress(), f.cause()));
            logger.error("Invoke send failed, id={}", request.getId(), f.cause());
        }
    }
});

以上,就是 SOFA 的连接的原理和设计。

总结

连接管理器是我们分析 SOFA—RPC Cluster 中的最后一个模块,他管理着一个客户端对应的所有服务网络连接。

connectionHolder 内部包含多个 Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的胶水类,通常一个 Provider 对应一个 ClientTransport。ClientTransport 其实就是一个连接的包装。

ClientTransport 获取连接的方式则是通过 RpcClient 的 连接管理器获取的。该连接管理器内部包含一个连接工厂,会根据 URL 创建连接。创建连接的凡是则是通过 Netty 的 BootStrap 来创建。

当我们使用 Provider 对应的 ClientTransport 中的 RpcClient 发送数据的时候,则会根据 URL 找到对应 Connection,并获取他的 Channel ,向服务端发送数据。

好了,以上就是 SOFA—RPC 连接管理的分析。

篇幅有限,如有错误,还请指正。