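Let's look at the client request code:
DemoService demoService = (DemoService) context.getBean("demoService"); // obtain the remote service proxy
String hello = demoService.sayHello("world"); // invoke the remote method
In section 8.2 (source analysis of building the client) we saw that the demoService we end up with is a proxy0 proxy object. Now let's analyze the second line of code.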
I. Overall flow of a client request
// the proxy issues the request
proxy0.sayHello(String paramString)-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  -->new RpcInvocation(method, args)
  -->MockClusterInvoker.invoke(Invocation invocation) // where service degradation happens
    // a ClusterInvoker disguises multiple Invokers as a single, cluster-aware Invoker
    -->AbstractClusterInvoker.invoke(final Invocation invocation)
      // obtain the Invokers
      -->list(Invocation invocation)
        -->AbstractDirectory.list(Invocation invocation)
          -->RegistryDirectory.doList(Invocation invocation) // fetch the List<Invoker<T>> whose key is sayHello from Map<String, List<Invoker<T>>> methodInvokerMap
          -->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers) // filter that List<Invoker<T>> once more (here: drop every Invoker whose protocol is mock; if there is none, the whole list is returned). This is what a router does
      // obtain the load balancer
      -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
           .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)) // random by default
      -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation) // attach an invocation ID for async calls
      -->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
        // use the load balancer to pick one Invoker: a RegistryDirectory$InvokerDelegete instance
        -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
          -->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
            -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
              -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
        // run the listener and filter chain
        -->ListenerInvokerWrapper.invoke
          -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation) // sets some RpcContext properties and sets the invoker property of the invocation
            -->FutureFilter.invoke(Invocation invocation)
              -->MonitorFilter.invoke(Invocation invocation) // the monitor collects data here
                -->AbstractInvoker.invoke(Invocation inv) // resets the invoker and attachment properties of the invocation
                  -->DubboInvoker.doInvoke(final Invocation invocation)
                    // obtain an ExchangeClient to send the message
                    -->ReferenceCountExchangeClient.request(Object request, int timeout)
                      -->HeaderExchangeClient.request(Object request, int timeout)
                        -->HeaderExchangeChannel.request(Object request, int timeout)
                          -->AbstractClient.send(Object message, boolean sent) // parent class of NettyClient
                            -->getChannel() // a NettyChannel instance whose internal channel is a NioClientSocketChannel instance
                            -->NettyChannel.send(Object message, boolean sent)
                              -->NioClientSocketChannel.write(Object message) // already Netty code; message is a Request instance, and its most important part is RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
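Overall flow:
- Wrap the request parameters (method name, parameter types, argument values, service name, attachments) into an Invocation
  - path in the attachments: the interface name; once the server receives the request, it uses this to pick the Exporter instance from exporterMap
  - method name, parameter types, argument values: used by JavassistProxyFactory$AbstractProxyInvoker to execute the corresponding method
- Use the Directory to fetch the List<Invoker<T>> whose key is sayHello (the requested method name) from Map<String, List<Invoker<T>>> methodInvokerMap
- Use the Router to filter that List<Invoker<T>> once more, producing a subList
- Use the LoadBalance to pick one Invoker from the subList; it is actually an InvokerDelegete instance
- Use the InvokerDelegete instance to run the listener and filter chain that wraps the real DubboInvoker, and finally reach the real DubboInvoker
- DubboInvoker uses NettyClient to send the request to the server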
II. Source code analysis
First, let's look at proxy0.sayHello:
public String sayHello(String paramString) {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = null;
    try {
        // look up sayHello(String) and hand the call over to the handler
        localObject = this.handler.invoke(this, DemoService.class.getMethod("sayHello", String.class), arrayOfObject);
    } catch (Throwable e) {
        e.printStackTrace();
    }
    return (String) localObject;
}
The handler here is an InvokerInvocationHandler:
public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker; // a MockClusterInvoker instance

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // methods declared on Object are executed locally, never remotely
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // everything else becomes an RpcInvocation and goes through the invoker chain
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
First, the request parameters are wrapped into an RpcInvocation instance, as follows:
-->String methodName=sayHello
-->Class<?>[] parameterTypes=[class java.lang.String]
-->Object[] arguments=[world]
-->Map<String, String> attachments={}
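The wrapping step can be tried out with a plain JDK dynamic proxy. The following is a minimal sketch, not Dubbo code: GreetingService, SimpleInvocation and ProxySketch are hypothetical names standing in for DemoService, RpcInvocation and the generated proxy0, and the handler simply echoes the captured invocation instead of calling a remote server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {
    // Hypothetical service interface, standing in for DemoService.
    interface GreetingService {
        String sayHello(String name);
    }

    // Hypothetical value object, standing in for RpcInvocation.
    static final class SimpleInvocation {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        SimpleInvocation(Method method, Object[] args) {
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            this.arguments = args == null ? new Object[0] : args;
        }

        @Override
        public String toString() {
            return methodName + Arrays.toString(parameterTypes) + Arrays.toString(arguments);
        }
    }

    // Builds a proxy whose handler captures every interface call as a SimpleInvocation
    // and returns its description instead of going over the network.
    static GreetingService newProxy() {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args); // toString/hashCode/equals stay local
                }
                return new SimpleInvocation(method, args).toString();
            }
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        // prints: sayHello[class java.lang.String][world]
        System.out.println(newProxy().sayHello("world"));
    }
}
```

The printed value mirrors the methodName, parameterTypes and arguments fields listed above; attachments are left out of the sketch because they are still empty at this point.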
Next, MockClusterInvoker.invoke(Invocation invocation) performs the remote call:
private final Directory<T> directory; // RegistryDirectory
private final Invoker<T> invoker; // FailoverClusterInvoker

/**
 * Service degradation is driven by the configured mock parameter:
 * 1. if mock is not configured or mock=false, make the remote call;
 * 2. if mock=force:return null is configured, return null directly without a remote call;
 * 3. if mock=fail:return null is configured, make the remote call first and fall back to the mock call only if it fails.
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // lookup order: sayHello.mock -> mock -> default.mock
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        // force: go straight to the mock
        result = doMockInvoke(invocation, null);
    } else {
        // fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}
Note: this is where service degradation can be implemented; it will be covered later.
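The three branches of MockClusterInvoker.invoke can be reduced to a small decision function. The sketch below is a simplified model under stated assumptions: MockDecisionSketch and RemoteCallException are hypothetical names, the remote call and the mock are passed in as suppliers, and the isBiz() check that rethrows business exceptions is deliberately left out.

```java
import java.util.function.Supplier;

class MockDecisionSketch {
    // Hypothetical stand-in for a non-business RpcException.
    static class RemoteCallException extends RuntimeException {
        RemoteCallException(String message) {
            super(message);
        }
    }

    // value:  the resolved "mock" parameter, e.g. "false", "force:return null", "fail:return null"
    // remote: the real remote call
    // mock:   the degraded fallback
    static <T> T invoke(String value, Supplier<T> remote, Supplier<T> mock) {
        String v = value == null ? "" : value.trim();
        if (v.isEmpty() || v.equalsIgnoreCase("false")) {
            return remote.get();              // no mock: always call the provider
        } else if (v.startsWith("force")) {
            return mock.get();                // force: never call the provider
        } else {
            try {
                return remote.get();          // fail: call the provider first
            } catch (RemoteCallException e) {
                return mock.get();            // and degrade only when the call fails
            }
        }
    }

    public static void main(String[] args) {
        String result = MockDecisionSketch.<String>invoke(
                "fail:return null",
                () -> { throw new RemoteCallException("provider down"); },
                () -> "mocked");
        System.out.println(result); // prints: mocked
    }
}
```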
Next, FailoverClusterInvoker.invoke is called; this method is defined in its parent class AbstractClusterInvoker:
protected final Directory<T> directory; // RegistryDirectory

public Result invoke(final Invocation invocation) throws RpcException {
    ...
    LoadBalance loadbalance;

    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // attach an invocation ID for async calls
    return doInvoke(invocation, invokers, loadbalance);
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}
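The method first obtains a List<Invoker<T>>, then obtains a LoadBalance, and finally calls doInvoke to make the call.
Let's start with RegistryDirectory.list(Invocation invocation); this method is defined in AbstractDirectory, the parent class of RegistryDirectory: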
private volatile List<Router> routers;

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    ...
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && localRouters.size() > 0) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}
It first calls doList(invocation) to obtain a List<Invoker<T>>, then passes the list through each router in turn, and finally returns the filtered List<Invoker<T>>.
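The loop in AbstractDirectory.list is a simple filter chain: the output of one router is the input of the next, and a router that throws is logged and skipped. A minimal sketch of that pattern follows, assuming providers are represented as plain URL strings; RouterChainSketch, route and dropMock are hypothetical names, not Dubbo APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class RouterChainSketch {
    // Applies each router in order. If a router throws, the error is reported
    // and the list from the previous step is carried forward unchanged.
    static List<String> route(List<String> candidates, List<UnaryOperator<List<String>>> routers) {
        List<String> current = candidates;
        for (UnaryOperator<List<String>> router : routers) {
            try {
                current = router.apply(current);
            } catch (RuntimeException e) {
                System.err.println("Failed to execute router, cause: " + e.getMessage());
            }
        }
        return current;
    }

    // Example router: keep only entries that do not use the "mock" protocol.
    static List<String> dropMock(List<String> urls) {
        List<String> kept = new ArrayList<>();
        for (String url : urls) {
            if (!url.startsWith("mock://")) {
                kept.add(url);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<UnaryOperator<List<String>>> routers = new ArrayList<>();
        routers.add(RouterChainSketch::dropMock);
        routers.add(list -> { throw new IllegalStateException("broken router"); });
        // prints: [dubbo://a, dubbo://c]
        System.out.println(route(Arrays.asList("dubbo://a", "mock://b", "dubbo://c"), routers));
    }
}
```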
RegistryDirectory.doList(invocation)
public List<Invoker<T>> doList(Invocation invocation) {
    ...
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // routing by the first argument is possible, e.g. key sayHello.world
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
The Map<String, List<Invoker<T>>> methodInvokerMap was already initialized in section 8.2 (source analysis of building the client):
Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2], *=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2]}
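The lookup order used by doList (method plus first argument, then method name, then the wildcard key, then any entry) can be modelled on a plain map. This is a sketch under assumptions: MethodLookupSketch is a hypothetical name, invokers are represented by provider names, and the literal "*" is assumed to be the value of Constants.ANY_VALUE, as the map contents above suggest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MethodLookupSketch {
    // Resolves the candidate list for a call, trying progressively less specific keys.
    static List<String> lookup(Map<String, List<String>> map, String method, Object firstArg) {
        List<String> found = null;
        if (firstArg != null && (firstArg instanceof String || firstArg.getClass().isEnum())) {
            found = map.get(method + "." + firstArg);   // e.g. "sayHello.world"
        }
        if (found == null) {
            found = map.get(method);                    // e.g. "sayHello"
        }
        if (found == null) {
            found = map.get("*");                       // wildcard entry (assumed ANY_VALUE)
        }
        if (found == null && !map.isEmpty()) {
            found = map.values().iterator().next();     // last resort: any entry
        }
        return found == null ? new ArrayList<String>() : found;
    }

    public static void main(String[] args) {
        Map<String, List<String>> map = new HashMap<>();
        map.put("sayHello", Arrays.asList("provider1", "provider2"));
        map.put("*", Arrays.asList("provider1", "provider2"));
        // No "sayHello.world" key exists, so the method-name entry is used.
        System.out.println(lookup(map, "sayHello", "world")); // prints: [provider1, provider2]
    }
}
```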
Here the method name sayHello yields the two RegistryDirectory$InvokerDelegete instances. They are then filtered by the Router; there is only one Router here, MockInvokersSelector.
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
        return getNormalInvokers(invokers);
    } else {
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        if (value == null)
            return getNormalInvokers(invokers);
        else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    return invokers;
}

private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
    if (!hasMockProviders(invokers)) {
        // no mock-protocol provider present: return the list unchanged
        return invokers;
    } else {
        // otherwise keep only the invokers whose protocol is not mock
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
        for (Invoker<T> invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }
}
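In our case the list is returned as-is, because none of the providers uses the mock protocol. At this point we have the subset of RegistryDirectory$InvokerDelegete instances that may be called. Next the load balancer is obtained; the default is RandomLoadBalance. Finally, FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) is executed: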
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // 2 retries + 1 by default, i.e. 3 attempts
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // On a retry, list the invokers again in case the invoker list has changed in the meantime.
        // Note: if the list has changed, the invoked check no longer works, because the invoker instances are different.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            ...
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le ...);
}
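The load balancer first picks one RegistryDirectory$InvokerDelegete instance, and the request is then sent through the invoke method of the selected RegistryDirectory$InvokerDelegete. If the call fails with a non-business exception, the loop moves on to the next attempt.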
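The failover loop in doInvoke boils down to "try up to retries + 1 providers, remember which ones were already tried, and rethrow the last failure if all attempts fail". The following is a simplified sketch rather than the Dubbo implementation: FailoverSketch is a hypothetical name, providers are plain strings, the load balancer is replaced by a deterministic "first untried provider" rule, and the re-listing of invokers and the business-exception check are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries up to retries + 1 providers and returns the first successful result.
    static <R> R invoke(List<String> providers, int retries, Function<String, R> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<String> invoked = new ArrayList<>();   // providers already tried
        RuntimeException last = null;               // last failure seen
        for (int i = 0; i < attempts; i++) {
            String provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e;                           // remember it and try another provider
            }
        }
        throw new IllegalStateException(
                "failed after " + attempts + " attempts, tried " + invoked, last);
    }

    // Deterministic stand-in for the load balancer:
    // the first provider that was not tried yet, otherwise the first provider.
    static String select(List<String> providers, List<String> invoked) {
        for (String provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        String result = FailoverSketch.<String>invoke(Arrays.asList("a", "b", "c"), 2, p -> {
            if (!p.equals("c")) {
                throw new RuntimeException(p + " is down");
            }
            return "ok from " + p;
        });
        System.out.println(result); // prints: ok from c
    }
}
```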
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    ...
    Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
    ...
    return invoker;
}

private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    // If there are only two invokers and at least one has already been selected, degrade to round-robin
    if (invokers.size() == 2 && selected != null && selected.size() > 0) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    // Reselect if the invoker is already in selected (checked first), or if it is unavailable and availablecheck=true.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            ...
        } catch (Throwable t) {
            ...
        }
    }
    return invoker;
}
RandomLoadBalance.doSelect
private final Random random = new Random();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // total number of invokers
    ... // weight calculation
    // if all weights are equal or the total weight is 0, pick uniformly at random
    return invokers.get(random.nextInt(length));
}
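The weight calculation is elided in the excerpt above. For readers unfamiliar with the idea, here is a generic weighted random pick; it is a common textbook technique and only an assumption about what such a calculation might look like, not a reproduction of the omitted Dubbo code. WeightedRandomSketch and pick are hypothetical names, and weights are assumed to be non-negative.

```java
import java.util.Random;

class WeightedRandomSketch {
    // Returns an index chosen with probability proportional to weights[i].
    // Falls back to a uniform pick when all weights are equal or the total is 0.
    static int pick(int[] weights, Random random) {
        int total = 0;
        boolean allSame = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                allSame = false;
            }
        }
        if (total > 0 && !allSame) {
            // Pick a point in [0, total) and find the weight segment it falls into.
            int offset = random.nextInt(total);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        // With weights {0, 5, 0} only index 1 has a non-empty segment.
        System.out.println(pick(new int[] {0, 5, 0}, new Random())); // prints: 1
    }
}
```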
Finally, let's look at RegistryDirectory$InvokerDelegete.invoke; the method is actually defined in its parent class InvokerWrapper:
private final Invoker<T> invoker; // ListenerInvokerWrapper

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
ListenerInvokerWrapper.invoke
private final Invoker<T> invoker; // ProtocolFilterWrapper$Invoker

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
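After that a series of filters is executed; they will be covered later. For now we jump straight to DubboInvoker.invoke. That method is actually defined in the parent class AbstractInvoker, which in turn calls DubboInvoker.doInvoke: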
private final ExchangeClient[] clients;

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0]; // a single long-lived connection (the default)
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // asynchronous call?
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // one-way call (no return value expected)?
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
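The ExchangeClient[] clients array was already initialized in section 8.2 (source analysis of building the client):
ExchangeClient[] clients = [ReferenceCountExchangeClient instance] // if multiple connections are configured, there are multiple clients here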
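DubboInvoker.doInvoke distinguishes three invocation modes: one-way (send and forget), asynchronous (store a future for the caller to read later) and synchronous (block on the future up to the timeout). A minimal sketch of that branching with the JDK's CompletableFuture is shown here; InvocationModeSketch, Mode and CONTEXT_FUTURE are hypothetical names, with CONTEXT_FUTURE playing the role of the future slot in RpcContext.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class InvocationModeSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Hypothetical per-thread slot, standing in for RpcContext's future.
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    // transport sends the request and returns a future for the response.
    static String invoke(Mode mode, String request, long timeoutMs,
                         Function<String, CompletableFuture<String>> transport) {
        switch (mode) {
            case ONEWAY:
                transport.apply(request);                       // fire and forget
                CONTEXT_FUTURE.set(null);
                return null;
            case ASYNC:
                CONTEXT_FUTURE.set(transport.apply(request));   // caller reads the result later
                return null;
            default:
                CONTEXT_FUTURE.set(null);
                try {
                    // block until the response arrives or the timeout expires
                    return transport.apply(request).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new IllegalStateException("call failed or timed out", e);
                }
        }
    }

    public static void main(String[] args) {
        Function<String, CompletableFuture<String>> echo =
                req -> CompletableFuture.completedFuture("echo:" + req);
        System.out.println(invoke(Mode.SYNC, "world", 1000, echo)); // prints: echo:world
        invoke(Mode.ASYNC, "world", 1000, echo);
        System.out.println(CONTEXT_FUTURE.get().join());            // prints: echo:world
    }
}
```

In the asynchronous branch the method returns immediately with an empty result, which matches the `return new RpcResult()` in the excerpt; the real value is only available through the stored future.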
ReferenceCountExchangeClient.request
private ExchangeClient client; // HeaderExchangeClient

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return client.request(request, timeout);
}
HeaderExchangeClient.request
private final ExchangeChannel channel; // HeaderExchangeChannel

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}
HeaderExchangeChannel.request
private final Channel channel; // NettyClient

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
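HeaderExchangeChannel.request creates the future before it sends the request and cancels it if the send fails. The excerpt does not show how a later response is matched back to its future; the sketch below assumes a map keyed by a request id, which is a common way to do it. RequestFutureSketch, PENDING, NEXT_ID and received are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

class RequestFutureSketch {
    // Assumption: pending requests are tracked by id so a response can find its future.
    static final Map<Long, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();
    static final AtomicLong NEXT_ID = new AtomicLong();

    // Registers the future first, then sends; a failed send cleans up and rethrows.
    static CompletableFuture<String> request(String payload, BiConsumer<Long, String> sender) {
        long id = NEXT_ID.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(id, future);          // register before sending, so a fast response is not lost
        try {
            sender.accept(id, payload);
        } catch (RuntimeException e) {
            PENDING.remove(id);
            future.cancel(false);
            throw e;
        }
        return future;
    }

    // Called when a response arrives: completes and forgets the matching future.
    static void received(long id, String response) {
        CompletableFuture<String> future = PENDING.remove(id);
        if (future != null) {
            future.complete(response);
        }
    }

    public static void main(String[] args) {
        // The fake sender answers immediately, before request() has even returned.
        CompletableFuture<String> future =
                request("world", (id, payload) -> received(id, "Hello " + payload));
        System.out.println(future.join()); // prints: Hello world
    }
}
```

Creating the future before the send matters because the response may arrive on another thread before the sending thread continues; with the opposite order such a response would find nothing to complete.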
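The channel in HeaderExchangeChannel.request is a NettyClient instance. The single-argument send(req) called there is defined in AbstractPeer, the parent class of NettyClient's parent AbstractClient, and AbstractPeer in turn calls AbstractClient.send: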
public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel(); // NettyChannel
    // TODO: whether getChannel can return null needs to be improved
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}
NettyChannel.send
private final org.jboss.netty.channel.Channel channel; // NioClientSocketChannel

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
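At this point execution enters Netty itself: Netty's own NioClientSocketChannel sends the message to the server. (The message is encoded before it is sent; that will be covered later.)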