1)首先在初始化nettyServer和nettyClient时候都设置了NettyHandler,那么它的ChannelHandler handler是下面的这条链
MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler
这个链怎么来的
1、DubboProtocol的initClient(URL url)
client = Exchangers.connect(url ,requestHandler);
2、Exchangers的ExchangeClient connect(URL url, ExchangeHandler handler)
getExchanger(url).connect(url, handler);
3、HeaderExchanger的connect
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
到这里可以得到最后三节
4、然后到NettyClient的doOpen()
NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
传入的是this自己
new方法传入的handler即为DecodeHandler
5、父方法wrapChannelHandler方法return ChannelHandlers.wrap(handler, url);
6、ChannelHandlers的wrapInternal
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
即可得到上面的链。
2)消费者调用,消费者端的配置最后得到DubboInvoker,通过代理生成了代理对象,这个时候调用接口即为使用Invoker的invoke方法
1、DubboInvoker的doInvoke方法,即为向服务端发送数据,最后else是最常用的同步处理方式,调用完后线程会阻塞在这里
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
2、currentClient.request(inv, timeout)->HeaderExchangeClient.request->HeaderExchangeChannel.request->NettyClient.send->NettyChannel.send
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
这边结束即通过netty把封装的request发送到了服务端,返回了一个DefaultFuture
3)服务生产者端处理,服务端的netty在收到数据时候,触发NettyHandler的messageReceived方法,也就会触发上面的那一条链
1、当走到HeaderExchangeHandler.received时候
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
处理请求,然后把结果写入channel
2、handleRequest->Object result = handler.reply(channel, msg)->DubboProtocol.requestHandler.reply-> return invoker.invoke(inv);
在服务端暴漏服务时候,会对每个服务生成一个exporter(持有对应的invoker),invoker是使用proxyFactory.getInvoker生成的
所有的exporter保存在exporterMap中,key是由serviceKey方法确定,在得到invoker后调用invoke会使用反射来处理请求
4)服务端处理完成后,将响应写回客户端,就会触发客户端的NettyHandler的messageReceived方法,这里没有讨论编解码
1、当走到HeaderExchangeHandler.received时候
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
2、handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
5)DefaultFuture
这个类中有static final Map<Long, Channel> CHANNELS
final Map<Long, DefaultFuture> FUTURES 用来保存所有的响应DefaultFuture,key为请求响应ID
1、在客户端发送结束后调用get方法阻塞线程
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
2、客户端收到响应后需要唤醒阻塞的线程,调用了received方法,这里就根据请求响应ID来找到对应的DefaultFuture,
进而找到对应线程的condition,唤醒后返回结果
public static void received(Channel channel, Response response) { try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}