RabbitMQ AMQP 事务机制

时间:2022-05-01 16:19:30

1,在之前的文章中介绍了RabbitMQ的五种队列形式

其中,在工作队列中,为了保证消费者的公平性,采用了channel.basicQos(1),保证了每次只发一条消息给消费者消费,并且使用手动签收的方式,消费完成,主动告知消息中间件,这样就可以发送下一条消息

这是对消费者而言的手动应答模式:

public class Consumer {

    private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";

    public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("consumer01");
Connection connection = MQConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
try {
Thread.sleep(200);
} catch (Exception e) {
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, consumer); //false 代表使用手动消息应答,需要使用channel.basicAck(envelope.getDeliveryTag(),false) 告知消息中间件
} }

2,对于生产者而言,如果生产者发送消息给交换机,或者给消费者,如果失败,怎么知道?

RabbitMQ 中有自带的AMQB 机制中,有事务的效果。。。

package com.aiyuesheng.simple_queue;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.aiyuesheng.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "rabbitmq_simple_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 相当于开启事务 将当前channel设置为transaction模式 开启事务
channel.txSelect();
String msg = "Hello, World :" + i;
System.out.println(msg);
try {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int ii = 1/0;
// 提交事务
channel.txCommit();
} catch (Exception e) {
//事务回滚
channel.txRollback();
} finally {
// channel.close();
// connection.close();
} } } }