Java按时间梯度实现异步回调接口的方法

时间:2022-09-12 09:55:57

1. 背景

  在业务处理完之后,需要调用其他系统的接口,将相应的处理结果通知给对方,若是同步请求,假如调用的系统出现异常或是宕机等事件,会导致自身业务受到影响,事务会一直阻塞,数据库连接不够用等异常现象,可以通过异步回调来防止阻塞,但异步的情况还存在一个问题,若调用一次不成功的话接下来怎么处理?这个地方就需要按时间梯度回调,比如前期按10s间隔回调,回调3次,若不成功按30s回调,回调2次,再不成功按分钟回调,依次类推……相当于给了对方系统恢复的时间,不可能一直处于异常或宕机等异常状态,若是再不成功可以再通过人工干预的手段去处理了,具体业务具体实现。

2. 技术实现

  大体实现思路如下图,此过程用到两个队列,当前队列和next队列,当前队列用来存放第一次需要回调的数据对象,如果调用不成功则放入next队列,按照制定的时间策略再继续回调,直到成功或最终持久化后人工接入处理。

  用到的技术如下:

•http请求库,retrofit2
•队列,linkedblockingqueue
•调度线程池,scheduledexecutorservice

Java按时间梯度实现异步回调接口的方法

3. 主要代码说明

3.1 回调时间梯度的策略设计

采用枚举来对策略规则进行处理,便于代码上的维护,该枚举设计三个参数,级别、回调间隔、回调次数;

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * 回调策略
 */
public enum callbacktype {
  //等级1,10s执行3次
  seconds_10(1, 10, 3),
  //等级2,30s执行2次
  seconds_30(2, 30, 2),
  //等级3,60s执行2次
  minute_1(3, 60, 2),
  //等级4,5min执行1次
  minute_5(4, 300, 1),
  //等级5,30min执行1次
  minute_30(5, 30*60, 1),
  //等级6,1h执行2次
  hour_1(6, 60*60, 1),
  //等级7,3h执行2次
  hour_3(7, 60*60*3, 1),
  //等级8,6h执行2次
  hour_6(8, 60*60*6, 1);
 
  //级别
  private int level;
  //回调间隔时间 秒
  private int intervaltime;
  //回调次数
  private int count;
}

3.2 数据传输对象设计

声明抽象父类,便于其他对象调用传输继承。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * 消息对象父类
 */
public abstract class messageinfo {
  //开始时间
  private long starttime;
  //更新时间
  private long updatetime;
  //是否回调成功
  private boolean issuccess=false;
  //回调次数
  private int count=0;
  //回调策略
  private callbacktype callbacktype;
}

要传输的对象,继承消息父类;

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * 工单回调信息
 */
public class workordermessage extends messageinfo {
  //车架号
  private string vin;
  //工单号
  private string workorderno;
  //工单状态
  private integer status;
  //工单原因
  private string reason;
  //操作用户
  private integer userid;
}

3.3 调度线程池的使用

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//声明线程池,大小为16
private scheduledexecutorservice pool = executors.newscheduledthreadpool(16);
 
...略
 
while (true){
      //从队列获取数据,交给定时器执行
      try {
        workordermessage message = messagequeue.getmessagefromnext();
        long excuetime = message.getupdatetime()+message.getcallbacktype().getintervaltime()* 1000;
        long t = excuetime - system.currenttimemillis();
        if (t/1000 < 5) {//5s之内将要执行的数据提交给调度线程池
          system.out.println("messagehandlenext-满足定时器执行条件"+jsonobject.tojsonstring(message));
          pool.schedule(new callable<boolean>() {
            @override
            public boolean call() throws exception {
              remotecallback(message);
              return true;
            }
          }, t, timeunit.milliseconds);
        }else {
          messagequeue.putmessagetonext(message);
        }
      } catch (interruptedexception e) {
        system.out.println(e);
      }
    }

3.4 retrofit2的使用,方便好用。

具体可查看官网相关文档进行了解,用起来还是比较方便的。

retrofit初始化:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import retrofit2.retrofit;
import retrofit2.converter.gson.gsonconverterfactory;
public class retrofithelper {
  private static final string http_url = "http://baidu.com/";
  private static retrofit retrofit;
  public static retrofit instance(){
    if (retrofit == null){
      retrofit = new retrofit.builder()
          .baseurl(http_url)
          .addconverterfactory(gsonconverterfactory.create())
          .build();
    }
    return retrofit;
  }
}

如果需要修改超时时间,连接时间等可以这样初始话,retrofit采用okhttpclient

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import okhttp3.okhttpclient;
import retrofit2.retrofit;
import retrofit2.converter.gson.gsonconverterfactory;
import java.util.concurrent.timeunit;
public class retrofithelper {
  private static final string http_url = "http://baidu.com/";
  private static retrofit retrofit;
  public static retrofit instance(){
    if (retrofit == null){
      retrofit = new retrofit.builder()
          .baseurl(http_url)
          .client(new okhttpclient.builder()
              .connecttimeout(30, timeunit.seconds)//连接时间
              .readtimeout(30, timeunit.seconds)//读时间
              .writetimeout(30, timeunit.seconds)//写时间
              .build())
          .addconverterfactory(gsonconverterfactory.create())
          .build();
    }
    return retrofit;
  }
}

retrofit使用通过接口调用,要先声明一个接口;

?
1
2
3
4
5
6
7
8
9
import com.alibaba.fastjson.jsonobject;
import com.woasis.callbackdemo.bean.workordermessage;
import retrofit2.call;
import retrofit2.http.body;
import retrofit2.http.post;
public interface workordermessageinterface {
  @post("/api")
  call<jsonobject> updatebatteryinfo(@body workordermessage message);
}

接口和实例对象准备好了,接下来就是调用;

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private void remotecallback(workordermessage message){
    //实例接口对象
    workordermessageinterface workordermessageinterface = retrofithelper.instance().create(workordermessageinterface.class);
    //调用接口方法
    call<jsonobject> objectcall = workordermessageinterface.updatebatteryinfo(message);
    system.out.println("远程调用执行:"+new date());
    //异步调用执行
    objectcall.enqueue(new callback<jsonobject>() {
      @override
      public void onresponse(call<jsonobject> call, response<jsonobject> response) {
        system.out.println("messagehandlenext****调用成功"+thread.currentthread().getid());
        message.setsuccess(true);
        system.out.println("messagehandlenext-回调成功"+jsonobject.tojsonstring(message));
      }
      @override
      public void onfailure(call<jsonobject> call, throwable throwable) {
        system.out.println("messagehandlenext++++调用失败"+thread.currentthread().getid());
        //失败后再将数据放入队列
        try {
          //对回调策略初始化
          long currenttime = system.currenttimemillis();
          message.setupdatetime(currenttime);
          message.setsuccess(false);
          callbacktype callbacktype = message.getcallbacktype();
          //获取等级
          int level = callbacktype.getlevel(callbacktype);
          //获取次数
          int count = callbacktype.getcount(callbacktype);
          //如果等级已经最高,则不再回调
          if (callbacktype.hour_6.getlevel() == callbacktype.getlevel() && count == message.getcount()){
            system.out.println("messagehandlenext-等级最高,不再回调, 线下处理:"+jsonobject.tojsonstring(message));
          }else {
            //看count是否最大,count次数最大则增加level
            if (message.getcount()<callbacktype.getcount()){
              message.setcount(message.getcount()+1);
            }else {//如果不小,则增加level
              message.setcount(1);
              level += 1;
              message.setcallbacktype(callbacktype.gettypebylevel(level));
            }
            messagequeue.putmessagetonext(message);
          }
        } catch (interruptedexception e) {
          e.printstacktrace();
          system.out.println("messagehandlenext-放入队列数据失败");
        }
      }
    });
  }

3.5结果实现

Java按时间梯度实现异步回调接口的方法

4.总结

本次实现了按照时间梯度去相应其他系统的接口,不再导致本身业务因其他系统的异常而阻塞。

源码:https://github.com/liuzwei/callback-demo

以上所述是小编给大家介绍的java按时间梯度实现异步回调接口,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!

原文链接:https://www.cnblogs.com/soinve/p/9555151.html