在百度上查到的大部分都是一些很简单的单消费者或者单生产者的例子,并且多是同一个服务器的配置;本文的例子为多服务器(多个binder)配置下的生产者和消费者配置。
1、POM引入spring-cloud-starter-stream-rabbit
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、application.properties
通用配置:
# RabbitMQ connection settings; the binder definitions reference them through ${...} placeholders
rabbitmq.addresses=amqp://10.18.75.231:5672
rabbitmq.username=user_admin
rabbitmq.password=12345678
当存在多个binder时,需要指定一个默认的binder(或者为每个binding单独指定binder):
# Default binder; according to the author, an error is raised when several binders exist and this is not set
spring.cloud.stream.defaultBinder=boss
消费者配置:
# Binder (server) configuration for the ECM consumer
spring.cloud.stream.binders.ecm.type=rabbit
spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses=${rabbitmq.addresses}
spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username=${rabbitmq.username}
spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password=${rabbitmq.password}
spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-host=ecm

# Consumer of shop template changes published by the ECM trading system
spring.cloud.stream.bindings.ecm_shop_template.binder=ecm
spring.cloud.stream.bindings.ecm_shop_template.destination=这里填exchange的名字
# By default a queue can only be consumed by consumers that belong to the same group
spring.cloud.stream.bindings.ecm_shop_template.group=这里是消费者的名称
spring.cloud.stream.bindings.ecm_shop_template.contentType=text/plain
# The options below are RabbitMQ-specific consumer properties, so they must live under
# spring.cloud.stream.rabbit.bindings.<binding>.consumer (not spring.cloud.stream.bindings).
# Declare the exchange as fanout (broadcast)
spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeType=fanout
# Put messages whose consumption failed into the DLQ
spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlq=true
生产者配置:
# Binder (server) configuration for the BOSS message producer
spring.cloud.stream.binders.boss.type=rabbit
spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses=${rabbitmq.addresses}
spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username=${rabbitmq.username}
spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password=${rabbitmq.password}
spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-host=boss

# Producer of BOSS base information
# Name the binder explicitly so this binding does not depend on the global default binder
spring.cloud.stream.bindings.message_output.binder=boss
spring.cloud.stream.bindings.message_output.destination=exchange的名称
# Declare the exchange as fanout (broadcast)
spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeType=fanout
下面是java代码
1、定义消息的Input和Output配置信息
1 import org.springframework.cloud.stream.annotation.Input;
2 import org.springframework.cloud.stream.annotation.Output;
3 import org.springframework.messaging.MessageChannel;
4
5 /**
6 * mq连接源定义
7 *
8 * 其中类中的2个属性的值和properties里的配置需要一致
9 **/
10 public interface MqMessageSource {
11 // BOSS生产者
12 String MESSAGE_OUTPUT = "message_output";
13 // ECM消费者
14 String ECM_SHOP_TEMPLATE_INPUT = "ecm_shop_template";
15
16 @Output(MESSAGE_OUTPUT)
17 MessageChannel messageOutput();
18
19 @Input(ECM_SHOP_TEMPLATE_INPUT)
20 MessageChannel messageInput();
21
22 }
2、消息消费
1 import org.springframework.beans.factory.annotation.Autowired;
2 import org.springframework.cloud.stream.annotation.EnableBinding;
3 import org.springframework.cloud.stream.annotation.StreamListener;
4 import org.springframework.messaging.Message;
5
6 import com.alibaba.fastjson.JSONObject;
7
8 import lombok.extern.slf4j.Slf4j;
9
10 /**
11 * MQ消费者
12 * @author yangzhilong
13 *
14 */
15 @Slf4j
16 @EnableBinding(MqMessageSource.class)
17 public class MqMessageConsumer {
18
19 @Autowired
20 private XXService xxService;
21
22 /**
23 * 消费ECM的货柜模板变更
24 * @param message
25 */
26 @StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT)
27 public void receive(Message<String> message) {
28 log.info("接收货柜模板开始,参数={}", JSONObject.toJSONString(message));
29 if (null == message) {
30 return;
31 }
32 try {
33 String payload = message.getPayload();
34 log.info("具体消息内容= {}", JSONObject.toJSONString(payload));
35 JSONObject jsonObject = JSONObject.parseObject(payload);
36 ShopReqDto shopReqDto = new ShopReqDto();
37 shopReqDto.setCode(jsonObject.getString("shopNo"));
38 shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong("goodsMarketTemplateId"));
39 shopReqDto.setGoodsMarketTemplateName(jsonObject.getString("goodsMarketTemplateName"));
40 ResponseResult<String> responseResult = xxService.updateTemplateIdAndName(shopReqDto);
41 if(responseResult.isSuccess()){
42 log.info("【MQ消费货柜模板更新信息成功】");
43 }else{
44 log.error("【MQ消费货柜模板更新信息失败】,返回结果信息:" + JSONObject.toJSONString(responseResult));
45 }
46 } catch (Exception e) {
47 log.error("接收处理货柜模板MQ时出现异常:{}", e);
48 throw new RuntimeException(e);
49 }
50 }
51 }
3、消息生产者代码
1 import org.springframework.beans.factory.annotation.Autowired;
2 import org.springframework.cloud.stream.annotation.EnableBinding;
3 import org.springframework.cloud.stream.annotation.Output;
4 import org.springframework.messaging.MessageChannel;
5 import org.springframework.messaging.support.MessageBuilder;
6 import com.alibaba.fastjson.JSON;
7 import lombok.extern.slf4j.Slf4j;
8
9 /**
10 * 消息生产者
11 *
12 **/
13 @EnableBinding(MqMessageSource.class)
14 @Slf4j
15 public class MqMessageProducer {
16 @Autowired
17 @Output(MqMessageSource.MESSAGE_OUTPUT)
18 private MessageChannel channel;
19
20
21 //品牌
22 public void sendBrandAdd(Brand brand) {
23 BossMessage<Brand> message = new BossMessage<>();
24 message.setData(brand);
25 message.setOpType(MqMessageProducer.ADD);
26 message.setDataType(MqMessageProducer.BRAND);
27 channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build());
28 log.info("【MQ发送内容】" + JSON.toJSONString(message));
29 }
30 }