【Dubbo源码阅读系列】服务暴露之本地暴露

时间:2023-11-23 11:24:44

在上一篇文章中我们介绍 Dubbo 自定义标签解析相关内容,其中我们自定义的 XML 标签 <dubbo:service /> 会被解析为 ServiceBean 对象(传送门:Dubbo XML 配置加载)。今天我们讲述的内容和 ServiceBean 密切相关!

细心的读者在阅读 ServiceBean 类时会发现 onApplicationEvent() 方法和 afterPropertiesSet() 方法调用了一个共同的方法 export()。直觉告诉我们这个方法应该和服务的暴露有关,我们接下来就

从 export() 方法入手分析。

export()方法调用时机

为了解答 export() 调用时机问题,我们需要关注 ServiceBean 类中的三个方法

  1. setApplicationContext(ApplicationContext applicationContext)

    ServiceBean 实现了 ApplicationContextAware 接口,在 ServiceBean 初始化后,会调用 setApplicationContext 注入 Spring 上下文;
  2. afterPropertiesSet()

    注入 ApplicationConfig、registries、protocols 等属性;
  3. onApplicationEvent(ContextRefreshedEvent event)

    这里接受的 event 事件类型为 ContextRefreshedEvent。当 applicationContext 被初始化或者刷新时,会调用该方法。

    这三个方法在 Spring 生命周期中被调用的顺序大致如下图所示

    setApplicationContext()——> afterPropertiesSet() ——> onApplicationEvent()

    我们结合代码继续看
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
supportedApplicationListener = addApplicationListener(applicationContext, this);
} public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
} public void afterPropertiesSet() throws Exception {
// 省略...
if (!supportedApplicationListener) {
export();
}
}

代码执行逻辑大致如下:

  1. 首先执行 setApplicationContext() 方法,注入上下文。这里的 supportedApplicationListener 用于判断 Spring 是否支持 Spring 监听机制。
  2. 执行 afterPropertiesSet() 方法。如果 supportedApplicationListener 值为 false,调用 export() 方法。
  3. 执行 onApplicationEvent() 方法。如果没有执行过 export() 以及 unexport() 方法,调用 export() 方法。

    通过上面简单的分析我们可以看到 export() 方法只会在 onApplicationEvent() 和 export() 方法中调用一次。

export() 方法解析

public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
} if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}

export()方法比较简单。注意这里有个 delay 变量,我们可以使用该变量延迟执行 export() 方法。

继续看 doExport() 方法

protected synchronized void doExport() {
// 省略...
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
} private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

doExport()方法省略了很多 ServiceBean 配置校验和初始化代码。大家有兴趣可以自行阅览。这里直接划重点!!!分析 doExportUrls() 方法!!!

先看 loadRegistries() 方法:

loadRegistries()

protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
// registries 在 afterPropertiesSet() 方法中初始化
if (registries != null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 将 application/config 部分属性整合到 map 中,详细见:
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
// 构建 url ,返回结果类似 zookeeper://192.168.0.100:2181/org.apache.dubbo.registry.RegistryService?
// application=demo-provider&dubbo=2.0.2&pid=22705&qos.port=22222&timestamp=1549005672530
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// 将此时 url 的 protocol 保存到 registry 参数中
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
// 设置 url protcol 属性为 registry
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}

loadRegistries() 用于加载注册中心。概括来说就是用于解析我们在配置文件中定义的 <dubbo:registry /> 标签。

checkRegistry() 方法用于校验注册中心配置校验,里面有一些版本兼容的代码。appendParameters() 方法详见 appendParameters() 小节。

本地暴露

介绍完 loadRegistries() 方法,我们接着看 doExportUrlsFor1Protocol()。doExportUrlsFor1Protocol() 方法比较长,这里我们挑出和本地暴露相关的内容进行分析。

if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 远程暴露相关内容,省略...
}
}
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}

看到 exportLocal() 方法,意味着我们已经快要直达本地服务暴露的核心了!更令人按捺不住的是!这里又用到了 Dubbo 中的 SPI 机制(详见系列第一篇Dubbo SPI)。让我们看看这里到底做了什么?

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

熟悉的配方熟悉的料,在这里我们获取了 Protocol 和 ProxyFactory 对应的自适应扩展类。根据方法调用的嵌套逻辑,先来看 ProxyFactory 自适应扩展类 ProxyFactory$Adaptive 的 getInvoker() 方法。

核心方法 proxyFactory.getInvoker()

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = null;
try {
extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
}catch(Exception e){
if (count.incrementAndGet() == 1) {
logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", e);
}
extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
}
return extension.getInvoker(arg0, arg1, arg2);
}
}

这里我们实际会去调用 StubProxyFactoryWrapper 包装类的 getInvoker() 方法,如果不明白可以先看下 【Dubbo源码阅读系列】之 Dubbo SPI 机制

public class StubProxyFactoryWrapper implements ProxyFactory {
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return proxyFactory.getInvoker(proxy, type, url);
}
}
public class JavassistProxyFactory extends AbstractProxyFactory {
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}

结合上面的代码我们发现,发现最后调用的是 JavassistProxyFactory 类的 getInvoker() 方法。其中 wrapper 是动态生成的代理对象。最后返回一个 AbstractProxyInvoker 对象,doInvoke() 方法会调用 wrapper 代理类的 invokeMethod() 方法,其中 invokeMethod() 方法大概如下所示:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
org.apache.dubbo.demo.provider.DemoServiceImpl w;
try {
w = ((org.apache.dubbo.demo.provider.DemoServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals($2) && $3.length == 1) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}

稍微有一点绕,至少我们已经看完了 proxyFactory.getInvoker() 方法了,我们获取到了一个包装了动态代理类的 AbstractProxyInvoker 对象。接下来继续看 protocol.export() 方法。

核心方法 protocol.export()

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = null;
try {
extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
}catch(Exception e){
if (count.incrementAndGet() == 1) {
logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", e);
}
extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("dubbo");
}
return extension.export(arg0);
}

由于此时的 url 中 protocol 值为 injvm(url 经过 setProtocol(LOCAL_PROTOCOL) 操作后 protocol 已经更新为 injvm),因此我们这里获得的扩展类实际为包装了 InjvmProtocol 的包装类对象,对 wrapper 类有疑问的可以看下【Dubbo源码阅读系列】之 Dubbo SPI 机制

这里会涉及到一个方法 buildInvokerChain() 方,道它用于构建一个调用链。

整体调用时序简图如下所示:

【Dubbo源码阅读系列】服务暴露之本地暴露

最后 exportLocal() 方法中获取到的是一个 InjvmExporter 对象,并将其添加到 ServiceConfig 类的 exporters 集合中。

buildInvokerChain()

ProtocolFilterWrapper.java
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
// 省略 Invoker 构建代码...
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
// 省略 Invoker 构建代码...
};
}
}
return last;
}

buildInvokerChain() 方法用于构建调用链,初步浏览下来发现调用链应该是由 Filter 扩展类构成。那么这些 Filter 扩展类又从何而来呢?这行代码很关键!!!

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

对于这段代码我们应该有很强的亲切感,但仔细看又稍稍有所不同。实际上被 @Activate 注解标记的扩展类会被加载到 ExtensionLoader 类的 cachedActivates 集合中。

我们在调用 ExtensionLoader 类的 getActivateExtension() 时,会根据我们传入的 key 和 group 值从 cachedActivates 集合中获取满足当前条件的 filter 对象。

拿到 filters 集合后,会用链表的形式拼接 filter 调用链,举个例子:

假设当前获取到的 filters 集合中保存的 filter 对象为 filter0、filter1、filter2。我们对 filters 集合进行倒序遍历。最后获得的 last 其实为新建的 ivk2 对象。如果我们调用 last 的 invoke 方法,调用链如下图所示:

【Dubbo源码阅读系列】服务暴露之本地暴露

End

本文介绍了 Export() 方法被调用的时机以及基本流程。并且花了一定篇幅对 Dubbo 服务本地暴露进行了分析。其中掺杂了不少代码的分析,可能没有面面俱到吧。还是建议大家自己自己 Debug 一下,很多东西瞬间秒懂,有助于源码理解。下一篇文章我们介绍 Dubbo 服务远程暴露。

appendProperties()

protected static void appendProperties(AbstractConfig config) {
if (config == null) {
return;
}
// getTagName:获取去除了 Bean/Config 结尾的小写类名(ApplicationConfig->application)
String prefix = "dubbo." + getTagName(config.getClass()) + ".";
Method[] methods = config.getClass().getMethods();
for (Method method : methods) {
try {
String name = method.getName();
// 1、方法长度大于3;2、方法以 set 开头;3、方法修饰符类型为 public;4、形参个数为 1;5、形参类型为基本类型
if (name.length() > 3 && name.startsWith("set") && Modifier.isPublic(method.getModifiers())
&& method.getParameterTypes().length == 1 && isPrimitive(method.getParameterTypes()[0])) {
// camelToSplitName: 举个例子 ApplicationConfig——>application.config
String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "."); String value = null;
if (config.getId() != null && config.getId().length() > 0) {
// 拼接属性名称,并尝试获取对应属性
String pn = prefix + config.getId() + "." + property;
value = System.getProperty(pn);
if (!StringUtils.isBlank(value)) {
logger.info("Use System Property " + pn + " to config dubbo");
}
}
if (value == null || value.length() == 0) {
// 比如当前 config 为 ApplicationConfig,pn = dubbo.application.xxx
String pn = prefix + property;
value = System.getProperty(pn);
if (!StringUtils.isBlank(value)) {
logger.info("Use System Property " + pn + " to config dubbo");
}
}
if (value == null || value.length() == 0) {
Method getter;
try {
getter = config.getClass().getMethod("get" + name.substring(3));
} catch (NoSuchMethodException e) {
try {
getter = config.getClass().getMethod("is" + name.substring(3));
} catch (NoSuchMethodException e2) {
getter = null;
}
}
if (getter != null) {
if (getter.invoke(config) == null) {
// 尝试使用 ConfigUtils.getProperty() 方法获取属性值
// 尝试从 dubbo.properties.file 文件或 dubbo.properties 文件中读取属性
if (config.getId() != null && config.getId().length() > 0) {
value = ConfigUtils.getProperty(prefix + config.getId() + "." + property);
}
if (value == null || value.length() == 0) {
value = ConfigUtils.getProperty(prefix + property);
}
if (value == null || value.length() == 0) {
String legacyKey = legacyProperties.get(prefix + property);
if (legacyKey != null && legacyKey.length() > 0) {
value = convertLegacyValue(legacyKey, ConfigUtils.getProperty(legacyKey));
}
} }
}
}
if (value != null && value.length() > 0) {
method.invoke(config, convertPrimitive(method.getParameterTypes()[0], value));
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}

appendParameters()

protected static void appendParameters(Map<String, String> parameters, Object config) {
appendParameters(parameters, config, null);
}
protected static void appendParameters(Map<String, String> parameters, Object config, String prefix) {
if (config == null) {
return;
}
Method[] methods = config.getClass().getMethods();
// 遍历 config 类方法集合
for (Method method : methods) {
try {
String name = method.getName();
// 找到满足以下的方法:以set/is 开头,非 getClass;方法修饰符为 public;方法参数个数为 0;返回类型为基本类型
if ((name.startsWith("get") || name.startsWith("is"))
&& !"getClass".equals(name)
&& Modifier.isPublic(method.getModifiers())
&& method.getParameterTypes().length == 0
&& isPrimitive(method.getReturnType())) {
// 获取 parameter 注解
Parameter parameter = method.getAnnotation(Parameter.class);
// @Parameter(excluded = true),直接跳过
if (method.getReturnType() == Object.class || parameter != null && parameter.excluded()) {
continue;
}
int i = name.startsWith("get") ? 3 : 2;
String prop = StringUtils.camelToSplitName(name.substring(i, i + 1).toLowerCase() + name.substring(i + 1), ".");
String key;
if (parameter != null && parameter.key().length() > 0) {
key = parameter.key();
} else {
key = prop;
}
// 利用反射调用 config 类中的 get/is 方法
Object value = method.invoke(config);
String str = String.valueOf(value).trim();
if (value != null && str.length() > 0) {
// 是否需要转义,UTF-8
if (parameter != null && parameter.escaped()) {
str = URL.encode(str);
}
if (parameter != null && parameter.append()) {
String pre = parameters.get(Constants.DEFAULT_KEY + "." + key);
if (pre != null && pre.length() > 0) {
str = pre + "," + str;
}
pre = parameters.get(key);
if (pre != null && pre.length() > 0) {
str = pre + "," + str;
}
}
if (prefix != null && prefix.length() > 0) {
key = prefix + "." + key;
}
// key/value 添加到 parameters 集合
parameters.put(key, str);
} else if (parameter != null && parameter.required()) {
throw new IllegalStateException(config.getClass().getSimpleName() + "." + key + " == null");
}
// 方法名为 getParameters();方法修饰符为 public;方法形参个数为0;返回类型为 Map
} else if ("getParameters".equals(name)
&& Modifier.isPublic(method.getModifiers())
&& method.getParameterTypes().length == 0
&& method.getReturnType() == Map.class) {
Map<String, String> map = (Map<String, String>) method.invoke(config, new Object[0]);
if (map != null && map.size() > 0) {
String pre = (prefix != null && prefix.length() > 0 ? prefix + "." : "");
for (Map.Entry<String, String> entry : map.entrySet()) {
parameters.put(pre + entry.getKey().replace('-', '.'), entry.getValue());
}
}
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}

该方法会调用当前类对象的 isXXX/getXXX 方法(非 getClass 方法;方法修饰符为 public;形参个数为 0;返回类型为基本类型),获取其返回值构造键值对添加到指定 map 集合中;同时也会解析 getParameters() 返回的结果,构造键值对注入到 map 集合中。

本BLOG上原创文章未经本人许可,不得用于商业用途及传统媒体。网络媒体转载请注明出处,否则属于侵权行为。https://juejin.im/post/5c2b7ab46fb9a049d236273b