DubboProtocol.refer() 过程
==============================================================代码分析====================================================================
==============================================================代码分析====================================================================
==============================================================代码分析====================================================================
- ==============================================================1.Protocol层====================================================================
先看看DubboProtocol.refer(Class<T> serviceType,URL url)方法:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
serviceType 为 服务的interface,方法返回一个已经创建好netty长连接的DubboInvoker实例,连接创建见 getClient(url) 方法
initClient(url) 方法创建socket连接(默认只允许nio方式):
1)如果url中设置了 lazy,则只创建连接实例,但是不立即connect,只有在使用时才会进行connect
2)不设置lazy 直接返回已经connect好的实例。
ExchangeClient client;
try {
//设置连接应该是lazy的 LAZY_CONNECT_KEY = lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
从
LazyConnectExchangeClient.send 方法中可知道 在首次使用的时候调用initClient()进行connect,下一次调用如果connect已经存在,则直接返回
2. ==============================================================2.Exchanges层===================================================================
接下来看看Exchangers 方法 ->
ExchangeClient connect(URL url, ExchangeHandler handler)
:
1)首先会根据 url中 key= exchanger
关键字获取值,没有则使用默认值 header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); //EXCHANGER_KEY=exchanger
; DEFAULT_EXCHANGER = header
2)依据type 使用 ExtensionLoader获取 Extension:
ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
最终获取到 Exchanger实现类 HeaderExchanger;因此实际执行的是 HeaderExchanger.connect(URL url, ExchangeHandler handler):
HeaderExchanger.connect 方法返回一个 HeaderExchangeClient实例,
创建出来的ExchangeClient是HeaderExchangeClient,它也是Client的包装类,仅仅在Client外层加上心跳检测的功能,向它所连接的服务器端发送心跳检测。
==============================================================3.
Transporter
s层===================================================================
HeaderExchangeClient需要外界给它传一个Client实现,这是由Transporter接口实现来定的,默认是NettyTransporter
创建出来的的Client实现是NettyClient。
同时DubboProtocol的ChannelHandler实现经过层层装饰器包装,最终传给底层通信Client。
NettyClient 会对DubboProtocol传递进来的Handler进一步封装 :
设置线程池类型:默认设置 cachepool
调用ChannelHandlers进行进一步封装:ChannelHandlers.wrap ->ChannelHandlers.getInstance().wrapInternal(handler, url)
==============================================================4.
Dispatcher
层===================================================================
通过 Dispatcher进一步封装,HeartbeatHandler等;
Dispatcher 使用的Adaptive类动态代理执行后 最终使用默认的 AllDispatcher.dispatch(handler,url) 类处理,将顶层 Protocol的Handler交由 AllChannelHandler
AllChannelHandler继承 WrappedChannelHandler:
设置 ThreadPool :url中设置threadpool=cached,因此使用的是 CachedThreadPool 类创建线程池 :
返回
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit
unit,
BlockingQueue
<
Runnable
> workQueue,
ThreadFactory
threadFactory,
RejectedExecutionHandler
handler)
:
corePoolSize=0;
maximumPoolSize = Integer.MAX_VALUE;
keepAliveTime
= 60000;
unit = TimeUnit.MILLISECONDS;
workQueue
= new SynchronousQueue<Runnable>()
threadFactory
= new NamedThreadFactory(name, true);
handler
= new AbortPolicyWithReport(name, url)
DataStore ;
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
采用SimpleDataStore 实现
最后将线程池保存起来:
dataStore.put(componentKey, Integer.toString(url.getPort()), executor); // componentKey = url.getParameter(“side”); port=20880; executor = 前面返回的线程池
最终
NettyClient 获取到的handler 为:
传入的ChannelHandler 通过包装的方法,形成一个调用链, MultiMessageHandler负责对多条消息的处理,HeartbeatHandler负责心跳包的处理,spi构造出AllChannelHandler,负责对事件进行派发。DecodeHandler中是对数据的反序代操作,HeaderExchangeHandler中对dubbo协议的数据进行处理,并回调reply方法。
reply方法在DubboProtocol属性中 :
同时NettClient初始化时经过层层父类初始化:
NettyClient -> AbstractClient -> AbstractEndpoint -> class AbstractPeer implements ChannelHandler
AbstractEndpoint 构造时设置
编解码器 Codec : com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
超时时间 timeout: 1000
connectTimeout :3000
构造完成后 AbstractClient
回调doOpen方法开启服务,NettyClient的doOpen方法则是调用netty的api,开启了netty的客户端。接着回调doConnect方法创建与客户端的连接。
NettyCilent.doOpen()调用netty api 并把Handler交由netty handler
connect() 方法 首先启动线程 进行 连接状态检测 2s检测一次,如果发现连接状态异常,重新
启动连接;具体见 initConnectStatusCheckCommand 方法,接着回调 NettyClient.doConnect()方法建立与客户端连接
最终启动的netty连接:
连接成功日志如下:
INFO transport.AbstractClient: [DUBBO] Successed connect to server
/10.0.28.54:20880 from
NettyClient 10.0.28.54 using dubbo version 2.0.0,
channel is NettyChannel [channel=[id: 0x0d5d9e71, /10.0.28.54:62531 => /10.0.28.54:20880]], dubbo version: 2.0.0, current host: 10.0.28.54
Start NettyClient 755D025365/10.0.28.54 connect to the server /10.0.28.54:20880, dubbo version: 2.0.0, current host: 10.0.28.54
连接创建完毕,逐层向上返回到 DubboProtocol(
protocol层),最终DubboProtocol.initClient 获取到的Client 为 :
此时创建好待返回的Invoker 为
在返回到 Protocol 包装类 DubboProtocol -> ProtocolListenerWrapper -> ProtocolFilterWrapper
ProtocolListenerWrapper 将返回的invoker 包装成 ListenerInvokerWrapper ;
ProtocolFilterWrapper 将返回的invoker加上 FIlter链(filter是根据 group=consumer, @Activate(group=“***”) 来区分是provider还是consumer端的,此处获取到的consumer的 Filter有 : ConsumerContextFilter -> MonitorFilter -> FutureFilter)
到这里,新的invoker列表创建完毕,接下来会使用新的invoker列表与旧的invokers列表比对,删除掉未使用的invoker