Dubbo 之服务发布和注册(一)

时间:2022-12-24 08:58:44


相关博客:

​​Spring 自定义 XML 配置扩展​​

​​Dubbo的SPI机制(二)(Dubbo优化后的SPI实现)​​

​​Dubbo 的 SPI 机制(三)(Extension 扩展点补充)​​

​​Dubbo 之服务发布和注册(二)(补充)​​

 

官网的服务暴露时序图:

首先看看在使用 Dubbo 的时候是怎么发布服务的。

这是之前的一个 Demo 中的配置服务发布的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!--提供方信息,定义当前项目对外发布的一些内容信息,name尽量保证唯一(这里没有强制唯一),owner表示维护者-->
<dubbo:application name="dubbo-server" owner="dongguabai"/>

<!--注册中心-Dongguabai,N/A表示不需要依赖注册中心,集群使用逗号分隔-->
<!--注意要声明id-->
<dubbo:registry address="zookeeper://192.168.220.137:2181"/>
<dubbo:registry address="zookeeper://192.168.220.136:2181"/>

<!--发布的协议名称(默认dubbo协议)、端口-->
<dubbo:protocol port="20880" name="dubbo"/>
<dubbo:protocol port="20881" name="hessian"/>

<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.IHello" ref="helloService" protocol="hessian,dubbo" registry="zk1"/>
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.IProtocol" ref="protocolService" protocol="dubbo" registry="zk2"/>

<!--实现Bean-->
<bean class="dongguabai.dubbo.HelloImpl"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.ProtocolImpl"/>

<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.version.ICall" ref="callService" protocol="dubbo" registry="zk1" version="1.0.0" />
<!--当前创建服务的接口地址;ref指明实现;-->
<dubbo:service interface="dongguabai.dubbo.version.ICall" ref="callService2" protocol="dubbo" registry="zk2" version="2.0.0"/>


<!--实现Bean-->
<bean class="dongguabai.dubbo.version.CallImpl1"/>
<!--实现Bean-->
<bean class="dongguabai.dubbo.version.CallImpl2"/>

</beans>

Dubbo 在服务发布的过程会对配置文件进行一个解析,并且暴露一个端口号。

要知道 Dubbo 是如何发布服务的,需要找到一个入口,而 Dubbo 这里在 Spring 的 XML 中使用了很多自定义的标签,也就是说 Dubbo 是基于 Spring 做的一个扩展(具体可参看:​​Spring 自定义 XML 配置扩展​​)。根据 Spring 官方说明:

Creating new XML configuration extensions can be done by following these (relatively) simple steps:

  • Authoring an XML schema to describe your custom element(s).
  • Coding a custom NamespaceHandler implementation (this is an easy step, don’t worry).
  • Coding one or more BeanDefinitionParser implementations (this is where the real work is done).
  • Registering the above artifacts with Spring (this too is an easy step).

要实现自定义扩展,有三个步骤(在 Spring 中定义了两个接口,用来实现扩展):

  1. NamespaceHandler:注册一堆 BeanDefinitionParser,利用他们来进行解析
  2. BeanDefinitionParser:用于解析每个 element 的内容
  3. Spring默认会加载far包下的M ETA-I N F/spring. handlers文件寻找对应的NamespaceHandler

需要基于 NamespaceHandler和 BeanDefinitionParser 进行扩展,需要加载 META-INF/spring.handlers 和 META-INF/spring.schemas。先找找这两个文件:

Dubbo 之服务发布和注册(一)

存在一个 Key-Value 的关系:

http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

进入这个 Handler:

/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.config.spring.schema;

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ConsumerConfig;
import com.alibaba.dubbo.config.ModuleConfig;
import com.alibaba.dubbo.config.MonitorConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ProviderConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import com.alibaba.dubbo.config.spring.ReferenceBean;
import com.alibaba.dubbo.config.spring.ServiceBean;

/**
* DubboNamespaceHandler
*
* @author william.liangf
* @export
*/
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}

public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}

}

这里面有很多元素标签对应的解析器。比如这个:

registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));

传入了两个参数,其中一个是 ApplicationConfig,也就是说在解析 application 的时候会将相应的参数加载至 ApplicationConfig:

Dubbo 之服务发布和注册(一)

所以在 <dubbo:application> 中也会有相应的属性:

Dubbo 之服务发布和注册(一)

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,主要就是做了一件事,将不同的配置分别转化成 Spring 容器中的 Bean 对象:

  • application 对应 ApplicationConfig
  • registry 对应 RegistryConfig
  • monitor 对应 MonitorConfig
  • provider 对应 ProviderConfig
  • consumer 对应 ConsumerConfig
  • ......

DubboBeanDefinitionParser 中主要就是要解析配置文件,装载 BeanDefinition:

Dubbo 之服务发布和注册(一)

为了在 Spring 启动的时候,也相应的启动 provider 发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,这里就涉及到了两个很关键的 Bean:ServiceBean 和 ReferenceBean:

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

先看看 ServiceBean:

Dubbo 之服务发布和注册(一)

 ServiceBean 继承了 ServiceConfig,同时还实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware 接口。

InitializingBean接口为bean提供了初始化执行的方法,即afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法(具体可参看:​​InitializingBean简单使用​​)。

DisposableBean 当Bean被销毁的时候,Spring容器会自动执行 destory 方法,比如释放资源。

ApplicationContextAware 实现了这个接口的 Bean,当 Spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来(有时候会根据这个特性动态获取 Spring Bean,具体可参看:​​Spring动态获取被管理的Bean​​)。

ApplicationListener ,ApplicationEvent 事件监听,Spring 容器启动后会发一个事件通知,有时候会根据这个机制在 Spring 容器初始化的时候执行一些初始化操作。

BeanNameAware 让 Bean 获取自己在 BeanFactory 配置中的名字(根据情况是 id 或者 name )(具体可参看:​​Spring 的 BeanNameAware 和 BeanFactoryAware 接口​​)。

那么基本的思路可以整理出来了:

Dubbo 之服务发布和注册(一)

delay

在 Dubbo 的配置中有一个 delay 属性:

Dubbo 之服务发布和注册(一)

在 com.alibaba.dubbo.config.spring.ServiceBean#afterPropertiesSet 中有这样一段代码:

......
......
if (! isDelay()) {
export();
}
}

isDelay() 方法如下:

private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay.intValue() == -1);
}

这段代码还是很奇怪的,判断 isDelay() 后,如果不是延迟加载就 export(),但是只有 if 却没有 else 操作了。直接看看 export() 方法:

Dubbo 之服务发布和注册(一)

额,延迟加载的本质就是睡了下。这里的命名也是与 Spring 的命名一致。接下来就是调用 doExport():

protected synchronized void doExport() {
//一些判断逻辑
......
......
doExportUrls();
}

debug 调试

可以看到 doExport() 本质就是调用 doExportUrls() 方法:

@SuppressWarnings({ "unchecked", "rawtypes" })
private void doExportUrls() {
//获取注册中心的URL
List<URL> registryURLs = loadRegistries(true);
//配置了多个就会多次循环
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

debug 启动后:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

 获取到了注册中心的信息后,会循环执行 doExportUrlsFor1Protocol() 方法:

通过 debug 可以很方便的看出加载的内容,获取协议 name 如果为空,默认会走 dubbo 协议:

Dubbo 之服务发布和注册(一)

后面的一大段代码都是在获取一个合法 host(具体可参看:​​Dubbo使用之主机绑定​​):

Dubbo 之服务发布和注册(一)

后面还有一个 Map 去绑定所有合法的参数:

Dubbo 之服务发布和注册(一)

由于 Dubbo 是基于 URL 驱动的,最终会组装成为一个 URL:

URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

通过 debug  也可以看出 Map 内组装的数据:

Dubbo 之服务发布和注册(一)

最终拼装成为的 URL 如下:

Dubbo 之服务发布和注册(一)

有点长:

hessian://172.30.57.63:20881/dongguabai.dubbo.IHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=dongguabai.dubbo.IHello&methods=sayHello&owner=dongguabai&pid=11968&side=provider×tamp=1543304936932

这个最终会注册到注册中心上去:

Dubbo 之服务发布和注册(一)

接下来进入一段非常核心的代码:

//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
//循环注册中心的地址
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//这是一段非常重要的代码,后面会有介绍
//通过 proxyFactory 来获取Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//注册服务
Exporter<?> exporter = protocol.export(invoker);
//将 exporter 添加到 List 中去
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}

先看这一段代码:

Exporter<?> exporter = protocol.export(invoker);

而这个 protocol 是:

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

有木有很熟悉,就是一个自适应的扩展点(具体可参看:​​javascript:void(0)#Extension%E6%89%A9%E5%B1%95%E7%82%B9%E5%88%86%E6%9E%90​​),返回的是一个动态适配器,就是 Protocol$Adpative。这里调用了 export() 方法,将之前博客中的这段代码 copy 出来:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
//获取 URL 地址
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo": url.getProtocol());
if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
//获取指定名称的扩展点 Protocol
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

在我这个场景下获取到的 Protocol extension 是根据 extName 来的,extName 是根据 url.getProtocol() 来的。回过头看看之前的代码,看 extName 到底是什么:

Dubbo 之服务发布和注册(一)

也就是说最终获取的 Protocol extension 是根据 registry 来获取的。

这样设计的好处是避免了过多的判断,否则可能会出现大量的判断:

//伪代码
if(dubbo){

}else if(hessian){

}else if(rmi){

}.....

最麻烦的是如果有自定义的协议的话,这么 If else 还无法完成,那就需要改代码了。这样动态的生成自适应的扩展点灵活性就体现出来了。

即最终根据指定的名称 registry 获取扩展点:

Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");

结合 Dubbo SPI 机制(具体可参看:​​javascript:void(0)​​),找到 registry 对应的扩展点:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

对应的是 RegistryProtocol。接下来查看 RegistryProtocol 中的 export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
//本地发布启动服务,后面有说明
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}

doLocalExport():

@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
//bounds 是一个全局缓存对象
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
//DCL
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//封装 ExporterChangeableWrapper
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}

在 new ExporterChangeableWrapper() 的时候调用了 protocol.export(),那么这个 protocol 是怎么来的呢:

Dubbo 之服务发布和注册(一)

这里就是在之前的博客中介绍的会通过 set 方法去注入的 Protocol(具体可参看:​​javascript:void(0)​​)。这里就是 Protocol$Adaptive,即又调用了 Protocol$Adaptive 的 export() 方法。

即最终根据指定的名称获取扩展点:

Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

经过 debug 调试,这里的 extName 就是 hessian,即协议名称:

Dubbo 之服务发布和注册(一)

即最终会生成这样的地址:

hessian://172.30.57.63:20881/dongguabai.dubbo.IHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=dongguabai.dubbo.IHello&methods=sayHello&owner=dongguabai&pid=8636&side=provider×tamp=1543310127438

为什么最后又会得到了这个地址,因为这里进行了一次包装,而包装是在  com.alibaba.dubbo.common.extension.ExtensionLoader#createExtension 中进行的(具体可参看:​​javascript:void(0)​​):

Dubbo 之服务发布和注册(一)

debug 可以看出来,包装的 Class 为 ProtocolFilterWrapper:

Dubbo 之服务发布和注册(一)

也是一个扩展点:

Dubbo 之服务发布和注册(一)

这里的包装就是将当前实例(HessianProtocol)包装到 ProtocolFilterWrapper.newInstance() 中。然后又会将扩展点进行注入(injectExtension)。那么这么包装有什么意义呢,从名字上来看 ProtocolFilterWrapper 是过滤器:

Dubbo 之服务发布和注册(一)

ProtocolFilterWrapper 的 export() 本质也是调用的它包装的 Protocol 的 export()。中间调用了 builderInvokerChain() 方法,这样就形成了一个链式调用:

Dubbo 之服务发布和注册(一)

最终会加载所有的 Filter:

Dubbo 之服务发布和注册(一)

做了很多监控、缓存、跟踪等工作。

接下来又会包装 ProtocolListenerWrapper:

Dubbo 之服务发布和注册(一)

接下来查看 ProtocolListenerWrapper 的 export() 方法:

Dubbo 之服务发布和注册(一)

我这边的版本是 2.5.3,这个版本的 exported() 方法是没有实现的,这里就先不管了。

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

这里最后会调用到 HessianProtocol 中,因为对 Hessian 不算特别了解,这里可以看 DubboProtocol 的 expot() 方法:

Dubbo 之服务发布和注册(一)

前面都是对 URL 的一些操作,直接看核心的 openServer() 方法,这里也用了一个 Map 缓存:

Dubbo 之服务发布和注册(一)

看看 createServer() 方法:

private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);

url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//绑定 URL,这个方法后面会看
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}

Exchangers.bind() 方法:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

也是基于扩展点进行返回:

Dubbo 之服务发布和注册(一)

最后也是会走到根据默认 header 获取 Exchange:

Dubbo 之服务发布和注册(一)

直接看 HeaderExchanger:

z这里就是对底层的传输层的操作了:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

这个 bind() 方法有三个实现:

Dubbo 之服务发布和注册(一)

就是根据前面的 getTransporter() 方法的返回值来决定,又是一个自适应扩展点:

Dubbo 之服务发布和注册(一)

可以先看看 Transporter 接口:

Dubbo 之服务发布和注册(一)

在 bind() 方法上有 @Adaptive 注解,也就是说这个时候会动态生成一个 Transport$Adaptive。跟之前介绍的 Protocol$Adaptive 一样。这里直接 debug 看看:

Dubbo 之服务发布和注册(一)

一路会走到这里:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

动态生成的 Transporter$Adaptive 为:

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.extension.ExtensionLoader;


public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
public com.alibaba.dubbo.remoting.Client connect(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([client, transporter])");
}

com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);

return extension.connect(arg0, arg1);
}

public com.alibaba.dubbo.remoting.Server bind(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("server",
url.getParameter("transporter", "netty"));

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([server, transporter])");
}

com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);

return extension.bind(arg0, arg1);
}
}

Transporter$Adaptive 会根据传过来的一个协议信息选择一个对应的协议去发布。Dubbo 默认提供的有 Mina、Netty等,也可以自己扩展,看看 Transporter$Adaptive 的 bind() 方法:

public com.alibaba.dubbo.remoting.Server bind(
com.alibaba.dubbo.common.URL arg0,
com.alibaba.dubbo.remoting.ChannelHandler arg1)
throws com.alibaba.dubbo.common.URL {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg0;
//默认是基于 Netty 的
String extName = url.getParameter("server",
url.getParameter("transporter", "netty"));

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" +
url.toString() + ") use keys([server, transporter])");
}
//获取扩展点
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
.getExtension(extName);

return extension.bind(arg0, arg1);
}

直接看 NettyTransporter:

Dubbo 之服务发布和注册(一)

在 bind() 方法中直接求 new NettyServer():

Dubbo 之服务发布和注册(一)

后面就是底层基于 Netty 实现了。

至此服务发布已经完成:

Dubbo 之服务发布和注册(一)

再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中,之前的一系列操作都是基于这段代码来的:

启动 server 监听,发布本地服务:

Dubbo 之服务发布和注册(一)

后面就需要将地址注册到注册中心(简单理解为 ZooKeeper create 一个节点)。

再往下看:

Dubbo 之服务发布和注册(一)

debug 走一波:

Dubbo 之服务发布和注册(一)

此时 registryUrl value 为:

registry://192.168.220.137:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=hessian%3A%2F%2F172.30.57.63%3A20881%2Fdongguabai.dubbo.IHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Ddongguabai.dubbo.IHello%26methods%3DsayHello%26owner%3Ddongguabai%26pid%3D8436%26side%3Dprovider%26timestamp%3D1543319004863&owner=dongguabai&pid=8436®istry=zookeeper×tamp=1543319004772

 接下来会将 registry 的协议头转化为 zookeeper 的协议头:

Dubbo 之服务发布和注册(一)

此时 registry 的 value 为:

zookeeper://192.168.220.137:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=hessian%3A%2F%2F172.30.57.63%3A20881%2Fdongguabai.dubbo.IHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Ddongguabai.dubbo.IHello%26methods%3DsayHello%26owner%3Ddongguabai%26pid%3D8436%26side%3Dprovider%26timestamp%3D1543319004863&owner=dongguabai&pid=8436×tamp=1543319004772

那这里为什么要改协议头呢,在之前已经介绍过了,会根据 registry 的协议头动态自适应。所以这里需要更改协议头。

那后面的 registryFactory 是什么呢:

Dubbo 之服务发布和注册(一)

这里就是一个注入的扩展点。查看RegistryFactory:

Dubbo 之服务发布和注册(一)

在 getRegistry() 方法上使用了 @Adaptive 注解,也就是说会动态生成 RegistryFactory$Adaptive。再看看 RegistryFactory$Adaptive 的内容,直接在 com.alibaba.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass 上加断点即可:

Dubbo 之服务发布和注册(一)

这里有点多,加个条件断点:

Dubbo 之服务发布和注册(一)

很快就到了,RegistryFactory$Adaptive 的内容为:

package com.alibaba.dubbo.registry;

import com.alibaba.dubbo.common.extension.ExtensionLoader;


public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
public com.alibaba.dubbo.registry.Registry getRegistry(
com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg0;
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" +
url.toString() + ") use keys([protocol])");
}

com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class)
.getExtension(extName);

return extension.getRegistry(arg0);
}
}

可以看到,又是从 URL 中 getProtocol() (extName),然后再根据 extName 获取扩展点,这个已经属于一个固定格式了。

想知道这个 extName 到底是什么,直接 debug 看看 registryUrl 即可:

Dubbo 之服务发布和注册(一)

这里就是 zookeeper。但是要注意的是默认是 dubbo:

Dubbo 之服务发布和注册(一)

再基于 Dubbo 的 SPI 规范从文件中找到对应关系:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

看看 ZookeeperRegistryFactory:

Dubbo 之服务发布和注册(一)

调用的父类的 getRegistry() 方法:

Dubbo 之服务发布和注册(一)

这里使用了模板模式,父类定义了公共的方法,由子类实现 createRegistry() 方法。看子类的 createRegistry() 方法:

Dubbo 之服务发布和注册(一)

后面的就是 ZooKeeper 创建节点的过程了,Dubbo 使用的是 ZkClient :

Dubbo 之服务发布和注册(一)

再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中,在这里就获取了 ZooKeeperRegistry:

Dubbo 之服务发布和注册(一)

有多个实现了,但是没有看到 ZooKeeperRegistry,那么这里还是走的父类定义的方法:

Dubbo 之服务发布和注册(一)

Dubbo 之服务发布和注册(一)

也就是说会走 FailbackRegistry 的 register() 方法:

@Override
public void register(URL url) {
//调用父类 AbstractRegistry 的方法
super.register(url);
//失败重试
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向服务器端发送注册请求,子类实现的模板方法,待会看看子类 ZooKeeperRegistry的实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// 将失败的注册请求记录到失败列表,定时重试
failedRegistered.add(url);
}
}

看看 ZooKeeperRegistry 的 doRegister() 方法,就很简单了,就是创建节点赋值:

Dubbo 之服务发布和注册(一)

再回到 com.alibaba.dubbo.registry.integration.RegistryProtocol#export 中来:

后面就是做一些订阅和监听

Dubbo 之服务发布和注册(一)

总体来说这一段代码:

Dubbo 之服务发布和注册(一)

主要做了这么两个事情:

  1. 通过 Netty 启动服务监听;
  2. 通过 ZooKeeper 注册了一个协议地址;

 

参考资料:

​http://dubbo.apache.org/zh-cn/docs/dev/design.html​