dubbo的InvocationChain

时间:2023-12-31 12:45:32

个人觉得dubbo比较好的设计是:一个是Cooma微容器设计、另一个就是InvocationChain了

Cooma微容器是自己实现了一套SPI,方便了用户做扩展;

InvocationChain类似于servlet中的filter,在用户开发了扩展程序之后,能够方便的插入到consumer和provider的逻辑中

InvocationChain的构建

provider端InvocationChain构建:

dubbo的InvocationChain

具体看buildInvokerChain的实现,参数invoker就是被Wrapper的服务(com.xxx.HelloService)的实例,protocol是injvm; group=provider; key = service.filer

**流程 **

  • 首先从扩展点获取所有激活并且作用在provider端的Filter,filter list是按照Filter上的注解order升序排列
  • for循环是从filter list的末尾开始,filter[0]的next指向filter[1], list最末尾的filter[n-1] next指向了参数invoker。举例 filter list有4个元素,filter[0]->filter[1]->filter[2]->filter[3]->invoker
  • 返回给最外面的匿名包装Invoker filter[0], 在调用invoker.invoke()的时候,就能从 filter[0]开始逐个filter访问一遍,实现了访问实例invoker之前的filter
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() { public Class<T> getInterface() {
return invoker.getInterface();
} public URL getUrl() {
return invoker.getUrl();
} public boolean isAvailable() {
return invoker.isAvailable();
} public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
} public void destroy() {
invoker.destroy();
} @Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}

consumer端InvocationChain构建

dubbo的InvocationChain

consumer端buildInvokerChain流程一样的,只不过获取的所有group=consumer的filter。

总结

上面介绍了dubbo的InvocationChain的机制,我们可以想象用户自动一个provider或者consumer的filter是很简单的,只要增加一个Filter扩展点,指定排序order值就好了,dubbo会自己去主动加载。也不要把自定义的扩展点写在dubbo框架里面

有了Filter chain 用户想做dubbo调用自定义的监控和扩展就非常方便了,比如监控调用关系,调用链量,RT等等

一些关于Dubbo SPI解释

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()

我们知道加载protocol的SPI扩展点的时候,实际上是dubbo的SPI机制自动生成了一个Protocol$Adpative的类,它根据url里面的protocol字段自动加载SPI扩展点

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements Protocol {
public Invoker refer(Class arg0, URL arg1) throws Class {
if (arg1 == null) throw new IllegalArgumentException("url == null"); URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(Protocol) name from url(" + url.toString() + ") use keys([protocol])"); Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.refer(arg0, arg1);
} public Exporter export(Invoker arg0) throws Invoker {
if (arg0 == null) throw new IllegalArgumentException("Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("Invoker argument getUrl() == null");URL url = arg0.getUrl(); String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(Protocol) name from url(" + url.toString() + ") use keys([protocol])"); Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.export(arg0);
} public void destroy() {
throw new UnsupportedOperationException("method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
} public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
}
}

然后在ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName) 里面还有一些规则当Protocol有wrapper包装类的时候,返回的是包装类。

Protocol有2个包装类:ProtocolFilterWrapper和ProtocolListenerWrapper

private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}