必要性
前面我们实现了通用的 rpc,但是存在一个问题,同步获取响应的时候没有超时处理。
如果 server 挂掉了,或者处理太慢,客户端也不可能一直傻傻的等。
当外部的调用超过指定的时间后,就直接报错,避免无意义的资源消耗。
思路
调用的时候,将开始时间保留。
获取的时候检测是否超时。
同时创建一个线程,用来检测是否有超时的请求。
实现
思路
调用的时候,将开始时间保留。
获取的时候检测是否超时。
同时创建一个线程,用来检测是否有超时的请求。
超时检测线程
为了不影响正常业务的性能,我们另起一个线程检测调用是否已经超时。
- packagecom.github.houbb.rpc.client.invoke.impl;
- importcom.github.houbb.heaven.util.common.ArgUtil;
- importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
- importcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
- importcom.github.houbb.rpc.common.support.time.impl.Times;
- importjava.util.Map;
- importjava.util.concurrent.ConcurrentHashMap;
- /**
- *超时检测线程
- *@authorbinbin.hou
- *@since0.0.7
- */
- publicclassTimeoutCheckThreadimplementsRunnable{
- /**
- *请求信息
- *@since0.0.7
- */
-
privatefinalConcurrentHashMap
requestMap; ,long> - /**
- *请求信息
- *@since0.0.7
- */
-
privatefinalConcurrentHashMap
responseMap; ,rpcresponse> - /**
- *新建
- *@paramrequestMap请求Map
- *@paramresponseMap结果map
- *@since0.0.7
- */
-
publicTimeoutCheckThread(ConcurrentHashMap
requestMap, ,long> -
ConcurrentHashMap
responseMap){ ,rpcresponse> - ArgUtil.notNull(requestMap,"requestMap");
- this.requestMap=requestMap;
- this.responseMap=responseMap;
- }
- @Override
- publicvoidrun(){
-
for(Map.Entry
entry:requestMap.entrySet()){ ,long> - longexpireTime=entry.getValue();
- longcurrentTime=Times.time();
- if(currentTime>expireTime){
- finalStringkey=entry.getKey();
- //结果设置为超时,从请求map中移除
- responseMap.putIfAbsent(key,RpcResponseFactory.timeout());
- requestMap.remove(key);
- }
- }
- }
- }
这里主要存储请求,响应的时间,如果超时,则移除对应的请求。
线程启动
在 DefaultInvokeService 初始化时启动:
- finalRunnabletimeoutThread=newTimeoutCheckThread(requestMap,responseMap);
- Executors.newScheduledThreadPool(1)
- .scheduleAtFixedRate(timeoutThread,60,60,TimeUnit.SECONDS);
DefaultInvokeService
原来的设置结果,获取结果是没有考虑时间的,这里加一下对应的判断。
设置请求时间
•添加请求 addRequest
会将过时的时间直接放入 map 中。
因为放入是一次操作,查询可能是多次。
所以时间在放入的时候计算完成。
- @Override
- publicInvokeServiceaddRequest(StringseqId,longtimeoutMills){
- LOG.info("[Client]startaddrequestforseqId:{},timeoutMills:{}",seqId,
- timeoutMills);
- finallongexpireTime=Times.time()+timeoutMills;
- requestMap.putIfAbsent(seqId,expireTime);
- returnthis;
- }
设置请求结果
•添加响应 addResponse
1.如果 requestMap 中已经不存在这个请求信息,则说明可能超时,直接忽略存入结果。
2.此时检测是否出现超时,超时直接返回超时信息。
3.放入信息后,通知其他等待的所有进程。
- @Override
- publicInvokeServiceaddResponse(StringseqId,RpcResponserpcResponse){
- //1.判断是否有效
- LongexpireTime=this.requestMap.get(seqId);
- //如果为空,可能是这个结果已经超时了,被定时job移除之后,响应结果才过来。直接忽略
- if(ObjectUtil.isNull(expireTime)){
- returnthis;
- }
- //2.判断是否超时
- if(Times.time()>expireTime){
- LOG.info("[Client]seqId:{}信息已超时,直接返回超时结果。",seqId);
- rpcResponse=RpcResponseFactory.timeout();
- }
- //这里放入之前,可以添加判断。
- //如果seqId必须处理请求集合中,才允许放入。或者直接忽略丢弃。
- //通知所有等待方
- responseMap.putIfAbsent(seqId,rpcResponse);
- LOG.info("[Client]获取结果信息,seqId:{},rpcResponse:{}",seqId,rpcResponse);
- LOG.info("[Client]seqId:{}信息已经放入,通知所有等待方",seqId);
- //移除对应的requestMap
- requestMap.remove(seqId);
- LOG.info("[Client]seqId:{}removefromrequestmap",seqId);
- synchronized(this){
- this.notifyAll();
- }
- returnthis;
- }
获取请求结果
•获取相应 getResponse
1.如果结果存在,直接返回响应结果
2.否则进入等待。
3.等待结束后获取结果。
- @Override
- publicRpcResponsegetResponse(StringseqId){
- try{
- RpcResponserpcResponse=this.responseMap.get(seqId);
- if(ObjectUtil.isNotNull(rpcResponse)){
- LOG.info("[Client]seq{}对应结果已经获取:{}",seqId,rpcResponse);
- returnrpcResponse;
- }
- //进入等待
- while(rpcResponse==null){
- LOG.info("[Client]seq{}对应结果为空,进入等待",seqId);
- //同步等待锁
- synchronized(this){
- this.wait();
- }
- rpcResponse=this.responseMap.get(seqId);
- LOG.info("[Client]seq{}对应结果已经获取:{}",seqId,rpcResponse);
- }
- returnrpcResponse;
- }catch(InterruptedExceptione){
- thrownewRpcRuntimeException(e);
- }
- }
可以发现获取部分的逻辑没变,因为超时会返回一个超时对象:RpcResponseFactory.timeout();
这是一个非常简单的实现,如下:
- packagecom.github.houbb.rpc.common.rpc.domain.impl;
- importcom.github.houbb.rpc.common.exception.RpcTimeoutException;
- importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
- /**
- *响应工厂类
- *@authorbinbin.hou
- *@since0.0.7
- */
- publicfinalclassRpcResponseFactory{
- privateRpcResponseFactory(){}
- /**
- *超时异常信息
- *@since0.0.7
- */
- privatestaticfinalDefaultRpcResponseTIMEOUT;
- static{
- TIMEOUT=newDefaultRpcResponse();
- TIMEOUT.error(newRpcTimeoutException());
- }
- /**
- *获取超时响应结果
- *@return响应结果
- *@since0.0.7
- */
- publicstaticRpcResponsetimeout(){
- returnTIMEOUT;
- }
- }
响应结果指定一个超时异常,这个异常会在代理处理结果时抛出:
- RpcResponserpcResponse=proxyContext.invokeService().getResponse(seqId);
- Throwableerror=rpcResponse.error();
- if(ObjectUtil.isNotNull(error)){
- throwerror;
- }
- returnrpcResponse.result();
测试代码
服务端
我们故意把服务端的实现添加沉睡,其他保持不变。
- publicclassCalculatorServiceImplimplementsCalculatorService{
- publicCalculateResponsesum(CalculateRequestrequest){
- intsum=request.getOne()+request.getTwo();
- //故意沉睡3s
- try{
- TimeUnit.SECONDS.sleep(3);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- returnnewCalculateResponse(true,sum);
- }
- }
客户端
设置对应的超时时间为 1S,其他不变:
- publicstaticvoidmain(String[]args){
- //服务配置信息
-
ReferenceConfig
config=newDefaultReferenceConfig (); - config.serviceId(ServiceIdConst.CALC);
- config.serviceInterface(CalculatorService.class);
- config.addresses("localhost:9527");
- //设置超时时间为1S
- config.timeout(1000);
- CalculatorServicecalculatorService=config.reference();
- CalculateRequestrequest=newCalculateRequest();
- request.setOne(10);
- request.setTwo(20);
- CalculateResponseresponse=calculatorService.sum(request);
- System.out.println(response);
- }
日志如下:
- .log.integration.adaptors.stdout.StdOutExImpl'adapter.
- [INFO][2021-10-0514:59:40.974][main][c.g.h.r.c.c.RpcClient.connect]-RPC服务开始启动客户端
- ...
- [INFO][2021-10-0514:59:42.504][main][c.g.h.r.c.c.RpcClient.connect]-RPC服务启动客户端完成,监听地址localhost:9527
- [INFO][2021-10-0514:59:42.533][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallremotewithrequest:DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb',createTime=1633417182525,serviceId='calc',methodName='sum',paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest],paramValues=[CalculateRequest{one=10,two=20}]}
- [INFO][2021-10-0514:59:42.534][main][c.g.h.r.c.i.i.DefaultInvokeService.addRequest]-[Client]startaddrequestforseqId:62e126d9a0334399904509acf8dfe0bb,timeoutMills:1000
- [INFO][2021-10-0514:59:42.535][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallchannelid:00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
- ...
- Exceptioninthread"main"com.github.houbb.rpc.common.exception.RpcTimeoutException
-
atcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.
(RpcResponseFactory.java:23) - atcom.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
- atcom.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
- atio.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
- atio.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
- atio.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
- atio.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
- atio.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
- atio.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
- atio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
- atio.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
- atio.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
- atjava.lang.Thread.run(Thread.java:748)
- ...
- [INFO][2021-10-0514:59:45.615][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已超时,直接返回超时结果。
- [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]获取结果信息,seqId:62e126d9a0334399904509acf8dfe0bb,rpcResponse:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
- [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已经放入,通知所有等待方
- [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bbremovefromrequestmap
- [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.c.RpcClient.channelRead0]-[Client]responseis:DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb',error=null,result=CalculateResponse{success=true,sum=30}}
- [INFO][2021-10-0514:59:45.619][main][c.g.h.r.c.i.i.DefaultInvokeService.getResponse]-[Client]seq62e126d9a0334399904509acf8dfe0bb对应结果已经获取:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
- ...
可以发现,超时异常。
不足之处
对于超时的处理可以拓展为双向的,比如服务端也可以指定超时限制,避免资源的浪费。
原文链接:https://www.toutiao.com/a7018512258305278500/