第二十一章 RabbitMQ

时间:2024-04-04 14:21:33
常见 MQ 产品
- ActiveMQ:基于 JMS。
- RabbitMQ:基于 AMQP 协议,erlang 语言开发,稳定性好。
- RocketMQ:基于 JMS,阿里巴巴产品,目前交由 Apache 基金会。
- Kafka:分布式消息系统,高吞吐量。
   
1.1 AMQP 简介
AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是 个线路层的协议规范,而不是 API 规范(例如 JMS )。由于 AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像 SMTP HTTP 等协议 样,只要开发者按照规范的格式发送数据,任何平台都可以通过 AMQP 进行消息交互。像目前流行的 StormMQ RabbitMQ 等都实现了 AMQP。
   
1.2 RabbitMQ 简介
RabbitMQ 一个实现了 AMQP 的开源消息中间件,使用高性能的 Erlang 编写。RabbitMQ 有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
  
1.3 RabbitMQ 的工作原理

   
组成部分说明:
- Broker:消息队列服务进程,此进程包括两个部分:Exchange 和 Queue。
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的。
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送。
- Consumer:消息消费者,即消费方客户端,接收 MQ 转发的消息。
  

生产者发送消息流程:
1. 生产者和 Broker 建立 TCP 连接。
2. 生产者和 Broker 建立通道。

3. 生产者通过通道消息发送给 Broker,由 Exchange 将消息进行转发。
4. Exchange 将消息转发到指定的 Queue(队列)。
   
消费者接收消息流程:
1. 消费者和 Broker 建立 TCP 连接。
2. 消费者和 Broker 建立通道。
3. 消费者监听指定的 Queue(队列)。
4. 当有消息到达 Queue 时 Broker 默认将消息推送给消费者。
5. 消费者接收到消息。
6. ack 回复。
   

二、RabbitMQ 安装

2.1 配置要求
系统:[Centos7](https://so.csdn.net/so/search?q=Centos7&spm=1001.2101.3001.7020)
Linux 内核:官方建议 3.10 以上
  
<span style="background:#ffff00">注意:本文的命令使用的是 root 用户登录执行,不是 root 的话所有命令前面要加 sudo</span>
  
1. 查看当前的内核版本
[root@localhost ~]# uname -r
3.10.0-1160.el7.x86_64
  
2. 更新 yum 包(使用 root 权限,生产环境中此步操作需慎重)
[root@localhost ~]# yum -y update
  
> yum -y update 升级所有包同时也升级软件和系统内核;
> yum -y upgrade 只升级所有包,不升级软件和系统内核。
  
3. 卸载旧版本(如果之前安装过的话)
[root@localhost ~]# yum remove docker  docker-common docker-selinux docker-engine
   
2.2 安装 Docker
1. 安装软件包
安装需要的软件包,yum-util 提供 yum-config-manager 功能,另两个是 devicemapper 驱动依赖。
[root@localhost ~]# yum install -y yum-utils device-mapper-persistent-data lvm2
  
2. 设置 yum 源
[root@localhost ~]# yum-config-manager --add-repo http://download.docker.com/linux/centos/docker-ce.repo(*仓库)
[root@localhost ~]# yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo(阿里仓库)
   
3. 选择 docker 版本
查看可用版本有哪些
[root@localhost ~]# yum list docker-ce --showduplicates | sort -r
   
4. 安装 docker
选择一个版本并安装:yum install docker-ce-版本号
[root@localhost ~]# yum -y install docker-ce-18.03.1.ce
   
5. 启动 Docker
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl enable docker 设置开机自启
   
6. 查看 docker 版本
[root@localhost ~]# docker version
  
2.3 安装 RabbitMQ
1. 查找镜像
[root@localhost ~]# docker search rabbitmq:management
   
2. 拉取镜像
[root@localhost ~]# docker pull macintoshplus/rabbitmq-management
  
3. 查看镜像
[root@localhost ~]# docker images
  
4. 创建容器
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl enable docker  设置开机自启
  
5. 查看容器
[root@localhost ~]# docker ps -a
   
6. 访问测试

   

三、RabbitMQ 六种消息模型

3.1 基本消息模型

   
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  
生产者
新建一个 maven 工程,添加 amqp-client 依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.1</version>
</dependency>
    
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {
        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置服务器地址
        factory.setHost("192.168.111.133");
        // 设置端口
        factory.setPort(5672);
        /**
         *  设置账号信息,用户名、密码、vhost
         *  设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
         */
        /*factory.setVirtualHost("/ly");
        factory.setUsername("ly");
        factory.setPassword("123456")*/;
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }

}
      
生产者发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private  final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3. 声明(创建)队列
        // 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细:
         * 1.queue,队列名称
         * 2.durable,是否持久化,如果持久化,mq重启后队列还在
         * 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,
         *              如果将此参数设置为true可用于临时队列的创建
         * 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,
         *              如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1.exchange,交换机,如果不指定将使用mq的欧仁交换机(设置为"")
         * 2.routingKey,路由Key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3.props,消息的属性
         * 4.body,消息的内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[x] Send '" + message + "'");

        // 关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        channel.close();
        connection.close();
    }

}
    
输出结果:
Connected to the target VM, address: '127.0.0.1:49314', transport: 'socket'
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[x] Send 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:49314', transport: 'socket'
  
Process finished with exit code 0
  

   
web 管理页面:服务器地址/端口号 (本地:192.168.202.103:15672,默认用户及密码:guest guest)
   
消费者接收消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3. 声明(创建)队列
        // 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细:
         * 1.queue,队列名称
         * 2.durable,是否持久化,如果持久化,mq重启后队列还在
         * 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,
         *              如果将此参数设置为true可用于临时队列的创建
         * 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,
         *              如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag   消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsum
             * @param envelope      信封,通过envelope
             * @param properties    消息属性
             * @param body          消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 交换机
                String exchange = envelope.getExchange();
                // 消息id,mq再channel中用来标识消息的id,可用于确认消息已接受
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body, "utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        // 参数: String queue,boolean autoAck,Consumer callback
        /**
         * 参数明细:
         * 1.queue      队列名称
         * 2.autoAck    自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,
         *              如果设置为false要通过编程实现回复。
         * 3.callback   消费方法,当消费者接收到消息要执行的方法。
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
   
输出结果:

     
再看看队列的消息,已经被消费了。

   
我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印。
    
3.1.1 消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
    
那么问题来了:RabbitMQ 怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是 RabbitMQ 无从得知,这样消息就丢失了!
  
因此,RabbitMQ 有一个 ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执 ACK 分两种情况:
自动 ACK:消息一旦被接收,消费者自动发送 ACK。
手动 ACK:消息接收后,不会发送 ACK,需要手动调用。
  
大家觉得哪种更好呢?
  
这需要看消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便。
如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
  

我们之前的测试都是自动 ACK 的,如果要手动 ACK,需要改动我们的代码(新建Recv2.java):
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
        final Channel channel = connection.createChannel();
        // 3. 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body, "utf-8");
                System.out.println(" [x] received : " + msg + "!");
                // 手动进行ACK
                /**
                 * void basicAck(long var1, boolean var3) throws IOException;
                 * var1     用来标识消息的id
                 * var3     是否批量 true:将一次性ack所有小于var1的消息
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数false,手动进行ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
   
最后一行代码设置第二个参数为 false
  
channel.basicConsume(QUEUE_NAME, false, consumer);
   
3.1.2 自动 ACK 存在的问题
修改消费者(Recv.java),添加异常,如下:

   
生产者不做任何修改,直接运行,消息发送成功:

   
运行消费者,程序抛出异常:

  
管理界面

  
消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。
  
重新运行生产者发送消息:

   
同样,在手动进行 ack 前抛出异常,(修改 Recv2.java)运行 Recv2

    
再看管理界面

   
消息没有被消费掉!
   
还有另外一种情况:修改消费者 Recv2,把监听队列第二个参数自动改成手动。(去掉之前制造的异常) ,并且消费方法中没手动进行 ACK。

   
生产者代码不变,再次运行:

  
运行消费者 :

   
但是,查看管理界面,发现:

   
停掉消费者的程序,发现:

  
这是因为虽然我们设置了手动 ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次变为 Ready。
   
正确的做法是:我们要在监听队列时设置第二个参数为 false,代码中手动进行 ACK。

    
再次运行消费者,查看 web 管理页面:

  
消费者消费成功!
  
3.2 work 消息模型
工作队列或者竞争消费者模式

   
work queues 与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在 Web 应用程序中特别有用,可以处理短的 HTTP 请求窗口中无法处理复杂的任务。
   
接下来我们来模拟这个流程:
- P:生产者:任务的发布者。
- C1:消费者 1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)。
- C2:消费者 2:领取任务并且完成任务,假设完成速度较快。
  
生产者
import com.ly.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 循环发布任务
        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // 关闭通道和连接
        channel.close();
        connection.close();
    }
}
   
消费者 1
import com.ly.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Recv {

    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println(" [消费者1] received : " + msg + "!");
                //模拟任务耗时1s
                try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
   
消费者 2
import com.ly.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Recv2 {

    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
   
生产者循环发送 50 条消息。

   
可以发现,两个消费者各自消费了不同 25 条消息,这就实现了任务的分发。
   
能者多劳
刚才的实现有问题吗?
- 消费者 1 比消费者 2 的效率要低,一次任务的耗时较长。
- 然而两人最终消费的消息数量是一样的。
- 消费者 2 大量时间处于空闲状态,消费者 1 一直忙碌。
   

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
  
怎么实现呢?
通过 BasicQos 方法设置 prefetchCount = 1。这样 RabbitMQ 就会使得每个 Consumer 在同一个时间点最多处理 1 个 Message。换句话说,在接收到该 Consumer 的 ack 前,他它不会将新的 Message 分发给它。相反,它会将其分派给不是仍然忙碌的下一个 Consumer。
   
值得注意的是:prefetchCount 在手动 ack 的情况下才生效,自动 ack 不生效(Recv.java)。

   
再次测试:

   
3.2.1 订阅模型分类
说明下:
1. 一个生产者多个消费者。
2. 每个消费者都有一个自己的队列。
3. 生产者没有将消息直接发送给队列,而是发送给 exchange(交换机、转发器)。
4. 每个队列都需要绑定到交换机上。
5. 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费。
例子:注册->发邮件、发短信
   
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。
  
Exchange 类型有以下几种:
- Fanout:广播,将消息交