- 1、POM中引入spring-cloud-starter-stream-rabbit
<dependency>
<groupId></groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
- 2、定义配置文件
spring:
cloud:
stream:
bindings:
input:
destination: order
content-type: application/json
default-binder: defaultRabbit
group: test
output:
destination: order
content-type: application/json
default-binder: defaultRabbit
group: test
testInput:
destination: user
content-type: application/json
default-binder: defaultRabbit
group: test
testOutput:
destination: user
content-type: application/json
default-binder: defaultRabbit
group: test
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: bys
password: bys2018
virtual-host: /
配置说明:
* 1、binders: 一组rabbitMQ的连接信息;
* 2、bindings:声明输入和输出通道的接口集合;
* 3、input、output,stream提供的默认生产者与消费者;
* 4、destination要绑定的交换机,对应rabbitmq中的Exchanges。
* 5、group对应rabbitmq中的queue;
-
3、默认通道实例:
- 3.1 消息生产实例(绑定stream提供的默认的通道)
//绑定消息通道,Source是Stream提供的,跟配置文件output关联。 @EnableBinding() public class DefaultSendServiceImpl implements IDefaultSendService { @Autowired Source source; @Override public boolean sendMsg(String msg) { return ().send((msg).build()); } }
- 3.2 消息消费实例
//消息接收端,stream给我们提供了Sink,Sink源码里面是绑定input的,跟配置文件的input关联的。 @Component @EnableBinding(value = {}) public class DefaultReceiveServiceImpl implements IDefaultReceiveService { @StreamListener() public void recieve(Object payload){ ("默认接收消息" + payload); } }
- 3.1 消息生产实例(绑定stream提供的默认的通道)
-
4、自定义消息通道:
- 4.1、自定义生产者、消费者通道接口
public interface MqMessageSource { String TEST_OUT_PUT = "testOutput"; @Output(TEST_OUT_PUT) MessageChannel testOutput(); } public interface MqInputMessageSource { String TEST_IN_PUT = "testInput"; @Input(TEST_IN_PUT) SubscribableChannel testInput(); }
- 4.1、消息生产实例:(绑定自定义通道)
@EnableBinding() public class SendServiceImpl implements ISendService { @Autowired @Qualifier(MqMessageSource.TEST_OUT_PUT) MessageChannel messageChannel; @Override public boolean sendMsg(String msg) { return ((msg).build()); } }
- 4.2、消息消费实例:
@Component @EnableBinding(value = {}) public class ReceiveServiceImpl implements IReceiveService { private final static Logger logger = (); @Override @StreamListener(MqInputMessageSource.TEST_IN_PUT) public void receiveTime(Message<String> message) { ("接收消息" + ().toString()); } }
- 4.1、自定义生产者、消费者通道接口