文章目录
- 一、基本使用
- 1、环境准备
- 2、Hello World
- (1)生产者代码实例
- (2)消费者代码实例
- 3、抽取工具类代码实例
- 4、多消费者监听一个队列
- (1)生产者代码实例
- (2)多个消费者代码实例
- (3)结果
- 5、手动消息应答
- (1)消息重新入队
- (2)消费者手动应答代码实例
- (3)Multiple 的解释
- 6、消息持久化
- 7、消费者消费限流
- 8、发布确认
- (1)单个发布确认
- (2)批量发布确认
- (3)异步发布确认
- 二、使用Exchanges交换机
- 1、Exchanges 概念
- 2、扇出(fanout)
- 3、直接(direct)
- (1)实战
- 4、主题(topic)
- 5、headers
- 三、整合SpringBoot
- 1、环境初始化
- 2、生产者
- 3、消费者
一、基本使用
1、环境准备
(1)下载安装RabbitMQ
官网地址:https://www.rabbitmq.com/download.html
RabbitMQ-核心概念解析与安装手册
(2)导包
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
2、Hello World
单个消费者监听单个队列。
(1)生产者代码实例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者:发消息
*/
public class Producer {
// 队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try ( // 获取链接
Connection connection = factory.newConnection();
// 获取channel
Channel channel = connection.createChannel()
) {
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
(2)消费者代码实例
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
3、抽取工具类代码实例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
// 获取连接channel工具类
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
4、多消费者监听一个队列
多个消费者监听单个队列,生产者生产消息之后,所有消费者轮询地处理消息。每个消息只能被一个消费者消费一次,相当于消费者做了负载均衡。
(1)生产者代码实例
import com.rabbitmq.client.Channel;
import java.util.Scanner;
// 生产者不断地生产消息
public class Producer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
}
(2)多个消费者代码实例
import com.boot.security.server.myLearn.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
// 消费者1
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
System.out.println("C1 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
可以同时启动多个消费者进行消费。
(3)结果
生产者:
请输入要发送的消息...
111
发送消息完成:111
222
发送消息完成:222
333
发送消息完成:333
444
发送消息完成:444
消费者1:
C1 消费者启动等待消费......
接收到消息:111
接收到消息:333
消费者2:
C2 消费者启动等待消费......
接收到消息:222
接收到消息:444
5、手动消息应答
Channel.basicAck(用于肯定确认)RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)与 Channel.basicNack 相比少一个参数。不处理该消息了直接拒绝,可以将其丢弃了。
(1)消息重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
(2)消费者手动应答代码实例
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
// 消费者
public class Consumer {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
/**
* 1.消息标记tag
* 2.是否批量应答未应答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
System.out.println("消费者启动等待消费......");
boolean autoAck = false; // 手动消息应答
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
(3)Multiple 的解释
basicAck的第二个参数是一个boolean值。true 代表批量应答 channel 上未应答的消息。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
6、消息持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
7、消费者消费限流
channel.basicQos(1);
以上设置可以保证该消费者同一时间只能消费一条数据,只有该数据应答完毕才能处理其他数据。
8、发布确认
(1)单个发布确认
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
//从控制台当中接受信息
System.out.println("请输入要发送的消息...");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 服务端返回false或超时时间内未返回,生产者可以消息重发
channel.waitForConfirms();
System.out.println("发送消息完成:" + message);
}
}
(2)批量发布确认
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms(); // 阻塞
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
}
(3)异步发布确认
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
}
}
二、使用Exchanges交换机
1、Exchanges 概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列
。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
生产者只能将消息发送到交换机(exchange)
,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
Exchanges 总共有以下类型:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
之前我们使用的Exchanges都是空字符串,也就是默认的交换机。
channel.basicPublish("", queueName, null, message.getBytes());
2、扇出(fanout)
扇出(fanout)是将接收到的所有消息广播到它知道的所有队列中。
// 生产者
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
// 消费者,多个消费者都可以同时接收到消息
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台1打印接收到的消息" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
3、直接(direct)
bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。
上图,Q1只能消费orange的消息,Q2只能消费black和green的消息。
Q1和Q2同时绑定black之后,与扇出(fanout)就类似了。
(1)实战
// 生产者
public class EmitDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("blue", "blue 信息");
bindingKeyMap.put("orange", "orange 信息");
bindingKeyMap.put("green", "green 信息");
//black 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("black", "black 信息");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
}
// 消费者
public class ReceiveDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "blue"); // 绑定routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "orange"); // 绑定routingKey
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message = "C1接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "green"); // 绑定routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "blue"); // 绑定routingKey
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message = "C2接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
4、主题(topic)
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的:
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
5、headers
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
再绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则路由该消息到此队列中。
headers类型的交换器的性能很差,不建议使用。
三、整合SpringBoot
1、环境初始化
(1)导包
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
2、生产者
@Autowired
private RabbitTemplate rabbitTemplate;
// 指定交换机、routekey
rabbitTemplate.convertAndSend("exchange", "routekey", "my test");
3、消费者
@Component
public class Consumer {
/**
* 监听直播状态改变事件
*/
@RabbitListener(bindings = @QueueBinding(
// 指定队列
value = @Queue(value = "routekey.queue"),
// 指定交换机
exchange = @Exchange(value = "exchange", type = ExchangeTypes.TOPIC),
// routing key
key = "routekey"),
// 应答模式为手动应答
ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
System.out.println("接收到消息:" + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}