首先我们通过一个时序图,直观看下Dubbo服务提供方启动的流程:
- 在《Dubbo整体框架分析》一文中我们提到,服务提供方需要使用ServiceConfig API发布服务,具体是调用代码(1)export()方法来激活发布服务。export的核心代码如下:
public synchronized void export(){
...
// 这里是延迟发布
if(delay != null && delay > 0){
delayExportExecutor.schedule(new Runnable(){
public void run(){
doExport();
}
} , delay , TimeUnit.MILLISECONDS);
}else{
// 直接发布
doExport();
}
}
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter" , true));
如上代码,可知Dubbo的延迟发布通过使用ScheduledExecutorService来实现的,可以通过调用ServiceConfig的setDelay(Integer delay)来设置延迟发布时间。如果没有设置延迟时间则直接调用代码(2)doExport()方法发布服务,延迟发布最后也是调用的该方法。
- 代码(2)主要是根据ServiceConfig里面的属性进行合法性检查,这里我们主要看其内部最后调用的doExportUrls方法。
- 代码(3)内部首先通过调用loadRegistries方法加载所有的服务注册中心对象,Dubbo中一个服务可以被注册到多个服务注册中心。
- 另外Dubbo支持本地调用,本地调用使用了injvm协议,是一个伪协议,它不开启端口,不发起远程调用,旨在jvm内直接关联,但执行Dubbo的Filter链,所以默认下Dubbo同时支持本地调用与远程调用协议,所以这里的循环用来暴露本地服务和远程服务。
- 代码(5)的 doExportUrlsFor1Protocol 方法内部首先把参数封装在url,在Dubbo里面会把所有参数封装在一个url里面,然后具体执行服务到处,其核心代码如下:
Invoker<?> invoker = proxyFactory.getInvoker(ref , (Class)interfaceClass , url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker , this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
其中proxyFactory和protocol的定义如下:
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdativeExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
可知它们是扩展接口的适配器类。
- 执行代码 proxyFactory.getInvoker(ref , (Class)interfaceClass , url)时,我们发现实际是首先执行代码(6)调用了扩展接口 ProxyFactory的适配器类,然后适配器内部根据url里proxy的类型选择具体的代理工厂,这里默认proxy类型为javasist,所以又调用了代码(7)执行了JavassistProxyFactory的getInvoker方法获取了代理类。
JavassistProxyFactory的getInvoker代码如下:
public <T> Invoker<T> getInvoker(T proxy , Class<T> type , URL url){
//
final Wrapper c = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy , type , url){
@Override
protected Object doInvoke(T proxy , String methodName , Class<?>[] parameterTypes , Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy,methodName,parameterTypes,arguments);
}
};
}
这里首先把服务实现类转换为Wrapper类,是为了减少反射的调用。代码(6)具体返回的是AbstractProxyInvoker对象,其内部重写doInvoke方法,具体委托给Wrapper实现具体功能。到这里完成了《Dubbo具体架构分析》一文中讲解的服务提供方实现类到Invoker的转换。
- 然后执行代码(8),当执行protocol.export(wrapperInvoker)的时候,实际调用了Protocol的适配器类Protocol$Adaptive的export方法。其内部根据url中Protocol的类型为registry,会选择Protocol的实现类RegistryProtocol。但由于Dubbo SPI的扩展点使用wrapper自动增强的存在,这里使用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper对其进行了增强,所以需要一层层调用才会具体调用到RegistryProtocol的export方法。
- 代码(12)RegistryProtocol中的export方法通过代码(13)doLocalExport启动了NettyServer进行监听服务,代码(14)则将当前服务注册到具体的服务注册中心。到这里完成《Dubbo整体架构分析》一文中谈到的Invoker到Exporter的转换。
下面我们再来看doLocalExport是如何启动NettyServer的,doLocalExport内部主要调用DubboProtocol的export方法,下面看下时序图:
由于DubboProtocol也被Wrapper类增强了,所以也是一层层调用后,执行代码(7)调用DubboProtocol的export方法,export代码如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException{
URL url = invoker.getUrl();
// invoker到exporter转换
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker , key , exporterMap);
exporterMap.put(key,exporter);
...
openServer(url);
return exporter;
}
这里将invoker转换到DubboExporter对象,然后执行代码(9),接着一步步最后会执行到代码(14)NettyTransporter的bind方法,其代码如下:
public Server bind(URL url,ChannelHandler listener) throws RemotingException{
return new NettyServer(url,listener);
}
有上面代码可知,其内部创建了NettyServer,而NettyServer的构造函数内部又调用了NettyServer的doOpen方法启动了服务监听。
public NettyServer(URL url,ChannelHandler handler) throws RemotingException{
super(url,ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME)));
}
这里我们主要看 ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME))这个代码,改代码里面加载了具体的线程模型,通过ChannelHandlers的wrapInternal方法完成了加载:
protected ChannelHandler wrapInternal(ChannelHandler handler,URL url){
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler,url)));
}
可知根据url里面的线程模型选择具体的Dispatcher实现类,这里再重新提下Dubbo提供的Dispatcher实现类如下,默认是all:
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
到这里线程模型加载地方讲解完了,还有一个点就是线程模型中线程池SPI扩展什么时候加载的呢?这里以线程模型AllDispatcher为例,它对应的是AllChannelHandler,其构造函数如下:
public AllChannelHandler(ChannelHandler handler,URL url){
super(handler , url); // 父类WrappedChannelHandler构造方法
}
public WrappedChannelHandler(ChannelHandler handler,URL url){
...
executor = (ExecutorService)ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
...
}
可知WrapperChannelHandler的构造函数里根据url获取了具体的扩展接口ThreadPool的SPI实现类,这里再提下ThreadPool的扩展接口如下,默认是fixed:
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool