15.消息队列RabbitMQ

时间:2023-02-27 08:02:44

一、基本概念

        RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

        安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量,安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面,访问地址  账号密码均为 guest。

rabbitmq-plugins enable rabbitmq_management

二、用户

  1. 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  3. 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  4. 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  5. 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。

三、工作模式

        RabbitMQ主要有五种工作模式,分别是:

  1. 简单模式(hello world)
  2. 工作队列模式(work queue)
  3. 发布/订阅模式(publish/subscribe)
  4. 路由模式(routing)
  5. 主题模式(topic)

         导入依赖:

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>

        工具类:

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("vhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

        1.简单模式(hello world):

//发送信息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", "hello", null, message.getBytes());

        //关闭通道和连接
        channel.close();
        connection.close();
}

//接收消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume("hello", true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
        }
}

        2.工作队列模式(work queue):多个消费者消费同一队列消息。

//接收消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示接收到消息马上自动确认完成
        channel.basicConsume("hello", false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // 返回确认状态,否则表示使用自动确认模式
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

        3.发布/订阅模式(publish/subscribe):通过交换机发送消息到多个队列。

//发送消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        channel.close();
        connection.close();
}

//接收消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // 返回完成状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

        4.路由模式(routing):通过交换机进行路由匹配发送消息到不同队列。

//发送消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange及类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 消息内容
        String message = "Hello World!";
        //指定消息路由
        channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());

        channel.close();
        connection.close();
}

//接收消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,并指定多个路由
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // 返回完成状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

        5.主题模式(topic):通过交换机进行通配符匹配发送消息到不同队列。

//发送消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange及类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!";
        //指定消息匹配关键字
        channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());

        channel.close();
        connection.close();
}

//接收消息
public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机,并指定多个通配符
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            // 返回完成状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
}

四、Spring整合

        Spring 提供了 RabbitTemplate 类执行消息发送。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.88.88
    port: 5672
    username: guest
    password: guest
    virtual-host: /
@Configuration
public class MQConfig {

    @Bean
    public Exchange exchange1(){
        return ExchangeBuilder.fanoutExchange("fanout").build();
    }
    @Bean
    public Exchange exchange2(){
        return ExchangeBuilder.directExchange("direct").build();
    }
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable("hello1").build();
    }
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable("hello2").build();
    }
    @Bean
    public Binding binding1(Exchange exchange1,Queue queue1){
        return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();
    }
    @Bean
    public Binding binding2(Exchange exchange2,Queue queue2){
        return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();
    }
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),
        exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {

    @RabbitHandler
    public void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        
        //手动返回状态
        if () {
            // RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
            channel.basicAck(deliveryTag, false);
        } else {
            // 第三个参数true,表示这个消息会重新进入队列
            channel.basicNack(deliveryTag, false, true);
        }

    }
}