1. 背景
在业务处理完之后,需要调用其他系统的接口,将相应的处理结果通知给对方,若是同步请求,假如调用的系统出现异常或是宕机等事件,会导致自身业务受到影响,事务会一直阻塞,数据库连接不够用等异常现象,可以通过异步回调来防止阻塞,但异步的情况还存在一个问题,若调用一次不成功的话接下来怎么处理?这个地方就需要按时间梯度回调,比如前期按10s间隔回调,回调3次,若不成功按30s回调,回调2次,再不成功按分钟回调,依次类推……相当于给了对方系统恢复的时间,不可能一直处于异常或宕机等异常状态,若是再不成功可以再通过人工干预的手段去处理了,具体业务具体实现。
2. 技术实现
大体实现思路如下图,此过程用到两个队列,当前队列和next队列,当前队列用来存放第一次需要回调的数据对象,如果调用不成功则放入next队列,按照制定的时间策略再继续回调,直到成功或最终持久化后人工接入处理。
用到的技术如下:
•http请求库,retrofit2
•队列,linkedblockingqueue
•调度线程池,scheduledexecutorservice
3. 主要代码说明
3.1 回调时间梯度的策略设计
采用枚举来对策略规则进行处理,便于代码上的维护,该枚举设计三个参数,级别、回调间隔、回调次数;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/**
* 回调策略
*/
public enum callbacktype {
//等级1,10s执行3次
seconds_10( 1 , 10 , 3 ),
//等级2,30s执行2次
seconds_30( 2 , 30 , 2 ),
//等级3,60s执行2次
minute_1( 3 , 60 , 2 ),
//等级4,5min执行1次
minute_5( 4 , 300 , 1 ),
//等级5,30min执行1次
minute_30( 5 , 30 * 60 , 1 ),
//等级6,1h执行2次
hour_1( 6 , 60 * 60 , 1 ),
//等级7,3h执行2次
hour_3( 7 , 60 * 60 * 3 , 1 ),
//等级8,6h执行2次
hour_6( 8 , 60 * 60 * 6 , 1 );
//级别
private int level;
//回调间隔时间 秒
private int intervaltime;
//回调次数
private int count;
}
|
3.2 数据传输对象设计
声明抽象父类,便于其他对象调用传输继承。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/**
* 消息对象父类
*/
public abstract class messageinfo {
//开始时间
private long starttime;
//更新时间
private long updatetime;
//是否回调成功
private boolean issuccess= false ;
//回调次数
private int count= 0 ;
//回调策略
private callbacktype callbacktype;
}
|
要传输的对象,继承消息父类;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/**
* 工单回调信息
*/
public class workordermessage extends messageinfo {
//车架号
private string vin;
//工单号
private string workorderno;
//工单状态
private integer status;
//工单原因
private string reason;
//操作用户
private integer userid;
}
|
3.3 调度线程池的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
//声明线程池,大小为16
private scheduledexecutorservice pool = executors.newscheduledthreadpool( 16 );
...略
while ( true ){
//从队列获取数据,交给定时器执行
try {
workordermessage message = messagequeue.getmessagefromnext();
long excuetime = message.getupdatetime()+message.getcallbacktype().getintervaltime()* 1000 ;
long t = excuetime - system.currenttimemillis();
if (t/ 1000 < 5 ) { //5s之内将要执行的数据提交给调度线程池
system.out.println( "messagehandlenext-满足定时器执行条件" +jsonobject.tojsonstring(message));
pool.schedule( new callable< boolean >() {
@override
public boolean call() throws exception {
remotecallback(message);
return true ;
}
}, t, timeunit.milliseconds);
} else {
messagequeue.putmessagetonext(message);
}
} catch (interruptedexception e) {
system.out.println(e);
}
}
|
3.4 retrofit2的使用,方便好用。
具体可查看官网相关文档进行了解,用起来还是比较方便的。
retrofit初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import retrofit2.retrofit;
import retrofit2.converter.gson.gsonconverterfactory;
public class retrofithelper {
private static final string http_url = "http://baidu.com/" ;
private static retrofit retrofit;
public static retrofit instance(){
if (retrofit == null ){
retrofit = new retrofit.builder()
.baseurl(http_url)
.addconverterfactory(gsonconverterfactory.create())
.build();
}
return retrofit;
}
}
|
如果需要修改超时时间,连接时间等可以这样初始话,retrofit采用okhttpclient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import okhttp3.okhttpclient;
import retrofit2.retrofit;
import retrofit2.converter.gson.gsonconverterfactory;
import java.util.concurrent.timeunit;
public class retrofithelper {
private static final string http_url = "http://baidu.com/" ;
private static retrofit retrofit;
public static retrofit instance(){
if (retrofit == null ){
retrofit = new retrofit.builder()
.baseurl(http_url)
.client( new okhttpclient.builder()
.connecttimeout( 30 , timeunit.seconds) //连接时间
.readtimeout( 30 , timeunit.seconds) //读时间
.writetimeout( 30 , timeunit.seconds) //写时间
.build())
.addconverterfactory(gsonconverterfactory.create())
.build();
}
return retrofit;
}
}
|
retrofit使用通过接口调用,要先声明一个接口;
1
2
3
4
5
6
7
8
9
|
import com.alibaba.fastjson.jsonobject;
import com.woasis.callbackdemo.bean.workordermessage;
import retrofit2.call;
import retrofit2.http.body;
import retrofit2.http.post;
public interface workordermessageinterface {
@post ( "/api" )
call<jsonobject> updatebatteryinfo( @body workordermessage message);
}
|
接口和实例对象准备好了,接下来就是调用;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
private void remotecallback(workordermessage message){
//实例接口对象
workordermessageinterface workordermessageinterface = retrofithelper.instance().create(workordermessageinterface. class );
//调用接口方法
call<jsonobject> objectcall = workordermessageinterface.updatebatteryinfo(message);
system.out.println( "远程调用执行:" + new date());
//异步调用执行
objectcall.enqueue( new callback<jsonobject>() {
@override
public void onresponse(call<jsonobject> call, response<jsonobject> response) {
system.out.println( "messagehandlenext****调用成功" +thread.currentthread().getid());
message.setsuccess( true );
system.out.println( "messagehandlenext-回调成功" +jsonobject.tojsonstring(message));
}
@override
public void onfailure(call<jsonobject> call, throwable throwable) {
system.out.println( "messagehandlenext++++调用失败" +thread.currentthread().getid());
//失败后再将数据放入队列
try {
//对回调策略初始化
long currenttime = system.currenttimemillis();
message.setupdatetime(currenttime);
message.setsuccess( false );
callbacktype callbacktype = message.getcallbacktype();
//获取等级
int level = callbacktype.getlevel(callbacktype);
//获取次数
int count = callbacktype.getcount(callbacktype);
//如果等级已经最高,则不再回调
if (callbacktype.hour_6.getlevel() == callbacktype.getlevel() && count == message.getcount()){
system.out.println( "messagehandlenext-等级最高,不再回调, 线下处理:" +jsonobject.tojsonstring(message));
} else {
//看count是否最大,count次数最大则增加level
if (message.getcount()<callbacktype.getcount()){
message.setcount(message.getcount()+ 1 );
} else { //如果不小,则增加level
message.setcount( 1 );
level += 1 ;
message.setcallbacktype(callbacktype.gettypebylevel(level));
}
messagequeue.putmessagetonext(message);
}
} catch (interruptedexception e) {
e.printstacktrace();
system.out.println( "messagehandlenext-放入队列数据失败" );
}
}
});
}
|
3.5结果实现
4.总结
本次实现了按照时间梯度去相应其他系统的接口,不再导致本身业务因其他系统的异常而阻塞。
源码:https://github.com/liuzwei/callback-demo
以上所述是小编给大家介绍的java按时间梯度实现异步回调接口,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://www.cnblogs.com/soinve/p/9555151.html