看这里:上一篇文章介绍了ReferenceBean的获取过程,这里介绍如何根据registryUrl和RegistryFactory获取注册器,最终注册中心回调服务消费者暴露的回调接口,来对服务提供者的服务引用(refer),生成对应的可执行对象invoker。
1
@SuppressWarnings("unchecked")
private Invoker<?> getClusterInvoker(List<URL> urls) {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));//这里爆发
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
if (useUnitCluster()) {
URL u = registryURL.addParameterIfAbsent(Constants.REGISTRY_KEY + "." + Constants.CLUSTER_KEY, FailfastCluster.NAME);
return registryCluster.join(new UnitDirectory(u, invokers));
} else {
URL u = registryURL.addParameterIfAbsent(Constants.REGISTRY_KEY + "." + Constants.CLUSTER_KEY, AvailableCluster.NAME);
return registryCluster.join(new StaticDirectory(u, invokers));
}
} else { // 不是 注册中心的URL
return cluster.join(new StaticDirectory(invokers));
}
}
2 看此方法
refprotocol.refer(interfaceClass, url)
其接口原型(com.alibaba.dubbo.rpc.Protocol):
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
仔细体会注释中的:refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。
根据传入的url
registry://127.0.0.1:9098/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=2484&refer=application%3Ddemo-consumer%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D2484%26side%3Dconsumer%26timestamp%3D1415879965901®istry=zookeeper×tamp=1415879990670
根据registry://协议 自适应到 com.alibaba.dubbo.registry.integration.RegistryProtocol 中的refer方法
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);//重新设置协议--为了下一步获取注册器 默认 DEFAULT_REGISTRY = "dubbo";
Registry registry = registryFactory.getRegistry(url);//此处获取可以自适应到 com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry 或者 默认 com.alibaba.dubbo.registry.dubbo.DubboRegistryFactory
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
经过上面url.setProtocol(...)方法后新的url 举例
zookeeper://127.0.0.1:9098/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=4524&refer=application%3Ddemo-consumer%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4524%26side%3Dconsumer%26timestamp%3D1415881461048×tamp=1415881461113
3 根据自适应获取的注册中心ZookeeperRegistry,要获取的服务类型和重新设置协议后的url获取服务类型
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().
removeParameter(Constants.CONSUMER_HOST_KEY).getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
4 构建引用服务的 subscribeUrl
consumer://10.5.24.221/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8536&side=consumer×tamp=1415945205031
5 构建目录服务 RegistryDirectory 另一文章再单独了解
6 构建订阅消费者订阅的 url 举例
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
10.33.37.4 or 10.5.24.221 需要调试验证
consumer://10.33.37.4/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9692&side=consumer×tamp=1415967547508
7 注册中心 回调服务消费者暴露的回调接口来对服务供者的服务进行引用 , refer生成对应的可执行对象 invoker。服务供者与服务的消费建立连接
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// routers
if (routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
// configurators
if (configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
if (configuratorUrls.size() > 0) {
logger.info("unconfigured directory url without provider params: " + this.directoryUrl
+ ", configured directory url without provider params: " + this.overrideDirectoryUrl);
}
// providers
refreshInvoker(invokerUrls);
}
8 找到对应的服务提供者Invoker 举例
dubbo://10.33.37.4:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.4-SNAPSHOT&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=9828&side=provider×tamp=1415968955329