1、创建延时队列
package com.example.demo.utils;
import lombok.Data;
import java.util.concurrent.DelayQueue;
/**
* 延时队列
* 需要保证队列单例
*/
@Data
public class DelayTaskQueue {
private static class Holder{
static DelayQueue<DelayTask> instance = new DelayQueue(); //单例保证队列唯一
}
public static DelayQueue<DelayTask> getInstance() {
return Holder.instance;
}
}
2、创建消息任务(可根据自己业务来)
package com.example.demo.utils;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时任务
* 需要实现 Delayed 接口 重写getDelay和compareTo指定相应的规则
*/
@Data
@Accessors(chain = true)//链式调用注解
public class DelayTask implements Delayed {
private String id;
private Long time;
private Integer type;
@Override
public long getDelay(TimeUnit unit) {
// 计算该任务距离过期还剩多少时间
long remaining = time - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
// 比较、排序:对任务的延时大小进行排序,将延时时间最小的任务放到队列头部
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
3、消息生产者(根据业务需要投递消息进入队列)
package com.example.demo.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
* 在需要使用到延时队列的业务进行投递任务(消息)
*/
@Slf4j
public class DelayTaskProducer {
/**
*
* @param id 业务id
* @param time 消费时间 单位:毫秒
* @param type 业务类型
*/
public static void delayTask(String id,Long time,Integer type){
DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//创建队列 1
DelayTask delayTask = new DelayTask();//创建任务
delayTask.setId(id)
.setTime(time)
.setType(type);
log.info("=============入延时队列,{}",delayTask);
boolean offer = delayQueue.offer(delayTask);//任务入队
if(offer){
log.info("=============入延时队列成功,{}",delayQueue);
}else{
log.info("=============入延时队列失败");
}
}
}
4、消息消费者(规定自己相应的业务操作)
package com.example.demo.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
@Data
@Slf4j
@Component
public class DelayTaskConsumer implements CommandLineRunner {
/* @Autowired
private IProPatMeetingService meetingService;
@Autowired
private ParActivityService activityService;*/
@Override
public void run(String ...args) throws Exception {
DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//获取同一个put进去任务的队列
new Thread(() -> {
while (true) {
try {
// 从延迟队列的头部获取已经过期的消息
// 如果暂时没有过期消息或者队列为空,则take()方法会被阻塞,直到有过期的消息为止
DelayTask delayTask = delayQueue.take();//阻塞
switch (delayTask.getType()) {//判断业务类型,执行对应的业务操作
case 1:
log.info("====================会议消费,{}",delayTask.getType());
//ParMeeting meeting = (());
//(meeting,true,null);
break;
case 2:
log.info("====================活动报名消费,{}",delayTask.getType());
//ParActivity activityApply = (());
//(activityApply, PartyActivityPushMessageType.apply_activity_type.getCode());
break;
case 3:
log.info("====================活动开始消费,{}",delayTask.getType());
//ParActivity activityStart = (());
//(activityStart,PartyActivityPushMessageType.activity_start_type.getCode());
break;
default:
log.info("====================未找到消费类型,{}",delayTask.getType());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}