整体思路
1. pom增加redis依赖;
2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
3. 将消息订阅bean及监听器注册到配置中;
1. pom
-
<?xml version="1.0" encoding="UTF-8"?>
-
<project xmlns="/POM/4.0.0"
-
xmlns:xsi="http:///2001/XMLSchema-instance"
-
xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.">
-
<modelVersion>4.0.0</modelVersion>
-
-
<parent>
-
<groupId></groupId>
-
<artifactId>spring-boot-starter-parent</artifactId>
-
<version>2.7.6</version>
-
</parent>
-
-
-
-
-
<dependencies>
-
-
<dependency>
-
<groupId></groupId>
-
<artifactId>spring-boot-starter-data-redis</artifactId>
-
</dependency>
2. 消息监听器实现代码
-
package ;
-
-
import ;
-
import org.;
-
import org.;
-
import ;
-
import ;
-
import ;
-
-
import ;
-
import ;
-
import ;
-
-
/**
-
* @Description: TODO
-
**/
-
@Component
-
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {
-
-
private static final Logger log = ();
-
-
// 创建一个线程池
-
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
-
10, 20, 0L, , new LinkedBlockingQueue<>());
-
@Override
-
public void onMessage(MapRecord message) {
-
// 异步处理消息
-
(()->{
-
(().getName() + ":接收到的消息:" + () + ";" + (()));
-
});
-
-
}
-
}
3. redis订阅bean及监听器注册
-
package ;
-
-
import ;
-
import org.;
-
import org.;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
import ;
-
-
import ;
-
import ;
-
-
/**
-
* @Description: TODO
-
**/
-
@Configuration
-
public class RedisMQConfig {
-
-
@Autowired
-
private RedisMQListener redisMQListener;
-
-
@Autowired
-
private RedisUtils redisUtils;
-
-
private static RedisTemplate<Object, Object> redisTemplate;
-
private static final Logger log = ();
-
-
public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
-
this.redisTemplate = redisTemplate;
-
}
-
-
@Bean
-
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
-
if ((Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {
-
StreamInfo.XInfoGroups xInfoGroups = ().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
-
if (()) {
-
().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
-
} else {
-
if (().filter(xInfoGroups1 -> ().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(()).isEmpty()) {
-
().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
-
}
-
}
-
} else {
-
().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
-
}
-
StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = ()
-
.pollTimeout((1)).build();
-
StreamMessageListenerContainer streamMessageListenerContainer = (redisConnectionFactory, options);
-
Subscription subscription = ((Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), (Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ()), redisMQListener);
-
();
-
return subscription;
-
}
-
-
}
4. 测试生产消息 消息监听成功
4.1 生产消息
-
@RequestMapping("/produceMessage")
-
public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
-
String key = ("key");
-
String value = ("value");
-
MapRecord<Object, String, Object> mapRecord = (Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, (key, value));
-
().add(mapRecord);
-
("produceMessage Thread Name:" + ().getName());
-
return formatResult(null);
-
}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果