Redis实现延时队列

时间:2025-04-02 15:27:57
import redis.clients.jedis.Jedis; import redis.clients.jedis.Tuple; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class RedisDelayQueue { private static final String QUEUE_KEY = "delay_queue"; private static final long POLL_INTERVAL = 1000; // 轮询间隔,单位:毫秒 private Jedis jedis; private ScheduledExecutorService executorService; public RedisDelayQueue() { jedis = new Jedis("localhost"); executorService = Executors.newScheduledThreadPool(1); startPolling(); } public void addTask(String task, long delayMillis) { long executeTime = System.currentTimeMillis() + delayMillis; jedis.zadd(QUEUE_KEY, executeTime, task); } private void startPolling() { executorService.scheduleAtFixedRate(() -> { long now = System.currentTimeMillis(); Set<Tuple> tasks = jedis.zrangeByScoreWithScores(QUEUE_KEY, 0, now, 0, 1); if (!tasks.isEmpty()) { Tuple task = tasks.iterator().next(); String taskContent = task.getElement(); double score = task.getScore(); if (now >= score) { jedis.zrem(QUEUE_KEY, taskContent); processTask(taskContent); } } }, 0, POLL_INTERVAL, TimeUnit.MILLISECONDS); } private void processTask(String task) { System.out.println("Processing task: " + task); // 在这里处理任务 } public static void main(String[] args) { RedisDelayQueue delayQueue = new RedisDelayQueue(); delayQueue.addTask("Task 1", 5000); // 5秒后执行 delayQueue.addTask("Task 2", 10000); // 10秒后执行 } }