springBoot整合rabbitMQ的方法详解

时间:2022-08-26 20:54:22

引入pom

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.4.5</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.wxy</groupId>
  12. <artifactId>test-rabbitmq</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>test-rabbitmq</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28.  
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. <scope>test</scope>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.springframework.amqp</groupId>
  36. <artifactId>spring-rabbit-test</artifactId>
  37. <scope>test</scope>
  38. </dependency>
  39. <dependency>
  40. <groupId>junit</groupId>
  41. <artifactId>junit</artifactId>
  42. <scope>test</scope>
  43. </dependency>
  44. </dependencies>
  45.  
  46. <build>
  47. <plugins>
  48. <plugin>
  49. <groupId>org.springframework.boot</groupId>
  50. <artifactId>spring-boot-maven-plugin</artifactId>
  51. </plugin>
  52. </plugins>
  53. </build>
  54.  
  55. </project>

测试

  1. package com.wxy.rabbit;
  2.  
  3. import org.junit.jupiter.api.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9.  
  10. import java.util.Arrays;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13.  
  14. @RunWith(SpringRunner.class)
  15. @SpringBootTest
  16. class TestRabbitmqApplicationTests {
  17.  
  18. @Autowired
  19. RabbitTemplate rabbitTemplate;
  20.  
  21. @Test
  22. public void sendmessage() {
  23. String exchange = "exchange.direct";
  24. String routingkey = "wxy.news";
  25. //object为消息发送的消息体,可以自动实现消息的序列化
  26. Map<String,Object> msg = new HashMap<>();
  27. msg.put("msg","使用mq发送消息");
  28. msg.put("data", Arrays.asList("helloword",123456,true));
  29. rabbitTemplate.convertAndSend(exchange, routingkey,msg);
  30. }
  31.  
  32. @Test
  33. public void receive() {
  34. Object object = rabbitTemplate.receiveAndConvert("wxy.news");
  35. System.out.println(object);
  36. }
  37.  
  38. }

默认消息转换类型

  1. ###############在RabbitTemplate默认使用的是SimpleMessageConverter#######
  2. private MessageConverter messageConverter = new SimpleMessageConverter();
  3.  
  4. ###############源码:使用SerializationUtils.deserialize###############
  5. public Object fromMessage(Message message) throws MessageConversionException {
  6. Object content = null;
  7. MessageProperties properties = message.getMessageProperties();
  8. if (properties != null) {
  9. String contentType = properties.getContentType();
  10. if (contentType != null && contentType.startsWith("text")) {
  11. String encoding = properties.getContentEncoding();
  12. if (encoding == null) {
  13. encoding = this.defaultCharset;
  14. }
  15.  
  16. try {
  17. content = new String(message.getBody(), encoding);
  18. } catch (UnsupportedEncodingException var8) {
  19. throw new MessageConversionException("failed to convert text-based Message content", var8);
  20. }
  21. } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
  22. try {
  23. content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
  24. } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
  25. throw new MessageConversionException("failed to convert serialized Message content", var7);
  26. }
  27. }
  28. }

springBoot整合rabbitMQ的方法详解

将默认消息类型转化成自定义json格式

  1. 第一:上面SimpleMessageConverterorg.springframework.amqp.support.converter包下MessageConverter接口的一个实现类
  2.  
  3. 第二:查看该接口MessageConverter下支持哪些消息转化
  4. ctrl+H查看该接口中的所有实现类
  5.  
  6. 第三步:找到json相关的convert

springBoot整合rabbitMQ的方法详解

  1. RabbitTemplateConfigurer中定义if (this.messageConverter != null)则使用配置的messageConverter
  1. ################## if (this.messageConverter != null)则使用配置的messageConverter
  2. public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
  3. PropertyMapper map = PropertyMapper.get();
  4. template.setConnectionFactory(connectionFactory);
  5. if (this.messageConverter != null) {
  6. template.setMessageConverter(this.messageConverter);
  7. }
  8.  
  9. template.setMandatory(this.determineMandatoryFlag());
  10. Template templateProperties = this.rabbitProperties.getTemplate();
  11. if (templateProperties.getRetry().isEnabled()) {
  12. template.setRetryTemplate((new RetryTemplateFactory(this.retryTemplateCustomizers)).createRetryTemplate(templateProperties.getRetry(), Target.SENDER));
  13. }
  14.  
  15. templateProperties.getClass();
  16. map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
  17. templateProperties.getClass();
  18. map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
  19. templateProperties.getClass();
  20. map.from(templateProperties::getExchange).to(template::setExchange);
  21. templateProperties.getClass();
  22. map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
  23. templateProperties.getClass();
  24. map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
  25. }

配置一个messageConversert(org.springframework.amqp.support.converter包中的)

  1. package com.wxy.rabbit.config;
  2.  
  3. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  4. import org.springframework.amqp.support.converter.MessageConverter;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7.  
  8. @Configuration
  9. public class MessageConverConfig {
  10.  
  11. @Bean
  12. public MessageConverter getMessageConvert(){
  13. return new Jackson2JsonMessageConverter();
  14. }
  15. }

再次发送消息体json格式

springBoot整合rabbitMQ的方法详解

使用注解@RabbitListener监听

监听多个队列

  1. @RabbitListener(queues = {"wxy.news","wxy.emps"})

监听单个队列

  1. @RabbitListener(queues = "wxy.news")
  1. package com.wxy.rabbit.service;
  2.  
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Service;
  6.  
  7. @Service
  8. public class RabbitMqReceiveService {
  9.  
  10. @RabbitListener(queues = {"wxy.news","wxy.emps"})
  11. public void getReceiveMessage(){
  12. System.out.println("监听到性的消息");
  13. }
  14.  
  15. @RabbitListener(queues = {"wxy.news","wxy.emps"})
  16. public void getReceiveMessageHead(Message message){
  17. System.out.println(message.getBody());
  18. System.out.println( message.getMessageProperties());
  19. }
  20.  
  21. }

在程序中创建队列,交换器,并进行绑定

  1. @Test
  2. public void create() {
  3. //创建一个点对点的交换器
  4. amqpAdmin.declareExchange(new DirectExchange("amqpexchange.direct"));
  5. //创建一个队列
  6. // String name,:队列名称
  7. // boolean durable :持久化
  8. amqpAdmin.declareQueue(new Queue("amqp.queue",true));
  9. //绑定
  10. //String destination, Binding.DestinationType destinationType, String exchange, String routingKey
  11. // @Nullable Map<String, Object> arguments
  12. amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,
  13. "amqpexchange.direct","wxy.news", null));
  14.  
  15. }

到此这篇关于springBoot整合rabbitMQ的方法详解的文章就介绍到这了,更多相关springBoot整合rabbitMQ内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/qq_38423256/article/details/115770100