第四章 RabbitMQ
与Spring
整合
文章目录
- 第四章 `RabbitMQ`与`Spring`整合
- 4.1-`AMQP`核心组件
- 4.2-`SprigAMQP`管理组件`RabbitAdmin`应用
- 4.3-`SprigAMQP-RabbitMQ`声明式配置使用
- 4.4-`SpringAMQP`消息模板组件-`RabbitTemplate`
- 4.5-`SpringAMQP`简单消息监听容器-`SimpleMessageListenerContainer`
- 4.6-`SpringAMQP`消息监听适配器-`MessageListenerAdapter`
- 4.7-`SpringAMPQ`消息转换器-`MessageConverter`
- 4.8-`RabbitMQ`与`SpringBoot2.0`整合
- 生产端
- 消费端
- 4.9-`RabbitMQ`与`SpringCloud Stream`整合
4.1-AMQP
核心组件
-
RabbitAdmin
//管控组件 -
SpringAMQP
声明 //声明交换机、绑定、队列等操作,使用@Bean
注解,直接加入IOC
-
RabbitTemplate
//消息模板 -
SimpleMessageListenerContainer
//简单消息监听容器,可以在消费者端进行详细配置 -
MessageListenerAdapter
//消息监听适配器,适用于添加完监听器之后 -
MessageConverter
// 消息转换器,比如消息序列号和反序列化
4.2-SprigAMQP
管理组件RabbitAdmin
应用
-
作用:
RabbitAdmin
类可以很好的操作RabbitMQ
,比如增加删除绑定交换机、队列,在Spring
中直接进行注入即可使用//ConnectionFactory 必须是 ; @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }
注意:
autoStartup
必须要设置为true
,否则Spring
容器不会加载RabbitAdmin
类 -
RabbitAdmin
底层实现就是从Spring
容器中获取Exchange、Bingding、RoutingKey、Queue
的@Bean
声明,然后使用RabbitTemplate
的execute
方法执行对应的声明、修改、删除等一系列RabbitMq基础操作 -
代码演示
-
引入依赖
<dependency> <groupId></groupId> <artifactId>amqp-client</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.3.</version> </dependency>
-
配置Bean
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //引入的是下的connectionFactory,而不是包下的 @Configuration @ComponentScan({""}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
-
编写测试类
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Autowired private RabbitAdmin rabbitAdmin; @Test public void testRabbitAdmin(){ rabbitAdmin.declareExchange(new DirectExchange("", false,false )); rabbitAdmin.declareQueue(new Queue("",false )); rabbitAdmin.declareBinding(new Binding("", Binding.DestinationType.QUEUE, "", "direct", null)); //BindingBuilder 链式操作,但在操作中,如果没有事先声明创建交换机和队列,会报错提示找不到 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("", false)) .to(new DirectExchange("", false, false)) .with("direct") ); rabbitAdmin.purgeQueue(""); } }
-
-
RabbitAdmin
源码- 实现了
InitializingBean
接口,表明在Bean
配置加载完后再加载RabbitAdmin
配置。找到afterPropertiesSet()
方法中最要的initialize()
初始化方法。 - 可以看到
Exchange、Queue、Binding
都是从Spring
容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings
三种容器中。 后续的源码中,也可以看出通过筛选Spring
容器中RabbitMQ
的信息之后,再去建立RabbitMQ
服务器的连接。主要通过Spring
以@Bean
的方式,将配置加载到Spring
容器之后,再从容器中获取相关信息,再去建立连接。
- 实现了
4.3-SprigAMQP-RabbitMQ
声明式配置使用
-
在
Rabbit
基础API
里面声明一个Exchange
、声明一个绑定、一个队列// 用的是 amqp-client 依赖; channel.exchangeDeclare() channel.queueDeclare() channel.queueBind()
-
使用
SpringAMQP
去声明,就需要使用SpringAMQP
的如下模式,即声明@Bean
方式// 用的是spring-boot-starter-amqp 依赖; /** * 针对消费者配置 * 1\. 设置交换机类型 * 2\. 将队列绑定到交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding003() { //同一个Exchange绑定了2个队列 return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); }
4.4-SpringAMQP
消息模板组件-RabbitTemplate
-
消息模板,是与
SpringAMQP
整合的时候进行发送消息的关键类 -
该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口
ConfirmCallback
,返回值确认接口ReturnCallback
等。同样的我们需要进行注入到Spring
容器中,然后直接使用 -
在与
Spring
整合时,需要实例化,但是在和Springboot
整合时,在配置文件中添加配置即可 -
代码演示
RabbitMQConfig
-配置参数@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; }
测试类
@Autowired private RabbitTemplate rabbitTemplate; @Test public void sendMsg(){ //创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "消息描述。。。"); messageProperties.getHeaders().put("type", "自定义消息类型"); //消息体 Message message = new Message("hello rabbit".getBytes(), messageProperties); //转换并发送 rabbitTemplate.convertAndSend("dir01", "", message, new MessagePostProcessor() { public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新增的属性"); return message; } }); }
简单写法
@Test public void sendmsg2(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq消息".getBytes(), messageProperties); rabbitTemplate.send("topic", "", message); rabbitTemplate.convertAndSend("topic1", "", "hello,1"); }
4.5-SpringAMQP
简单消息监听容器-SimpleMessageListenerContainer
-
这个类非常强大,我们可以对它进行很多设置,对于消费者的配置项,这个类可以满足
-
监听队列(多个队列)、自动启动、自动声明功能
-
设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
-
设置消费者数量、最小最大数量、批量消费
-
设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
-
设置消费者标签生成策略、是否独占模式、消费者属性等
-
设置具体的监听器、消息转换器等
-
SimpleMessageListenerContainer
可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量大小、接收消息的模式等 -
很多基于
RabbitMQ
的自制后端管控台在进行动态设置的时候,也是根据这一特性去实现的主要使用方法:
-
messageListenerAdapter
消息监听适配器 -
defaultListenerMethod
默认监听方法 -
Delegate
委托对象,实际真实的委托人,进行处理消息 -
queueOrTagToMethodName
队列与方法进行绑定代码演示
RabbitMQConfig
-配置参数@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory); listenerContainer.setConcurrentConsumers(1); // 设置当前消费者为1 listenerContainer.setMaxConcurrentConsumers(5);// 设置最大消费者5 listenerContainer.setQueues(queue001(),queue002(),queue003());// 设置监听的队列,此处使用了4.3 声明式配置中定义的队列 listenerContainer.setDefaultRequeueRejected(false);// 设置是否重回队列 listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置接收方式,AUTO-自动接收,MANUAL-手动接收,NULL-不接收 listenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() { //设置消费者标签策略 @Override public String createConsumerTag(String s) { return s+"_"+ UUID.randomUUID().toString(); } }); listenerContainer.setMessageListener(new MessageListener() { // 监听消息 @Override public void onMessage(Message message) { String msg = new String(message.getBody()); System.out.println("消息:"+msg); } }); return listenerContainer; }
4.6-SpringAMQP
消息监听适配器-MessageListenerAdapter
RabbitMQconfig
- 配置参数
//原始方法,可以直接new MessageListener
listenerContainer.setMessageListener(new MessageListener() { // 监听消息
@Override
public void onMessage(Message message) {
String msg = new String(message.getBody());
System.out.println("消息:"+msg);
}
});
/**
* 适配器方式,根据自定义来处理
*/
// 1.1 默认是有自己的方法名字的:handleMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
listenerContainer.setMessageListener(adapter);
// 1.2 可以自己指定一个方法的名字: consumeMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
listenerContainer.setMessageListener(adapter);
// 2 也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new MessageConverter() { // 消息转换器
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
// java对象,转换成message对象
return new Message(o.toString().getBytes(),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// message对象,转换为java对象
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
});
listenerContainer.setMessageListener(adapter);
return listenerContainer;
MessageDelegate
消息代理
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("默认方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息内容:" + messageBody);
}
public void method1(String messageBody) {
System.err.println("method1 收到消息内容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息内容:" + new String(messageBody));
}
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
}
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
public void consumeMessage(File file) {
System.err.println("文件对象 方法, 消息内容:" + file.getName());
}
}
单元测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitAdminTest {
@Autowired
private RabbitAdmin rabbitAdmin ;
@Autowired
private RabbitTemplate rabbitTemplate;
// 测试对应 RabbitMQConfig-1.1和1.2
@Test
public void testAdapter1() {
MessageProperties messageProperties = new MessageProperties();
Message message = new Message("hello rabbitmq-1".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "", message);
}
// 测试对应 RabbitMQConfig-2 也可以添加一个转换器: 从字节数组转换为String
@Test
public void testAdapter2() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("hello rabbitmq-2".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "", message);
}
// 测试对应 RabbitMQConfig-3 可以将队列名称,和我们自定义的方法名进行绑定
@Test
public void testAdapter3() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message1 = new Message("hello rabbitmq-3.1".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "", message1);
Message message2 = new Message("hello rabbitmq-3.2".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic002", "", message2);
}
}
4.7-SpringAMPQ
消息转换器-MessageConverter
- 发送消息时,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要使用
MessageConverter
- 自定义转换器,一般都需要实现接口重写两个方法
-
toMessage
java
对象转换为Message
-
fromMessage
Message
对象转换为java
对象
-
- 除了转换string类型数据,还有以下
-
json
转换器Jackson2JsonMessageConverter
可以进行java
对象的转换功能 -
DefaultJackson2JavaTypeMapper
映射器 可以进行java
对象的映射关系 - 自定义二进制转换器,比如图片、PDF、PPT、流媒体
-
RabbitMqConfig
配置参数
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.2 支持java对象转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//添加包信任,否则报错:The class '' is not in the trusted packages
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.3 支持java多对象映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//添加包信任,否则报错:The class '' is not in the trusted packages
javaTypeMapper.setTrustedPackages("*");
HashMap<String, Class<?>> classHashMap = new HashMap<>();
classHashMap.put("order",com.lc.study.spring.OrderEntity.class);
classHashMap.put("package",com.lc.study.spring.PackageEntity.class);
javaTypeMapper.setIdClassMapping(classHashMap);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
listenerContainer.setMessageListener(adapter);
// 1.4 text/image/pdf 转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局转换器,大的
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
//文本转换器
MessageConverter testConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
};
converter.addDelegate("text",testConverter);
converter.addDelegate("html/text",testConverter);
converter.addDelegate("xml/text",testConverter);
converter.addDelegate("text/plain",testConverter);
// json转换器
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
converter.addDelegate("json",jsonMessageConverter);
converter.addDelegate("application/json",jsonMessageConverter);
// 图片转换器
MessageConverter imageConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException("图片转换异常");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ?"png":_extName.toString();
System.out.println(extName);
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "C:/Users/24669/Downloads/workspace/" + fileName +"."+extName;
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
};
converter.addDelegate("image/png",imageConverter);
converter.addDelegate("image",imageConverter);
// pdf 转换器
MessageConverter pdfConverter = new MessageConverter() {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException("pdf转换异常");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "C:/Users/24669/Downloads/workspace/" + fileName +".pdf";
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
};
converter.addDelegate("application/pdf",pdfConverter);
adapter.setMessageConverter(converter);
listenerContainer.setMessageListener(adapter);
return listenerContainer;
MessageDelegate
消息代理 同上一章
单元测试
//测试对应 1.1 支持json格式的转换器
@Test
public void testJsonMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0001");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println("json消息体:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","",message);
}
//测试对应 1.2 支持java对象格式的转换器
@Test
public void testJavaEntityMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0002");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println("java对象:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
properties.getHeaders().put("__TypeId__","");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","",message);
}
//测试对应 1.3 支持java多对象映射的转换器
@Test
public void testJavaEntityMapperMessage() throws JsonProcessingException {
OrderEntity order = new OrderEntity();
order.setId("0003");
order.setName("订单消息");
order.setContent("小笼包+豆浆+茶叶蛋");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println("java对象:"+json);
MessageProperties properties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties.setContentType("application/json");
properties.getHeaders().put("__TypeId__","order");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("topic001","",message);
PackageEntity pack = new PackageEntity();
pack.setId("0001");
pack.setName("快递包裹信息");
pack.setDescription("你的快递已送达");
String json2 = mapper.writeValueAsString(pack);
System.out.println("json2对象:"+json2);
MessageProperties properties2 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
properties2.setContentType("application/json");
properties2.getHeaders().put("__TypeId__","package");
Message message2 = new Message(json2.getBytes(), properties2);
rabbitTemplate.convertAndSend("topic001","spring.entity2",message2);
}
//测试对应 1.4 text/image/pdf 的转换器
@Test
public void testTextJsonImagePdfMessage() throws Exception {
byte[] body1 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", ""));
MessageProperties properties1 = new MessageProperties();
properties1.setContentType("image/png");
properties1.getHeaders().put("extName","png");
Message message1 = new Message(body1, properties1);
rabbitTemplate.convertAndSend("topic001","",message1);
byte[] body2 = Files.readAllBytes(Paths.get("C:/Users/24669/Downloads", ""));
MessageProperties properties2 = new MessageProperties();
properties2.setContentType("application/pdf");
Message message2 = new Message(body2, properties2);
rabbitTemplate.convertAndSend("topic001","",message2);
}
4.8-RabbitMQ
与SpringBoot2.0
整合
生产端
-
publisher-confirms
实现一个监听器,用于监听Broker
端给我们返回的确认请求 -
publisher-returns
保证消息对于Broker
端可达,如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功 -
在发送消息的时候,对
template
进行配置mandatory = true
保证监听有效 -
当
mandatory
标志位设置为true
时,如果exchange
根据自身类型和消息routeKey
无法找到一个符合条件的queue
,那么会将消息返回给生产者;当mandatory
设置为false
时,出现上述情形broker
会直接将消息扔掉。// pom文件 <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId></groupId> <version>2.3.</version> </parent> <dependencies> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
// 配置 server: port: 5555 spring: rabbitmq: addresses: 192.168.2.58:5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
// 声明式创建元数据(交换机、队列、绑定) @Configuration @ComponentScan(value = {".*"}) public class ProducerConfig { //声明交换机和队列,以及绑定 @Bean public TopicExchange topicExchange() { return new TopicExchange("springboot.exchange1", true, false); } @Bean public Queue queue1() { //如果你想创建一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。 //1-只对首次声明它的连接(Connection)可见;2-会在其连接断开的时候自动删除。 // String name, boolean durable, boolean exclusive, boolean autoDelete return new Queue(".queue1",true,false,false); } @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(topicExchange()).with("springboot.#"); } }
// 自定义消费发送类,重写rabbitTemplate @Component public class RabbitSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitConfirmCallback rabbitConfirmCallback; @Autowired private RabbitReturnCallback rabbitReturnCallback; public void sendMessage(Object message, MessageProperties properties) { Message msg = new Message(message.toString().getBytes(), properties); rabbitTemplate.setMandatory(true); //消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。 //消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。 rabbitTemplate.setConfirmCallback(rabbitConfirmCallback); rabbitTemplate.setReturnCallback(rabbitReturnCallback); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 第一次测试,使用正确的routingkey;第二次测试,使用错误的routingkey rabbitTemplate.convertAndSend("springboot.exchange1", "", msg, correlationData); } }
// 消息confirmCallBack类,确认模式 @Component public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData:" + correlationData); System.err.println("ack:" + ack); if (!ack) { System.err.println("异常处理..."+cause); } else { System.err.println("消息已送达"); } } } // 消息 returnCallBack类,退回模式 @Component public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.err.println("消息未达的原因:" + s); } }
// 测试类 @RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitMqApplication.class) public class ProducerTest { @Autowired private RabbitSender rabbitSender; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:"); @Test public void testSender() throws Exception { MessageProperties properties = new MessageProperties(); properties.getHeaders().put("key","1231231"); properties.getHeaders().put("send_time",simpleDateFormat.format(new Date())); rabbitSender.sendMessage("springBoot+rabbitmq",properties); } }
消费端
-
手动签收数据
-
消费端使用
@RabbitMQListener
注解server: port: 5556 spring: rabbitmq: addresses: 192.168.2.58:5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual # 手动签收消息
@Component public class RabbitReceiver { // 使用注解进行时间监听绑定,也可以用作创建元数据 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "springboot.queue1",durable = "true"), exchange = @Exchange(value = "springboot.exchange1",durable = "true",type = "topic",ignoreDeclarationExceptions = "true"), key = "springboot.#" )) @RabbitHandler public void onMessage(Message message, Channel channel) throws IOException { System.out.println("消费端:"+ new String((byte[]) message.getPayload())); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手动ACK channel.basicAck(deliveryTag,false); } }
4.9-RabbitMQ
与SpringCloud Stream
整合
- 支持消息的生产与发送,可以是不同的消息中间件(
Rabbitmq
---->Spring Cloud Stream
---->Kafka
) -
SpringCloud Stream
整合消息中间件,会有输入端和输出端,中间绑定消息中间件可以是Rabbitmq,Kafka
,相当于重写了出、入接口 -
@Output
输出注解,用户定义发送消息接口 -
@Input
输入注解,用于定义消费接口 -
@StreamListener
用于定义监听方法注解 - 缺点:不能100%实现消息的可靠性投递,会存在少量消息丢失问题。因为要兼容
Kafka