属于协议层的实现部分

时间:2022-03-20 06:32:40

  Netty在Java NIO范围根基算是独有鳌头,涉及到高性能网络通信,根基城市以Netty为底层通信框架,Dubbo 也不例外。以下将以Dubbo实现为例介绍其是如安在NIO非梗阻通信根本上实现同步通信的。

Dubbo为一种RPC通信框架,供给进程间的通信,在使用dubbo协议+Netty作为传输层时,供给三种API挪用方法:

同步接口

异步带回调接口

异步不带回调接口

同步接口适用在大部分环境,通信方法简单、可靠,客户端倡议挪用,期待处事端措置惩罚惩罚,挪用功效同步返回。这种方法下,在高吞吐、高性能(响应时间很快)的处事接口场景中最为适用,可以减少异步带来的特别的消耗,也便利客户端做一致性保证。

异步带回调接口,用在任务措置惩罚惩罚时间较长,客户端应用线程不愿梗阻期待,而是为了提高自身措置惩罚惩罚能力但愿处事端措置惩罚惩罚完成后可以异步通知应用线程。这种方法可以大大提升客户真个吞吐量,制止因为处事真个耗时问题拖死客户端。

异步不带回调接口,一些场景为了进一步提升客户真个吞吐能力,只需倡议一次处事端挪用,不需关系挪用功效,可以使用此种通信方法。一般在不需要严格保证数据一致性或者有其他赔偿法子的情况下,选用这种,可以最小化长途挪用带来的性能损耗。

    

来看一下Dubbo是如何实现这三种API的。核心代码在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,如下图对应的位置,属于协议层的实现部分。为便利大家可以准确定位代码地址位置,使用截图的方法,而不是直接贴代码了。

上文描述的是三种API方法,Dubbo里面通过参数isOneway、isAsync来控制,isOneway=true暗示异步不带回调,isAsync=true暗示异步带回调,否则是同步API。具体是如何控制,看以下代码:

isOneway==true时,客户端send完请求后,直接return一个空功效的RpcResult;isAsync==true时,客户端倡议请求,设置一个ResponseFuture,直接return一个空功效的RpcResult,接下来当处事端措置惩罚惩罚完成,客户端Netty层在收到响应后会通过Future通知应用线程;最后是同步情况下,客户端倡议请求,并通过get()要领梗阻期待处事真个响应功效。

异步API情况下,结合NIO模型对照好理解是如何实现的(固然需要先了解NIO的reactor模型),接下来重点理解下,这个get()梗阻要领是如何做到基于非梗阻NIO实现同步梗阻效果。

直接进入get()要领内部。

可以看到是操作Java的锁机制实现,循环判断是否收到响应,如果收到或者期待超时则返回。done的实例东西如下:

private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition();

使用可重入锁ReentrantLock,获取一个Condition东西在其上做await操纵。这里有await操纵,,何时被唤醒呢,有两个条件,第一个是期待timeout超时,默认dubbo是1s,第二个就是被其他线程唤醒,即收到了处事真个响应。

signal信号一发出,上文循环检测内的await操纵会当即返回,下一次isDone判断会酿成true,直接跳出循环。

仔细看代码会发明,被唤醒的处所还有一个是在DefaultFuture内部有一个超时轮询检测的线程,这个线程主要是措置惩罚惩罚响应超时后触发资源回收、记录异平日志等操纵。    

private static class RemotingInvocationTimeoutScan implements Runnable { public void run() { while (true) { try { for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } } static { Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start(); }