Redis Stream + RedisTemplate 消息监听队列框架搭建

时间:2024-10-13 07:23:44

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.7.6</version>
    </parent>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

2. 消息监听器实现代码

  // NOTE(review): the original package name was lost when this listing was extracted;
  // "com.example.mq" is a placeholder — replace it with the project's real package.
  package com.example.mq;

  // NOTE(review): the JSON helper's import was stripped as well. fastjson is assumed because
  // the producer endpoint in this article uses JSONObject#getString — confirm against the project.
  import com.alibaba.fastjson.JSON;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.data.redis.connection.stream.MapRecord;
  import org.springframework.data.redis.stream.StreamListener;
  import org.springframework.stereotype.Component;

  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;

  /**
   * Redis Stream listener that hands every incoming record to a worker pool and logs it.
   *
   * <p>The listener does not acknowledge records itself; acknowledgement is left to
   * however the subscription is registered with the listener container.
   */
  @Component
  public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

      private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

      // Worker pool for asynchronous handling. The work queue is unbounded, so per the
      // ThreadPoolExecutor contract only the 10 core threads are ever started; the maximum
      // of 20 never takes effect. The keep-alive is 0, so its time unit is irrelevant
      // (the original unit was lost in extraction).
      private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
              10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

      /**
       * Receives one stream record and logs its id and payload on a pool thread.
       *
       * @param message the record delivered by the listener container
       */
      @Override
      public void onMessage(MapRecord<String, String, Object> message) {
          // Hand off immediately so the container's polling thread is not blocked.
          threadPoolExecutor.execute(() -> {
              try {
                  log.info("{}:接收到的消息:{};{}",
                          Thread.currentThread().getName(),
                          message.getId(),
                          JSON.toJSONString(message.getValue()));
              } catch (RuntimeException e) {
                  // Without this, a failure would only reach the thread's uncaught-exception handler.
                  log.error("Failed to process stream record {}", message.getId(), e);
              }
          });
      }
  }

3. redis订阅bean及监听器注册

  // NOTE(review): the original package name was lost when this listing was extracted;
  // "com.example.mq" is a placeholder. Constants, RedisUtils and RedisMQListener are assumed
  // to be resolvable from this package — add imports if they live elsewhere.
  package com.example.mq;

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.data.redis.connection.RedisConnectionFactory;
  import org.springframework.data.redis.connection.stream.Consumer;
  import org.springframework.data.redis.connection.stream.ReadOffset;
  import org.springframework.data.redis.connection.stream.StreamOffset;
  import org.springframework.data.redis.core.RedisTemplate;
  import org.springframework.data.redis.stream.StreamMessageListenerContainer;
  import org.springframework.data.redis.stream.Subscription;

  import java.time.Duration;

  /**
   * Registers the Redis Stream subscription: makes sure the consumer group exists, builds a
   * listener container, attaches {@link RedisMQListener} to it and starts polling.
   */
  @Configuration
  public class RedisMQConfig {

      private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);

      // Was a static field assigned through "this." in the constructor; an instance-scoped
      // final field expresses the same wiring without shared mutable static state.
      private final RedisTemplate<Object, Object> redisTemplate;

      @Autowired
      private RedisMQListener redisMQListener;

      // NOTE(review): kept because the original declared it. Its call site was lost in
      // extraction (it may have been the key-existence check) — confirm or remove.
      @Autowired
      private RedisUtils redisUtils;

      public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
          this.redisTemplate = redisTemplate;
      }

      /**
       * Creates and starts the stream subscription for the disk-threshold queue.
       *
       * @param redisConnectionFactory connection factory used by the listener container
       * @return the active subscription
       */
      @Bean
      // Raw container types are kept on purpose: the default options builder is typed for
      // MapRecord<String, String, String>, while the listener is declared with Object values.
      @SuppressWarnings({"rawtypes", "unchecked"})
      public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
          ensureConsumerGroup();

          // NOTE(review): the Duration factory name was stripped; a 1-second poll timeout is assumed.
          StreamMessageListenerContainer.StreamMessageListenerContainerOptions options =
                  StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                          .pollTimeout(Duration.ofSeconds(1))
                          .build();
          StreamMessageListenerContainer container =
                  StreamMessageListenerContainer.create(redisConnectionFactory, options);

          // NOTE(review): the original method name was stripped (receive vs. receiveAutoAck).
          // Auto-ack is used because the listener in this article never acknowledges records,
          // which would otherwise leave every record in the group's pending list.
          Subscription subscription = container.receiveAutoAck(
                  Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER),
                  StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()),
                  redisMQListener);
          container.start();
          return subscription;
      }

      /**
       * Creates the consumer group unless the stream already exists and already has it.
       * Same outcome as the original three-branch check (missing key, no groups, no matching group).
       */
      private void ensureConsumerGroup() {
          // The key check goes through the same template (and key serializer) that creates the group.
          boolean groupExists = Boolean.TRUE.equals(redisTemplate.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME))
                  && redisTemplate.opsForStream()
                          .groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)
                          .stream()
                          .anyMatch(group -> group.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME));
          if (!groupExists) {
              redisTemplate.opsForStream().createGroup(
                      Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
              log.info("Created consumer group {} on stream {}",
                      Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
          }
      }
  }

4. 测试:生产消息并验证监听成功

4.1 生产消息

  1. @RequestMapping("/produceMessage")
  2. public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
  3. String key = ("key");
  4. String value = ("value");
  5. MapRecord<Object, String, Object> mapRecord = (Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, (key, value));
  6. ().add(mapRecord);
  7. ("produceMessage Thread Name:" + ().getName());
  8. return formatResult(null);
  9. }

4.2 消息监听器监听消息到达(代码见第 2 节)

4.3 测试结果