springBoot之延时队列

时间:2025-04-02 08:45:55

1、创建延时队列

package com.example.demo.utils;
import lombok.Data;
import java.util.concurrent.DelayQueue;
/**
 * 延时队列
 * 需要保证队列单例
 */
@Data
public class DelayTaskQueue {
    private static class Holder{
         static DelayQueue<DelayTask> instance = new DelayQueue(); //单例保证队列唯一
    }
    public static DelayQueue<DelayTask> getInstance() {
        return Holder.instance;
    }
}

2、创建消息任务(可根据自己业务来)

package com.example.demo.utils;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 延时任务
 * 需要实现 Delayed 接口 重写getDelay和compareTo指定相应的规则
 */
@Data
@Accessors(chain = true)//链式调用注解
public class DelayTask implements Delayed {
    private String id;
    private Long time;
    private Integer type;
    @Override
    public long getDelay(TimeUnit unit) {
        // 计算该任务距离过期还剩多少时间
        long remaining = time - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        // 比较、排序:对任务的延时大小进行排序,将延时时间最小的任务放到队列头部
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

3、消息生产者(根据业务需要投递消息进入队列)

package com.example.demo.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
 * 在需要使用到延时队列的业务进行投递任务(消息)
 */
@Slf4j
public class DelayTaskProducer {
    /**
     *
     * @param id  业务id
     * @param time 消费时间  单位:毫秒
     * @param type 业务类型
     */
    public static void delayTask(String id,Long time,Integer type){
        DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//创建队列 1
        DelayTask delayTask = new DelayTask();//创建任务
        delayTask.setId(id)
                .setTime(time)
                .setType(type);
        log.info("=============入延时队列,{}",delayTask);
        boolean offer = delayQueue.offer(delayTask);//任务入队
        if(offer){
            log.info("=============入延时队列成功,{}",delayQueue);
        }else{
            log.info("=============入延时队列失败");
        }
    }
}

4、消息消费者(规定自己相应的业务操作)

package com.example.demo.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
@Data
@Slf4j
@Component
public class DelayTaskConsumer implements CommandLineRunner {
  /*  @Autowired
    private IProPatMeetingService meetingService;
    @Autowired
    private ParActivityService activityService;*/
    @Override
    public void run(String ...args) throws Exception {
        DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//获取同一个put进去任务的队列
        new Thread(() -> {
            while (true) {
                try {
                    // 从延迟队列的头部获取已经过期的消息
                    // 如果暂时没有过期消息或者队列为空,则take()方法会被阻塞,直到有过期的消息为止
                    DelayTask delayTask = delayQueue.take();//阻塞
                    switch (delayTask.getType()) {//判断业务类型,执行对应的业务操作
                        case 1:
                            log.info("====================会议消费,{}",delayTask.getType());
                            //ParMeeting meeting = (());
                            //(meeting,true,null);
                            break;
                        case 2:
                            log.info("====================活动报名消费,{}",delayTask.getType());
                            //ParActivity activityApply = (());
                            //(activityApply, PartyActivityPushMessageType.apply_activity_type.getCode());
                            break;
                        case 3:
                            log.info("====================活动开始消费,{}",delayTask.getType());
                            //ParActivity activityStart = (());
                            //(activityStart,PartyActivityPushMessageType.activity_start_type.getCode());
                            break;
                        default:
                            log.info("====================未找到消费类型,{}",delayTask.getType());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}