【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

时间:2021-07-13 19:36:25
DubboProtocol.refer() 过程

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

==============================================================代码分析====================================================================
==============================================================代码分析====================================================================
==============================================================代码分析====================================================================



  1. ==============================================================1.Protocol层====================================================================
先看看DubboProtocol.refer(Class<T> serviceType,URL url)方法:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
serviceType 为 服务的interface,方法返回一个已经创建好netty长连接的DubboInvoker实例,连接创建见 getClient(url) 方法

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
initClient(url) 方法创建socket连接(默认只允许nio方式):
1)如果url中设置了 lazy,则只创建连接实例,但是不立即connect,只有在使用时才会进行connect
2)不设置lazy 直接返回已经connect好的实例。

ExchangeClient client;
try {
//设置连接应该是lazy的 LAZY_CONNECT_KEY = lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;



LazyConnectExchangeClient.send 方法中可知道 在首次使用的时候调用initClient()进行connect,下一次调用如果connect已经存在,则直接返回
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
2. ==============================================================2.Exchanges层===================================================================

接下来看看Exchangers 方法 -> ExchangeClient connect(URL url, ExchangeHandler handler) :
1)首先会根据 url中 key= exchanger 关键字获取值,没有则使用默认值 header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); //EXCHANGER_KEY=exchanger ; DEFAULT_EXCHANGER = header
2)依据type 使用 ExtensionLoader获取 Extension:
ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
最终获取到 Exchanger实现类 HeaderExchanger;因此实际执行的是 HeaderExchanger.connect(URL url, ExchangeHandler handler):
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

HeaderExchanger.connect 方法返回一个 HeaderExchangeClient实例,
创建出来的ExchangeClient是HeaderExchangeClient,它也是Client的包装类,仅仅在Client外层加上心跳检测的功能,向它所连接的服务器端发送心跳检测。
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

==============================================================3. Transporter s层===================================================================

HeaderExchangeClient需要外界给它传一个Client实现,这是由Transporter接口实现来定的,默认是NettyTransporter

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
创建出来的的Client实现是NettyClient。
同时DubboProtocol的ChannelHandler实现经过层层装饰器包装,最终传给底层通信Client。

NettyClient 会对DubboProtocol传递进来的Handler进一步封装 :
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
设置线程池类型:默认设置 cachepool
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
调用ChannelHandlers进行进一步封装:ChannelHandlers.wrap ->ChannelHandlers.getInstance().wrapInternal(handler, url)
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程


==============================================================4. Dispatcher 层===================================================================

通过 Dispatcher进一步封装,HeartbeatHandler等;
Dispatcher 使用的Adaptive类动态代理执行后 最终使用默认的 AllDispatcher.dispatch(handler,url) 类处理,将顶层 Protocol的Handler交由 AllChannelHandler
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
AllChannelHandler继承 WrappedChannelHandler:
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
设置 ThreadPool :url中设置threadpool=cached,因此使用的是 CachedThreadPool 类创建线程池 :
返回 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) :
corePoolSize=0;
maximumPoolSize = Integer.MAX_VALUE;
keepAliveTime = 60000;
unit = TimeUnit.MILLISECONDS;
workQueue = new SynchronousQueue<Runnable>()
threadFactory = new NamedThreadFactory(name, true);
handler = new AbortPolicyWithReport(name, url)
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
DataStore ;

DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
采用SimpleDataStore 实现
最后将线程池保存起来:
dataStore.put(componentKey, Integer.toString(url.getPort()), executor); // componentKey = url.getParameter(“side”); port=20880; executor = 前面返回的线程池




最终 NettyClient 获取到的handler 为: 传入的ChannelHandler 通过包装的方法,形成一个调用链, MultiMessageHandler负责对多条消息的处理,HeartbeatHandler负责心跳包的处理,spi构造出AllChannelHandler,负责对事件进行派发。DecodeHandler中是对数据的反序代操作,HeaderExchangeHandler中对dubbo协议的数据进行处理,并回调reply方法。
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

reply方法在DubboProtocol属性中 :
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程


同时NettClient初始化时经过层层父类初始化:
NettyClient -> AbstractClient -> AbstractEndpoint -> class AbstractPeer implements ChannelHandler

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

AbstractEndpoint 构造时设置
编解码器 Codec : com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
超时时间 timeout: 1000
connectTimeout :3000

构造完成后 AbstractClient 回调doOpen方法开启服务,NettyClient的doOpen方法则是调用netty的api,开启了netty的客户端。接着回调doConnect方法创建与客户端的连接。
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程




NettyCilent.doOpen()调用netty api 并把Handler交由netty handler
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

connect() 方法 首先启动线程 进行 连接状态检测 2s检测一次,如果发现连接状态异常,重新
启动连接;具体见 initConnectStatusCheckCommand 方法,接着回调 NettyClient.doConnect()方法建立与客户端连接

【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
最终启动的netty连接:
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程
连接成功日志如下:

INFO transport.AbstractClient: [DUBBO] Successed connect to server /10.0.28.54:20880 from NettyClient 10.0.28.54 using dubbo version 2.0.0, channel is NettyChannel [channel=[id: 0x0d5d9e71, /10.0.28.54:62531 => /10.0.28.54:20880]], dubbo version: 2.0.0, current host: 10.0.28.54
Start NettyClient 755D025365/10.0.28.54 connect to the server /10.0.28.54:20880, dubbo version: 2.0.0, current host: 10.0.28.54

连接创建完毕,逐层向上返回到 DubboProtocol( protocol层),最终DubboProtocol.initClient 获取到的Client 为 :
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

此时创建好待返回的Invoker 为
【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程


在返回到 Protocol 包装类 DubboProtocol -> ProtocolListenerWrapper -> ProtocolFilterWrapper
ProtocolListenerWrapper 将返回的invoker 包装成 ListenerInvokerWrapper ;
ProtocolFilterWrapper 将返回的invoker加上 FIlter链(filter是根据 group=consumer, @Activate(group=“***”) 来区分是provider还是consumer端的,此处获取到的consumer的 Filter有 : ConsumerContextFilter -> MonitorFilter -> FutureFilter)


【dubbo源码分析】消费端 DubboProtocol 服务消费 refer 过程

到这里,新的invoker列表创建完毕,接下来会使用新的invoker列表与旧的invokers列表比对,删除掉未使用的invoker