RabbitMQ简介、安装、基本特性API--Java测试
新的阅读体验地址:http://www.zhouhong.icu/post/141
本篇文章所有的代码:https://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo
一、初识RabbitMQ
是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
AMQP协议Advanced Message Queuing Protocol(高级消息队列协议)
定义:具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,
是应用层协议的一个开放标准,为面向消息中间件设计。
AMQP专业术语:
- Server:又称broker,接受客户端的链接,实现AMQP实体服务
- Connection:连接,应用程序与broker的网络连接
- Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
- Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
- virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
- Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
- Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
- Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)
RabbitMQ整体架构
Exchange和队列是多对多关系,实际操作一般为1个exchange对多个队列,为避免设计过于复杂.
二、单机版快速安装
- 1、首先在Linux上进行一些软件的准备工作
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 3、安装服务命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 4、启动
启动服务
systemctl start rabbitmq-server
查看是否启动
lsof -i:5672
- 5、启动、安装web管理插件(管控台)
rabbitmq-plugins enable rabbitmq_management
- 6、查看管理端口有没有启动
lsof -i:15672
或者:
netstat -tnlp | grep 15672
- 7、添加用户
#添加用户 用户名 admin 密码 admin web管理工具可用此用户登录
sudo rabbitmqctl add_user admin admin
#设置用户角色 管理员
sudo rabbitmqctl set_user_tags admin administrator
#设置用户权限(接受来自所有Host的所有操作)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"
#查看用户权限
sudo rabbitmqctl list_user_permissions admin
- 重新启动
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
- 访问:http://192.168.2.121:15672/ 使用 admin 登录
- 代码测试
- 引入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2.发送端:
package com.zhouhong.rabbitmq.api.helloworld; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender { public static void main(String[] args) throws Exception { // 1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 2 创建Connection Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 声明 String queueName = "test001"; // 参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数 channel.queueDeclare(queueName, false, false, false, null); Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for(int i = 0; i < 5;i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName , props , msg.getBytes()); } } }
3.接收端
package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
// durable 是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
4.结果(先启动接收端进行监控,再启动发送端)
收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4
三、RabbitMQ----交换机
- Name:交换机名称。
- Type:交换机类型 direct、topic、fanout、headers。
- Durability:是否持久化,ture为持久化。
- Auto Delete :当最后一个绑定道Exchange上的队列删除后,自动删除该Exchange。
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False。
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用。
- DirectExchange的消息被转发道RouteKey中指定的Queue。
交换机-----Direct exchange
Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
代码:
- 发送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
//必须要和接收端 routingKey 一一对应
String routingKey = "test_direct_routingKey";
//5 发送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
- 接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test_direct_routingKey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
交换机-----topic exchange
exchange 将Routekey和某个topic进行一个模糊匹配,发送给对应队列、可以用通配符进行匹配
比如下面例子
代码:
- 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// 只能匹配一个 例如:user.txt、user.py都可以,但是user.txt.py 不行
//String routingKey = "user.*";
// user.txt、user.py 、user.txt.py 都可以匹配到
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer1 start.. ");
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
- 发送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
交换机-----Fanout exchange 广播模式
1.不处理路由键,只需要简单的将队列绑定到交换机上。
2.发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
3.Fanout交换机转发消息是最快的。
代码:见示例文章开始GitHub地址
四、RabbitMQ高级特性
1、消息如何保障 100% 的投递成功
生产端的可靠性投递的标志:
1、消息成功发出
2、mq节点成功接收
3、发送端MQ节点确认应答
4、完善的消息补偿机制
解决:消息信息落库,对消息状态进行打标
幂等性
1、 select count(1) from t_order where id = 唯一id(或)指纹码
2、唯一id或指纹码机制,利用数据库主键去重
2、Confirm
第一步:再channel上开启确认模式:channel.confirmSelect();
第二步:再channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日期等后续处理!
3、return消息机制
ReturnListener用于处理不可路由的消息
我们的消息生产者,通过指定一个Exchage和Routingkey,把消息送达某一个队列中去,然后我们的消费者监听队列,进行消费处理操作,如果没有合适的队列,则会由returnListener进行接受。
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。
4、消费端ACK与重回队列
消费端ACK:
- 在工作的时候一般不会选择自动ack
- 消费端的手工ack分为两种ACK和NACK
- 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。这种建议回复NACK,不要重回队列
- 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功
消费端的重回队列
- 是为了对没有处理成功的消息,把消息重新会投递给broker。
- 重回队列,会回到队列的尾部
- 也会造成一条消息一直重复投递,死循环了
- 在实际应用中,都会关闭重回队列,也就是设置为false
5、TTL队列和消息
TTL: time to live的缩写,也就是生存时间。
- RabbitMQ 支持消息过期时间,在消息发送时可以进行指定
- RabbitMQ支持队列过期时间,从消息入队列开始计算,只要超过了队列的超时间时间配置,那么消息会自动的清除
死队列: DLX,Dead-Letter-Exchange
- 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX.
消息变成死信的几种情况
- 消息被拒绝 并且requeue = false
- 消息TTL过期
- 队列达到最大长度
DLX也是一个正常的Exchange,实际上是一个属性控制
- 当队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上,进而被路由到另一个队列.
- 可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitMQ3.0以前的immediate参数功能。
- 在正常队列上添加参数:arguments.put("x-dead-letter-exchange","dlx.exchange");这样消息过期、requeue、队列达到最大长度时,就可以直接路由到死信队列。