Redis延迟队列和分布式延迟队列的简答实现

时间:2022-03-17 06:05:57

最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现。

  在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。

1. 实现一个简单的延迟队列。

  我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图。DelayQueue实现了Delay、BlockingQueue接口。也就是DelayQueue是一种阻塞队列。

Redis延迟队列和分布式延迟队列的简答实现

  我们在看一下Delay的类图。Delayed接口也实现了Comparable接口,也就是我们使用Delayed的时候需要实现CompareTo方法。因为队列中的数据需要排一下先后,根据我们自己的实现。Delayed接口里边有一个方法就是getDelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。

Redis延迟队列和分布式延迟队列的简答实现

  我们创建一个Message类,实现了Delayed接口,我们主要把getDelay和compareTo进行实现。在Message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间fireTime。同时按照延迟时间的升序进行排序。我重写了里边的toString方法,用于将Message按照我写的方法进行输出。

  1. package com.hqs.delayQueue.bean;
  2.  
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.DelayQueue;
  5. import java.util.concurrent.Delayed;
  6. import java.util.concurrent.TimeUnit;
  7.  
  8. /**
  9. * @author huangqingshi
  10. * @Date 2020-04-18
  11. */
  12. public class Message implements Delayed {
  13.  
  14. private String body;
  15. private long fireTime;
  16.  
  17. public String getBody() {
  18. return body;
  19. }
  20.  
  21. public long getFireTime() {
  22. return fireTime;
  23. }
  24.  
  25. public Message(String body, long delayTime) {
  26. this.body = body;
  27. this.fireTime = delayTime + System.currentTimeMillis();
  28. }
  29.  
  30. public long getDelay(TimeUnit unit) {
  31.  
  32. return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  33. }
  34.  
  35. public int compareTo(Delayed o) {
  36. return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
  37. }
  38.  
  39. @Override
  40. public String toString() {
  41. return System.currentTimeMillis() + ":" + body;
  42. }
  43.  
  44. public static void main(String[] args) throws InterruptedException {
  45. System.out.println(System.currentTimeMillis() + ":start");
  46. BlockingQueue<Message> queue = new DelayQueue<>();
  47. Message message1 = new Message("hello", 1000 * 5L);
  48. Message message2 = new Message("world", 1000 * 7L);
  49.  
  50. queue.put(message1);
  51. queue.put(message2);
  52.  
  53. while (queue.size() > 0) {
  54. System.out.println(queue.take());
  55. }
  56. }
  57. }

  里边的main方法里边声明了两个Message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。

1587218430786:start
1587218435789:hello
1587218437793:world

  这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用Redis实现分布式的任务处理。

  2. 使用Redis的list实现分布式延迟队列。

  本地需要安装一个Redis,我自己是使用Docker构建一个Redis,非常快速,命令也没多少。我们直接启动Redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。

  1. docker pull redis
  2. docker run -itd --name redisLocal -p 6379:6379 redis
  3. docker exec -it redisLocal /bin/bash
  4. redis-cli

  我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.6.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.hqs</groupId>
  12. <artifactId>delayQueue</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>delayQueue</name>
  15. <description>Demo project for Spring Boot</description>
  16.  
  17. <properties>
  18. <java.version>1.8</java.version>
  19. </properties>
  20.  
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter</artifactId>
  25. </dependency>
  26.  
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. <exclusions>
  32. <exclusion>
  33. <groupId>org.junit.vintage</groupId>
  34. <artifactId>junit-vintage-engine</artifactId>
  35. </exclusion>
  36. </exclusions>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-data-redis</artifactId>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.springframework.boot</groupId>
  44. <artifactId>spring-boot-starter-web</artifactId>
  45. </dependency>
  46. <dependency>
  47. <groupId>redis.clients</groupId>
  48. <artifactId>jedis</artifactId>
  49. <version>2.9.0</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-devtools</artifactId>
  54. <scope>runtime</scope>
  55. </dependency>
  56. <dependency>
  57. <groupId>org.projectlombok</groupId>
  58. <artifactId>lombok</artifactId>
  59. <optional>true</optional>
  60. </dependency>
  61. </dependencies>
  62.  
  63. <build>
  64. <plugins>
  65. <plugin>
  66. <groupId>org.springframework.boot</groupId>
  67. <artifactId>spring-boot-maven-plugin</artifactId>
  68. </plugin>
  69. </plugins>
  70. </build>
  71.  
  72. </project>

  加上Redis的配置放到application.properties里边即可实现Redis连接,非常的方便。

  1. # redis
  2. redis.host=127.0.0.1
  3. redis.port=6379
  4. redis.password=
  5. redis.maxIdle=100
  6. redis.maxTotal=300
  7. redis.maxWait=10000
  8. redis.testOnBorrow=true
  9. redis.timeout=100000

  接下来实现一个基于Redis的list数据类型进行实现的一个类。我们使用RedisTemplate操作Redis,这个里边封装好我们所需要的Redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10W个,也是避免数据量过大对Redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算Redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到Redis里边。

  1. package com.hqs.delayQueue.cache;
  2.  
  3. import com.hqs.delayQueue.bean.Message;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6.  
  7. import java.util.concurrent.BlockingQueue;
  8.  
  9. /**
  10. * @author huangqingshi
  11. * @Date 2020-04-18
  12. */
  13. @Slf4j
  14. public class RedisListDelayedQueue{
  15.  
  16. private static final int MAX_SIZE_OF_QUEUE = 100000;
  17. private RedisTemplate<String, String> redisTemplate;
  18. private String queueName;
  19. private BlockingQueue<Message> delayedQueue;
  20.  
  21. public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
  22. this.redisTemplate = redisTemplate;
  23. this.queueName = queueName;
  24. this.delayedQueue = delayedQueue;
  25. init();
  26. }
  27.  
  28. public void offerMessage(Message message) {
  29. if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
  30. throw new IllegalStateException("超过队列要求最大值,请检查");
  31. }
  32. try {
  33. log.info("offerMessage:" + message);
  34. delayedQueue.offer(message);
  35. } catch (Exception e) {
  36. log.error("offMessage异常", e);
  37. }
  38. }
  39.  
  40. public void init() {
  41. new Thread(() -> {
  42. while(true) {
  43. try {
  44. Message message = delayedQueue.take();
  45. redisTemplate.opsForList().leftPush(queueName, message.toString());
  46. } catch (InterruptedException e) {
  47. log.error("取消息错误", e);
  48. }
  49. }
  50. }).start();
  51. }
  52. }

  接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listDelayedQueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从Redis的list中获取。方法也非常简单。

  1. package com.hqs.delayQueue.controller;
  2.  
  3. import com.hqs.delayQueue.bean.Message;
  4. import com.hqs.delayQueue.cache.RedisListDelayedQueue;
  5. import com.hqs.delayQueue.cache.RedisZSetDelayedQueue;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.data.redis.core.RedisTemplate;
  9. import org.springframework.stereotype.Controller;
  10. import org.springframework.web.bind.annotation.GetMapping;
  11. import org.springframework.web.bind.annotation.ResponseBody;
  12.  
  13. import java.util.Set;
  14. import java.util.concurrent.*;
  15.  
  16. /**
  17. * @author huangqingshi
  18. * @Date 2020-04-18
  19. */
  20. @Slf4j
  21. @Controller
  22. public class DelayQueueController {
  23.  
  24. private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();
  25.  
  26. //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的
  27. @Autowired
  28. RedisTemplate<String, String> redisTemplate;
  29.  
  30. private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS,
  31. new LinkedBlockingDeque<>());
  32.  
  33. @GetMapping("/redisTest")
  34. @ResponseBody
  35. public String redisTest() {
  36. redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS);
  37. System.out.println(redisTemplate.opsForValue().get("a"));
  38. return "s";
  39. }
  40.  
  41. @GetMapping("/redis/listDelayedQueue")
  42. @ResponseBody
  43. public String listDelayedQueue() {
  44.  
  45. Message message1 = new Message("hello", 1000 * 5L);
  46. Message message2 = new Message("world", 1000 * 7L);
  47.  
  48. String queueName = "list_queue";
  49.  
  50. BlockingQueue<Message> delayedQueue = new DelayQueue<>();
  51.  
  52. RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue);
  53.  
  54. redisListDelayedQueue.offerMessage(message1);
  55. redisListDelayedQueue.offerMessage(message2);
  56. asyncListTask(queueName);
  57.  
  58. return "success";
  59. }
  60.  
  61. @GetMapping("/redis/zSetDelayedQueue")
  62. @ResponseBody
  63. public String zSetDelayedQueue() {
  64.  
  65. Message message1 = new Message("hello", 1000 * 5L);
  66. Message message2 = new Message("world", 1000 * 7L);
  67.  
  68. String queueName = "zset_queue";
  69.  
  70. BlockingQueue<Message> delayedQueue = new DelayQueue<>();
  71.  
  72. RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue);
  73.  
  74. redisZSetDelayedQueue.offerMessage(message1);
  75. redisZSetDelayedQueue.offerMessage(message2);
  76. asyncZSetTask(queueName);
  77.  
  78. return "success";
  79. }
  80.  
  81. public void asyncListTask(String queueName) {
  82. taskExecPool.execute(() -> {
  83. for(;;) {
  84. String message = redisTemplate.opsForList().rightPop(queueName);
  85. if(message != null) {
  86. log.info(message);
  87. }
  88. }
  89. });
  90. }
  91.  
  92. public void asyncZSetTask(String queueName) {
  93. taskExecPool.execute(() -> {
  94. for(;;) {
  95. Long nowTimeInMs = System.currentTimeMillis();
  96. System.out.println("nowTimeInMs:" + nowTimeInMs);
  97. Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs);
  98. if(messages != null && messages.size() != 0) {
  99. redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs);
  100. for (String message : messages) {
  101. log.info("asyncZSetTask:" + message + " " + nowTimeInMs);
  102. }
  103. log.info(redisTemplate.opsForZSet().zCard(queueName).toString());
  104. }
  105. try {
  106. TimeUnit.SECONDS.sleep(1);
  107. } catch (InterruptedException e) {
  108. e.printStackTrace();
  109. }
  110. }
  111. });
  112. }
  113.  
  114. }

  我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到Redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好的进行这个问题的处理。

3.使用Redis的zSet实现分布式延迟队列。

  我们需要再写一个ZSet的队列处理。下边的offerMessage主要是把消息直接放入缓存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。

  1. package com.hqs.delayQueue.cache;
  2.  
  3. import com.hqs.delayQueue.bean.Message;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6.  
  7. import java.util.concurrent.BlockingQueue;
  8.  
  9. /**
  10. * @author huangqingshi
  11. * @Date 2020-04-18
  12. */
  13. @Slf4j
  14. public class RedisZSetDelayedQueue {
  15.  
  16. private static final int MAX_SIZE_OF_QUEUE = 100000;
  17. private RedisTemplate<String, String> redisTemplate;
  18. private String queueName;
  19. private BlockingQueue<Message> delayedQueue;
  20.  
  21. public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
  22. this.redisTemplate = redisTemplate;
  23. this.queueName = queueName;
  24. this.delayedQueue = delayedQueue;
  25. }
  26.  
  27. public void offerMessage(Message message) {
  28. if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
  29. throw new IllegalStateException("超过队列要求最大值,请检查");
  30. }
  31. long delayTime = message.getFireTime() - System.currentTimeMillis();
  32. log.info("zset offerMessage" + message + delayTime);
  33. redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime());
  34. }
  35.  
  36. }

  上边的Controller方法已经写好了测试的方法。/redis/zSetDelayedQueue,里边主要使用ZSet的zRangeByScore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removeRangeByScore,将数据删除。这样数据可以直接写到Redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖Redis,如果Redis内存不足或者连不上的时候,系统将变得不可用。

4. 总结一下,另外还有哪些可以延迟队列。

  上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。

  其实还有很多东西可以实现延迟队列。

  1) RabbitMQ就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。

  2)Kafka也可以实现这个功能。

  3)Netty的HashedWheelTimer也可以实现这个功能。

最后放上我的代码: https://github.com/stonehqs/delayQueue

到此这篇关于Redis延迟队列和分布式延迟队列的简答实现的文章就介绍到这了,更多相关Redis延迟队列和分布式延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

原文链接:https://www.cnblogs.com/huangqingshi/p/12728831.html