相关博客:
Spring 自定义 XML 配置扩展
Dubbo的SPI机制(二)(Dubbo优化后的SPI实现)
Dubbo 的 SPI 机制(三)(Extension 扩展点补充)
Dubbo 之服务发布和注册(二)(补充)
官网的服务暴露时序图:
首先看看在使用 Dubbo 的时候是怎么发布服务的。
这是之前的一个 Demo 中的配置服务发布的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!--提供方信息,定义当前项目对外发布的一些内容信息,name尽量保证唯一(这里没有强制唯一),owner表示维护者-->
<dubbo:application name="dubbo-server" owner="dongguabai"/>
<!--注册中心-Dongguabai,N/A表示不需要依赖注册中心,集群使用逗号分隔-->
<!--注意要声明id-->
<dubbo:registry address="zookeeper://192.168.220.137:2181"/>
<dubbo:registry address="zookeeper://192.168.220.136:2181"/>
<!--发布的协议名称(默认dubbo协议)、端口-->
<dubbo:protocol port="20880" name="dubbo"/>
<dubbo:protocol port="20881" name="hessian"/>
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.IHello" ref="helloService" protocol="hessian,dubbo" registry="zk1"/>
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.IProtocol" ref="protocolService" protocol="dubbo" registry="zk2"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.HelloImpl"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.ProtocolImpl"/>
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.version.ICall" ref="callService" protocol="dubbo" registry="zk1" version="1.0.0" />
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.version.ICall" ref="callService2" protocol="dubbo" registry="zk2" version="2.0.0"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.version.CallImpl1"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.version.CallImpl2"/>
</beans>
Dubbo 在服务发布的过程会对配置文件进行一个解析,并且暴露一个端口号。
要知道 Dubbo 是如何发布服务的,需要找到一个入口,而 Dubbo 这里在 Spring 的 XML 中使用了很多自定义的标签,也就是说 Dubbo 是基于 Spring 做的一个扩展(具体可参看:Spring 自定义 XML 配置扩展)。根据 Spring 官方说明:
Creating new XML configuration extensions can be done by following these (relatively) simple steps:
- Authoring an XML schema to describe your custom element(s).
- Coding a custom NamespaceHandler implementation (this is an easy step, don’t worry).
- Coding one or more BeanDefinitionParser implementations (this is where the real work is done).
- Registering the above artifacts with Spring (this too is an easy step).
要实现自定义扩展,有三个步骤(在 Spring 中定义了两个接口,用来实现扩展):
- NamespaceHandler:注册一堆 BeanDefinitionParser,利用他们来进行解析
- BeanDefinitionParser:用于解析每个 element 的内容
- Spring默认会加载far包下的M ETA-I N F/spring. handlers文件寻找对应的NamespaceHandler
需要基于 NamespaceHandler和 BeanDefinitionParser 进行扩展,需要加载 META-INF/spring.handlers 和 META-INF/spring.schemas。先找找这两个文件:
存在一个 Key-Value 的关系:
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
进入这个 Handler:
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.config.spring.schema;
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ConsumerConfig;
import com.alibaba.dubbo.config.ModuleConfig;
import com.alibaba.dubbo.config.MonitorConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ProviderConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import com.alibaba.dubbo.config.spring.ReferenceBean;
import com.alibaba.dubbo.config.spring.ServiceBean;
/**
* DubboNamespaceHandler
*
* @author william.liangf
* @export
*/
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
这里面有很多元素标签对应的解析器。比如这个:
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
传入了两个参数,其中一个是 ApplicationConfig,也就是说在解析 application 的时候会将相应的参数加载至 ApplicationConfig:
所以在 <dubbo:application> 中也会有相应的属性:
BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,主要就是做了一件事,将不同的配置分别转化成 Spring 容器中的 Bean 对象:
- application 对应 ApplicationConfig
- registry 对应 RegistryConfig
- monitor 对应 MonitorConfig
- provider 对应 ProviderConfig
- consumer 对应 ConsumerConfig
- ......
DubboBeanDefinitionParser 中主要就是要解析配置文件,装载 BeanDefinition:
为了在 Spring 启动的时候,也相应的启动 provider 发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,这里就涉及到了两个很关键的 Bean:ServiceBean 和 ReferenceBean:
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
先看看 ServiceBean:
ServiceBean 继承了 ServiceConfig,同时还实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware 接口。
InitializingBean接口为bean提供了初始化执行的方法,即afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法(具体可参看:InitializingBean简单使用)。
DisposableBean 当Bean被销毁的时候,Spring容器会自动执行 destory 方法,比如释放资源。
ApplicationContextAware 实现了这个接口的 Bean,当 Spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来(有时候会根据这个特性动态获取 Spring Bean,具体可参看:Spring动态获取被管理的Bean)。
ApplicationListener ,ApplicationEvent 事件监听,Spring 容器启动后会发一个事件通知,有时候会根据这个机制在 Spring 容器初始化的时候执行一些初始化操作。
BeanNameAware 让 Bean 获取自己在 BeanFactory 配置中的名字(根据情况是 id 或者 name )(具体可参看:Spring 的 BeanNameAware 和 BeanFactoryAware 接口)。
那么基本的思路可以整理出来了:
delay
在 Dubbo 的配置中有一个 delay 属性:
在 com.alibaba.dubbo.config.spring.ServiceBean#afterPropertiesSet 中有这样一段代码:
......
......
if (! isDelay()) {
export();
}
}
isDelay() 方法如下:
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay.intValue() == -1);
}
这段代码还是很奇怪的,判断 isDelay() 后,如果不是延迟加载就 export(),但是只有 if 却没有 else 操作了。直接看看 export() 方法:
额,延迟加载的本质就是睡了下。这里的命名也是与 Spring 的命名一致。接下来就是调用 doExport():
protected synchronized void doExport() {
//一些判断逻辑
......
......
doExportUrls();
}
debug 调试
可以看到 doExport() 本质就是调用 doExportUrls() 方法:
@SuppressWarnings({ "unchecked", "rawtypes" })
private void doExportUrls() {
//获取注册中心的URL
List<URL> registryURLs = loadRegistries(true);
//配置了多个就会多次循环
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
debug 启动后:
获取到了注册中心的信息后,会循环执行 doExportUrlsFor1Protocol() 方法:
通过 debug 可以很方便的看出加载的内容,获取协议 name 如果为空,默认会走 dubbo 协议:
后面的一大段代码都是在获取一个合法 host(具体可参看:Dubbo使用之主机绑定):
后面还有一个 Map 去绑定所有合法的参数:
由于 Dubbo 是基于 URL 驱动的,最终会组装成为一个 URL:
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
通过 debug 也可以看出 Map 内组装的数据:
最终拼装成为的 URL 如下:
有点长:
hessian://172.30.57.63:20881/dongguabai.dubbo.IHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=dongguabai.dubbo.IHello&methods=sayHello&owner=dongguabai&pid=11968&side=provider×tamp=1543304936932
这个最终会注册到注册中心上去:
接下来进入一段非常核心的代码:
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
//循环注册中心的地址
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//这是一段非常重要的代码,后面会有介绍
//通过 proxyFactory 来获取Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//注册服务
Exporter<?> exporter = protocol.export(invoker);
//将 exporter 添加到 List 中去
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
先看这一段代码:
Exporter<?> exporter = protocol.export(invoker);
而这个 protocol 是:
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
有木有很熟悉,就是一个自适应的扩展点(具体可参看:javascript:void(0)#Extension%E6%89%A9%E5%B1%95%E7%82%B9%E5%88%86%E6%9E%90),返回的是一个动态适配器,就是 Protocol$Adpative。这里调用了 export() 方法,将之前博客中的这段代码 copy 出来:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
//获取 URL 地址
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo": url.getProtocol());
if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
//获取指定名称的扩展点 Protocol
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
在我这个场景下获取到的 Protocol extension 是根据 extName 来的,extName 是根据 url.getProtocol() 来的。回过头看看之前的代码,看 extName 到底是什么:
也就是说最终获取的 Protocol extension 是根据 registry 来获取的。
这样设计的好处是避免了过多的判断,否则可能会出现大量的判断:
//伪代码
if(dubbo){
}else if(hessian){
}else if(rmi){
}.....
最麻烦的是如果有自定义的协议的话,这么 If else 还无法完成,那就需要改代码了。这样动态的生成自适应的扩展点灵活性就体现出来了。
即最终根据指定的名称 registry 获取扩展点:
Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
结合 Dubbo SPI 机制(具体可参看:javascript:void(0)),找到 registry 对应的扩展点:
对应的是 RegistryProtocol。接下来查看 RegistryProtocol 中的 export() 方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
//本地发布启动服务,后面有说明
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
doLocalExport():
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
//bounds 是一个全局缓存对象
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
//DCL
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//封装 ExporterChangeableWrapper
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
在 new ExporterChangeableWrapper() 的时候调用了 protocol.export(),那么这个 protocol 是怎么来的呢:
这里就是在之前的博客中介绍的会通过 set 方法去注入的 Protocol(具体可参看:javascript:void(0))。这里就是 Protocol$Adaptive,即又调用了 Protocol$Adaptive 的 export() 方法。
即最终根据指定的名称获取扩展点:
Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
经过 debug 调试,这里的 extName 就是 hessian,即协议名称:
即最终会生成这样的地址:
hessian://172.30.57.63:20881/dongguabai.dubbo.IHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=dongguabai.dubbo.IHello&methods=sayHello&owner=dongguabai&pid=8636&side=provider×tamp=1543310127438
为什么最后又会得到了这个地址,因为这里进行了一次包装,而包装是在 com.alibaba.dubbo.common.extension.ExtensionLoader#createExtension 中进行的(具体可参看:javascript:void(0)):
debug 可以看出来,包装的 Class 为 ProtocolFilterWrapper:
也是一个扩展点:
这里的包装就是将当前实例(HessianProtocol)包装到 ProtocolFilterWrapper.newInstance() 中。然后又会将扩展点进行注入(injectExtension)。那么这么包装有什么意义呢,从名字上来看 ProtocolFilterWrapper 是过滤器:
ProtocolFilterWrapper 的 export() 本质也是调用的它包装的 Protocol 的 export()。中间调用了 builderInvokerChain() 方法,这样就形成了一个链式调用:
最终会加载所有的 Filter:
做了很多监控、缓存、跟踪等工作。
接下来又会包装 ProtocolListenerWrapper:
接下来查看 ProtocolListenerWrapper 的 export() 方法:
我这边的版本是 2.5.3,这个版本的 exported() 方法是没有实现的,这里就先不管了。
这里最后会调用到 HessianProtocol 中,因为对 Hessian 不算特别了解,这里可以看 DubboProtocol 的 expot() 方法:
前面都是对 URL 的一些操作,直接看核心的 openServer() 方法,这里也用了一个 Map 缓存:
看看 createServer() 方法:
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//绑定 URL,这个方法后面会看
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers.bind() 方法:
也是基于扩展点进行返回:
最后也是会走到根据默认 header 获取 Exchange:
直接看 HeaderExchanger:
z这里就是对底层的传输层的操作了:
这个 bind() 方法有三个实现:
就是根据前面的 getTransporter() 方法的返回值来决定,又是一个自适应扩展点:
可以先看看 Transporter 接口:
在 bind() 方法上有 @Adaptive 注解,也就是说这个时候会动态生成一个 Transport$Adaptive。跟之前介绍的 Protocol$Adaptive 一样。这里直接 debug 看看:
一路会走到这里:
动态生成的 Transporter$Adaptive 为:
package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
public com.alibaba.dubbo.remoting.Client connect(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([client, transporter])");
}
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);
return extension.connect(arg0, arg1);
}
public com.alibaba.dubbo.remoting.Server bind(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("server",
url.getParameter("transporter", "netty"));
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([server, transporter])");
}
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);
return extension.bind(arg0, arg1);
}
}
Transporter$Adaptive 会根据传过来的一个协议信息选择一个对应的协议去发布。Dubbo 默认提供的有 Mina、Netty等,也可以自己扩展,看看 Transporter$Adaptive 的 bind() 方法:
public com.alibaba.dubbo.remoting.Server bind(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg0;
//默认是基于 Netty 的
String extName = url.getParameter("server",
url.getParameter("transporter", "netty"));
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([server, transporter])");
}
//获取扩展点
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);
return extension.bind(arg0, arg1);
}
直接看 NettyTransporter:
在 bind() 方法中直接求 new NettyServer():
后面就是底层基于 Netty 实现了。
至此服务发布已经完成:
再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中,之前的一系列操作都是基于这段代码来的:
启动 server 监听,发布本地服务:
后面就需要将地址注册到注册中心(简单理解为 ZooKeeper create 一个节点)。
再往下看:
debug 走一波:
此时 registryUrl value 为:
registry://192.168.220.137:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=hessian%3A%2F%2F172.30.57.63%3A20881%2Fdongguabai.dubbo.IHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Ddongguabai.dubbo.IHello%26methods%3DsayHello%26owner%3Ddongguabai%26pid%3D8436%26side%3Dprovider%26timestamp%3D1543319004863&owner=dongguabai&pid=8436®istry=zookeeper×tamp=1543319004772
接下来会将 registry 的协议头转化为 zookeeper 的协议头:
此时 registry 的 value 为:
zookeeper://192.168.220.137:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=hessian%3A%2F%2F172.30.57.63%3A20881%2Fdongguabai.dubbo.IHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Ddongguabai.dubbo.IHello%26methods%3DsayHello%26owner%3Ddongguabai%26pid%3D8436%26side%3Dprovider%26timestamp%3D1543319004863&owner=dongguabai&pid=8436×tamp=1543319004772
那这里为什么要改协议头呢,在之前已经介绍过了,会根据 registry 的协议头动态自适应。所以这里需要更改协议头。
那后面的 registryFactory 是什么呢:
这里就是一个注入的扩展点。查看RegistryFactory:
在 getRegistry() 方法上使用了 @Adaptive 注解,也就是说会动态生成 RegistryFactory$Adaptive。再看看 RegistryFactory$Adaptive 的内容,直接在 com.alibaba.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass 上加断点即可:
这里有点多,加个条件断点:
很快就到了,RegistryFactory$Adaptive 的内容为:
package com.alibaba.dubbo.registry;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
public com.alibaba.dubbo.registry.Registry getRegistry(
com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg0;
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" +
url.toString() + ") use keys([protocol])");
}
com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class)
.getExtension(extName);
return extension.getRegistry(arg0);
}
}
可以看到,又是从 URL 中 getProtocol() (extName),然后再根据 extName 获取扩展点,这个已经属于一个固定格式了。
想知道这个 extName 到底是什么,直接 debug 看看 registryUrl 即可:
这里就是 zookeeper。但是要注意的是默认是 dubbo:
再基于 Dubbo 的 SPI 规范从文件中找到对应关系:
看看 ZookeeperRegistryFactory:
调用的父类的 getRegistry() 方法:
这里使用了模板模式,父类定义了公共的方法,由子类实现 createRegistry() 方法。看子类的 createRegistry() 方法:
后面的就是 ZooKeeper 创建节点的过程了,Dubbo 使用的是 ZkClient :
再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中,在这里就获取了 ZooKeeperRegistry:
有多个实现了,但是没有看到 ZooKeeperRegistry,那么这里还是走的父类定义的方法:
也就是说会走 FailbackRegistry 的 register() 方法:
@Override
public void register(URL url) {
//调用父类 AbstractRegistry 的方法
super.register(url);
//失败重试
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向服务器端发送注册请求,子类实现的模板方法,待会看看子类 ZooKeeperRegistry的实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 将失败的注册请求记录到失败列表,定时重试
failedRegistered.add(url);
}
}
看看 ZooKeeperRegistry 的 doRegister() 方法,就很简单了,就是创建节点赋值:
再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中来:
后面就是做一些订阅和监听
总体来说这一段代码:
主要做了这么两个事情:
- 通过 Netty 启动服务监听;
- 通过 ZooKeeper 注册了一个协议地址;
参考资料:
http://dubbo.apache.org/zh-cn/docs/dev/design.html