【知识库】--Dubbo ReferenceBean获取 -- router路由服务 源码过程(255)

时间:2021-11-03 19:25:59

Router 路由: 根据路由规则从多个Invoker 中选出一个子集。 

应用:AbstractDirectory是所有目录服务实现的上层抽象, 它在list 列举出所有invokers 后,会在通过 Router服务进行路由过滤。 

1 路由接口定义

/**
* Router. (SPI, Prototype, ThreadSafe)
*
* <a href="http://en.wikipedia.org/wiki/Routing">Routing</a>
*
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
* @see com.alibaba.dubbo.rpc.cluster.Directory#list(Invocation)
* @author chao.liuc
*/
public interface Router extends Comparable<Router> {

/**
* get the router url.
*
* @return url
*/
URL getUrl();

/**
* route.
*
* @param invokers
* @param url refer url
* @param invocation
* @return routed invokers
* @throws RpcException
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

2 应用1:com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) throw new RpcException("Directory already destroyed .url: " + getUrl());
List<Invoker<T>> invokers = doList(invocation);
return route(invokers, invocation);
}

private List<Invoker<T>> route(List<Invoker<T>> invokers, Invocation invocation) {
// local reference
List<Router> localRouters = this.routers;
if (localRouters == null || localRouters.isEmpty()) return invokers;

for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);//调用具体路由规则
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
return invokers;
}


3 应用2 com.alibaba.dubbo.registry.integration.RegistryDirectory#route

//服务发现的时候先路由一次
private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {
Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
List<Router> routers = getRouters();
if (routers == null) return invokers;
for (Router router : routers) {
if (router.getUrl() != null && !router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}
return invokers;
}

4 调用源:

private void executeRoute(Map<String, List<Invoker<T>>> method2Invokers) {
if (serviceMethods == null || serviceMethods.length == 0) return;
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = method2Invokers.get(method);
if (methodInvokers == null || methodInvokers.size() == 0) {
methodInvokers = method2Invokers.get(Constants.ANY_VALUE);
}
method2Invokers.put(method, route(methodInvokers, method));//这里调用rute
}
}

调用源:

/**
* 将invokers列表转成与方法的映射关系
*
* @param invokersMap Invoker列表
* @return Invoker与方法的映射关系
*/
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = groupByMethod(invokersMap.values());
executeRoute(newMethodInvokerMap);

// sort and unmodifiable
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}

 调用源:

/**
* 根据invokerURL列表转换为invoker列表。转换规则如下:
* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
* 2.如果传入的invoker列表不为空,则表示最新的invoker列表
* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
*
* @param invokerUrls 传入的参数不能为null
*/
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
overrideDirectoryUrl = overrideDirectoryUrl.addParameters(Constants.INVOKER_INSIDE_INVOKERS_KEY, "", Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, "0");
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.lastInvokerUrls != null) {
invokerUrls.addAll(
this.lastInvokerUrls);
} else {
this.lastInvokerUrls = new HashSet<URL>();
this.lastInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
}

if (invokerUrls.size() == 0) return;

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
// state change
//如果计算错误,则不进行处理.
//比如所有provider都标记为下线状态,这样其实最后一个provider是下不了的
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
Map<String
, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表

this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap
, newUrlInvokerMap); // 关闭未使用的Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}

List<String> invokerUrlString =
new ArrayList<String>();
for (URL invoker : invokerUrls) {
invokerUrlString.add(invoker.toString())
;
}
overrideDirectoryUrl = overrideDirectoryUrl.addParameters(
Constants.
INVOKER_INSIDE_INVOKERS_KEY, URL.encode(CollectionUtils.join(invokerUrlString, ";")),
Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, String.valueOf(invokerUrls.size()));
}
}

7 最终就是回调函数 com.alibaba.dubbo.registry.integration.RegistryDirectory#notify

public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();

for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}

// routers
if (routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}

// configurators
if (configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}

if (configuratorUrls.size() > 0) {
logger.info("unconfigured directory url without provider params: " + this.directoryUrl
+ ", configured directory url without provider params: " + this.overrideDirectoryUrl);
}

// providers
refreshInvoker(invokerUrls);
}