1、首先需要部署环境,这里我把生产者和消费者放在同一个spring boot项目中
2、使用Spring Initializr 新建一个项目,选择spring web和cloud steam
生成项目并导入到idea中。
3、在pom.xml中,需要添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
4、在配置文件中配置spring cloud steam相关配置信息
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest cloud: stream: bindings: greetingChannel: destination : greetings greetingChannel_input: destination: greetings group: group rabbit: bindings: greetingChannel: producer: delayed-exchange: true #延迟队列
这里需要保证本地安装有rabbitMQ,并且安装有对应的插件rabbitmq-delayed-message-exchange 自行百度实现即可
5、编写接口,在接口中定义消息产生和消息消费的两个方法
public interface HelloBinding { @Output("greetingChannel") MessageChannel greeting(); String GREETING = "greetingChannel_input"; @Input(GREETING) MessageChannel greetingInput(); } 这里的output和input中的字符串必须和配置文件中的两个字符串对应(已标红)
6、新建生产者类,绑定刚刚写好的接口类(红色代码)。具体代码如下
@EnableBinding(HelloBinding.class)
@RestController
public class ProducerControl {
@Autowired
private HelloBinding binding;
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
MessageChannel greet = binding.greeting();
String greeting = "Hello, " + name + "!";
System.out.println("发送前的时间:" + LocalDateTime.now());
Message<String> msg = MessageBuilder.withPayload(greeting).setHeader("x-delay", 1000 * 60 * 1)
.build();
boolean send = greet.send(msg);
System.out.println("消息发送情况" + send + LocalDateTime.now());
}
}
在以上代码中 ,定义了一个RestController,然后给方法定义了GetMapping地址,方便测试,接收一个name参数。当接口被调用的时候,传入name值,拼接字符串,然后使用MessageBuilder新建一个消息,用HelloBinding接口提供的方法获取输出通道(使用OutPut注解的方法),然后发送数据出去,这里实现延时效果的主要代码就是setHeader("x-delay", 1000 * 60 * 1),第一个参数不能修改,第二个参数可以设置延时时间,单位为ms,我这里设置了1分钟;另外,我们还需要在配置文件中给生产者设置启用延时,参考配置文件蓝色代码块。
7、到这里,生产者已经搞定,一旦接口被调用,则会产生一个延时1分钟的消息,一分钟后消息会到达rabbitMQ的queen中,这里有一个坑,如果我们设置通道的时候,没有给通道分组(即设置group,绿色部分代码),则发送消息时仍然会有通道,但是一旦停止项目,则这个通道会随之消失,如果在延时消息时间到达之前,项目仍然没有启动,则queen一直都不存在,那么延时消息就没有通道可以走,这个时候,消息就丢失了。所以说需要给channel设置group值,进行分组,分组还有一些其他的好处,大家自行百度即可
8、创建消息消费者,这个类比较简单,我们直接贴代码即可
@EnableBinding(HelloBinding.class) public class HelloListener { @StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(LocalDateTime.now()+"收到的消息为:"+msg); } }
这里值得注意的就是需要和HelloBinding绑定,然后在方法中设置监听的地址必须和HelloBinding接口的Input注解标注的方法的字符串一致,否则监听的就不是同一个通道了;最后会接收到字符串类型的数据,如果需要传递其他类型对象,将对象序列化为json字符串即可
9、运行当前项目,然后访问http://localhost:8080/greet/john即可产生一个延时消息,一分钟后会进入queen通道,然后被消费者取出并消费
参考链接:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/ (英文)