Redis实现延时队列
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class RedisDelayQueue {
private static final String QUEUE_KEY = "delay_queue";
private static final long POLL_INTERVAL = 1000; // 轮询间隔,单位:毫秒
private Jedis jedis;
private ScheduledExecutorService executorService;
public RedisDelayQueue() {
jedis = new Jedis("localhost");
executorService = Executors.newScheduledThreadPool(1);
startPolling();
}
public void addTask(String task, long delayMillis) {
long executeTime = System.currentTimeMillis() + delayMillis;
jedis.zadd(QUEUE_KEY, executeTime, task);
}
private void startPolling() {
executorService.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
Set<Tuple> tasks = jedis.zrangeByScoreWithScores(QUEUE_KEY, 0, now, 0, 1);
if (!tasks.isEmpty()) {
Tuple task = tasks.iterator().next();
String taskContent = task.getElement();
double score = task.getScore();
if (now >= score) {
jedis.zrem(QUEUE_KEY, taskContent);
processTask(taskContent);
}
}
}, 0, POLL_INTERVAL, TimeUnit.MILLISECONDS);
}
private void processTask(String task) {
System.out.println("Processing task: " + task);
// 在这里处理任务
}
public static void main(String[] args) {
RedisDelayQueue delayQueue = new RedisDelayQueue();
delayQueue.addTask("Task 1", 5000); // 5秒后执行
delayQueue.addTask("Task 2", 10000); // 10秒后执行
}
}