【应用】SpringBoot 整合 RabbitMQ

时间:2022-12-21 08:04:26

引入依赖

创建 maven 项目 RabbitMQ,引入所使用的依赖

    <!-- Parent POM: the dependencies below that omit a <version> element rely on it for their versions -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- empty relativePath: resolve the parent from the repository, not from a local path -->
    </parent>

    <dependencies>
        <!-- Spring Web starter: used by the REST controller of the producer module -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring AMQP starter: messaging support used by the producer and consumer (AmqpTemplate, @RabbitListener) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- JSON library (fastjson); its version is pinned explicitly here -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.20</version>
        </dependency>

        <!-- Lombok; marked optional so it is not passed on transitively to modules depending on this one -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

RabbitMQ 生产者

配置文件以及主启动类

首先在项目中创建子 maven 项目 RabbitProducer,配置 RabbitMQ 服务地址

# HTTP port the producer application listens on
server:
  port: 8081

# Connection settings for the RabbitMQ broker
spring:
  rabbitmq:
    # Placeholder: replace the value below with the address of your RabbitMQ server
    host: <RabbitMq Server ip>
    port: 5672

创建主启动类

/**
 * Entry point of the RabbitMQ producer application.
 */
@SpringBootApplication
public class ProducerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that
     *             overrides such as {@code --server.port=9090} are honoured
     */
    public static void main(String[] args) {
        // Pass args through: calling run(...) without them silently discards
        // every command-line property override.
        SpringApplication.run(ProducerApplication.class, args);
    }

}

配置 RabbitMQ 的 Exchange 以及 Queue

此处分别配置两种模式的交换机 Exchange,即 Direct 模式和 Topic 模式,有关交换机模式的内容请阅读《RabbitMQ 基础》

Direct Exchange

/**
 * Declares the Direct-mode topology: two queues, one direct exchange and
 * one binding per queue.
 *
 * NOTE(review): "Fro" in the class name looks like a typo for "For"; the name
 * is kept unchanged here. Confirm nothing depends on it before renaming.
 */
@Configuration
public class RabbitConfigFroDirect {

    private static final String QUEUE_01 = "My_Direct_Queue01";
    private static final String QUEUE_02 = "My_Direct_Queue02";
    private static final String EXCHANGE = "My_Direct_Exchange";
    private static final String ROUTING_KEY_01 = "Queue01";
    private static final String ROUTING_KEY_02 = "Queue02";

    /**
     * First Direct-mode queue.
     *
     * @return a queue named {@code My_Direct_Queue01}
     */
    @Bean
    public Queue directQueue01() {
        return new Queue(QUEUE_01);
    }

    /**
     * Second Direct-mode queue.
     *
     * @return a queue named {@code My_Direct_Queue02}
     */
    @Bean
    public Queue directQueue02() {
        return new Queue(QUEUE_02);
    }

    /**
     * The Direct-mode exchange both queues are bound to.
     *
     * @return a direct exchange named {@code My_Direct_Exchange}
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * Binds queue 01 to the direct exchange.
     *
     * @return the binding, using routing key {@code Queue01}
     */
    @Bean
    public Binding bindingDirect01() {
        Queue queue = directQueue01();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_01);
    }

    /**
     * Binds queue 02 to the direct exchange.
     *
     * @return the binding, using routing key {@code Queue02}
     */
    @Bean
    public Binding bindingDirect02() {
        Queue queue = directQueue02();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_02);
    }

}

Topic Exchange

/**
 * Declares the Topic-mode topology: two queues, one topic exchange and
 * one pattern binding per queue.
 */
@Configuration
public class RabbitConfigForTopic {

    private static final String QUEUE_01 = "My_Topic_Queue01";
    private static final String QUEUE_02 = "My_Topic_Queue02";
    private static final String EXCHANGE = "My_Topic_Exchange";
    private static final String PATTERN_01 = "*.topic";
    private static final String PATTERN_02 = "#.topic";

    /**
     * First Topic-mode queue.
     *
     * @return a queue named {@code My_Topic_Queue01}
     */
    @Bean
    public Queue topicQueue01() {
        return new Queue(QUEUE_01);
    }

    /**
     * Second Topic-mode queue.
     *
     * @return a queue named {@code My_Topic_Queue02}
     */
    @Bean
    public Queue topicQueue02() {
        return new Queue(QUEUE_02);
    }

    /**
     * The Topic-mode exchange both queues are bound to.
     *
     * @return a topic exchange named {@code My_Topic_Exchange}
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE);
    }

    /**
     * Binds queue 01 to the topic exchange.
     *
     * @return the binding, using the routing pattern {@code *.topic}
     */
    @Bean
    public Binding bindingTopic01() {
        Queue queue = topicQueue01();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PATTERN_01);
    }

    /**
     * Binds queue 02 to the topic exchange.
     *
     * @return the binding, using the routing pattern {@code #.topic}
     */
    @Bean
    public Binding bindingTopic02() {
        Queue queue = topicQueue02();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PATTERN_02);
    }

}

编写消息发送的服务类

编写消息发送服务类的接口,定义发送消息的方法,需要指定 Routing Key 以及消息内容

/**
 * Contract for services that publish a message to RabbitMQ.
 */
public interface ProducerService {
    /**
     * Sends a message to a RabbitMQ queue.
     *
     * @param routingKey the routing key to publish with
     * @param detail     the message content
     * @return the processing result
     */
    String sendMessage(String routingKey, String detail);
}

对不同模式的 Exchange 分别实现上述接口,实现消息发送的方法。在调用消息发送的方法时,需要指定 Exchange、Routing Key 以及消息内容

/**
 * {@link ProducerService} implementation that publishes to the exchange
 * {@code My_Direct_Exchange}.
 */
@Service
public class DirectProducerServiceImpl implements ProducerService {

    /** Name of the exchange every message from this service is sent to. */
    private static final String EXCHANGE = "My_Direct_Exchange";

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@code detail} to the exchange with the given routing key and
     * returns a text summary of what was sent.
     *
     * @param routingKey the routing key to publish with
     * @param detail     the message content
     * @return a summary string containing the exchange, routing key and content
     */
    @Override
    public String sendMessage(String routingKey, String detail) {
        amqpTemplate.convertAndSend(EXCHANGE, routingKey, detail);

        StringBuilder summary = new StringBuilder("消息发送成功:{Exchange:");
        summary.append(EXCHANGE)
                .append("|Routing Key:").append(routingKey)
                .append("|data:").append(detail)
                .append("}");
        return summary.toString();
    }
}
/**
 * {@link ProducerService} implementation that publishes to the exchange
 * {@code My_Topic_Exchange}.
 */
@Service
public class TopicProducerServiceImpl implements ProducerService {

    /** Name of the exchange every message from this service is sent to. */
    private static final String EXCHANGE = "My_Topic_Exchange";

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@code detail} to the exchange with the given routing key and
     * returns a text summary of what was sent.
     *
     * @param routingKey the routing key to publish with
     * @param detail     the message content
     * @return a summary string containing the exchange, routing key and content
     */
    @Override
    public String sendMessage(String routingKey, String detail) {
        amqpTemplate.convertAndSend(EXCHANGE, routingKey, detail);

        StringBuilder summary = new StringBuilder("消息发送成功:{Exchange:");
        summary.append(EXCHANGE)
                .append("|Routing Key:").append(routingKey)
                .append("|data:").append(detail)
                .append("}");
        return summary.toString();
    }

}

编写控制层接收请求并调用消息发送服务

/**
 * REST endpoints under {@code /producer} that hand incoming requests to the
 * Direct and Topic message-sending services.
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    /** Sending service looked up by bean name {@code directProducerServiceImpl}. */
    @Resource(name = "directProducerServiceImpl")
    private ProducerService directSender;

    /** Sending service looked up by bean name {@code topicProducerServiceImpl}. */
    @Resource(name = "topicProducerServiceImpl")
    private ProducerService topicSender;

    /**
     * Handles {@code GET /producer/direct/{routingKey}/{detail}} by sending the
     * message through the Direct service.
     *
     * @param routingKey routing key taken from the path
     * @param detail     message content taken from the path
     * @return the result string returned by the service
     */
    @GetMapping("/direct/{routingKey}/{detail}")
    public String sendForDirect(
            @PathVariable("routingKey") String routingKey,
            @PathVariable("detail") String detail) {
        String result = directSender.sendMessage(routingKey, detail);
        return result;
    }

    /**
     * Handles {@code GET /producer/topic/{routingKey}/{detail}} by sending the
     * message through the Topic service.
     *
     * @param routingKey routing key taken from the path
     * @param detail     message content taken from the path
     * @return the result string returned by the service
     */
    @GetMapping("/topic/{routingKey}/{detail}")
    public String sendForTopic(
            @PathVariable("routingKey") String routingKey,
            @PathVariable("detail") String detail) {
        String result = topicSender.sendMessage(routingKey, detail);
        return result;
    }

}

RabbitMQ 消费者

配置文件以及主启动类

创建子 maven 项目 RabbitConsumer,编写配置文件

# HTTP port the consumer application listens on.
# Changed from 8081: the producer configuration already uses 8081, so the two
# applications could not be started on the same machine.
server:
  port: 8082

# Connection settings for the RabbitMQ broker.
spring:
  rabbitmq:
    # host and port must be nested under spring.rabbitmq; at the same level as
    # "rabbitmq" they would be read as spring.host / spring.port instead.
    # Placeholder: replace the value below with the address of your RabbitMQ server
    host: <RabbitMq Server ip>
    port: 5672

创建主启动类

/**
 * Entry point of the RabbitMQ consumer application.
 */
@SpringBootApplication
public class ConsumerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that
     *             overrides such as {@code --server.port=9090} are honoured
     */
    public static void main(String[] args) {
        // Pass args through: calling run(...) without them silently discards
        // every command-line property override.
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

编写消息接收服务

使用 @RabbitListener 注解监听指定的队列,获取数据并进行处理即可

注:一般消息的传输使用 JSON 字符串的形式实现

/**
 * Listens on the four tutorial queues and prints every received message to
 * standard output.
 *
 * NOTE(review): this class only references the queues by name and does not
 * declare them; presumably they are declared elsewhere before the listeners
 * start. Confirm.
 */
@Component
public class RabbitReceiver {

    private static final String DIRECT_QUEUE_01 = "My_Direct_Queue01";
    private static final String DIRECT_QUEUE_02 = "My_Direct_Queue02";
    private static final String TOPIC_QUEUE_01 = "My_Topic_Queue01";
    private static final String TOPIC_QUEUE_02 = "My_Topic_Queue02";

    /**
     * Receives messages from queue {@code My_Direct_Queue01}.
     *
     * @param detail the message content
     */
    @RabbitListener(queues = DIRECT_QUEUE_01)
    public void directReceiver01(String detail) {
        printReceived(DIRECT_QUEUE_01, detail);
    }

    /**
     * Receives messages from queue {@code My_Direct_Queue02}.
     *
     * @param detail the message content
     */
    @RabbitListener(queues = DIRECT_QUEUE_02)
    public void directReceiver02(String detail) {
        printReceived(DIRECT_QUEUE_02, detail);
    }

    /**
     * Receives messages from queue {@code My_Topic_Queue01}.
     *
     * @param detail the message content
     */
    @RabbitListener(queues = TOPIC_QUEUE_01)
    public void topicReceiver01(String detail) {
        printReceived(TOPIC_QUEUE_01, detail);
    }

    /**
     * Receives messages from queue {@code My_Topic_Queue02}.
     *
     * @param detail the message content
     */
    @RabbitListener(queues = TOPIC_QUEUE_02)
    public void topicReceiver02(String detail) {
        printReceived(TOPIC_QUEUE_02, detail);
    }

    /** Prints one line naming the source queue and the message content. */
    private static void printReceived(String queue, String detail) {
        System.out.println("消息接收成功:{Queue:" + queue + "|data:" + detail + "}");
    }

}