rabbitmq 持久化 事务 发送确认模式

时间:2022-09-11 16:18:19

部分内容来自:http://blog.csdn.net/hzw19920329/article/details/54315940 http://blog.csdn.net/hzw19920329/article/details/54340711

持久化

rabbitmq默认没有开启消息的持久化,消息存储在内存中,如果此时重启服务器,那么消息江湖丢失。

开启持久化会牺牲性能。响应时间和吞吐量。

如果需要在崩溃中恢复,那么开启持久化需要做一下3步:

  • 生产者在生产消息的时候,将消息的投递模式设置为2(持久)
  • 发送到持久化的交换器(配置了durable=true)
  • 到达持久化的队列(配置了durable=true)

所以,持久化在集群下工作的并不好,在集群的时候就讲述。

最好只为关键消息做持久化。

事务

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

        RabbitMQ为我们提供了两种方式:

        方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;

        方式二:通过将channel设置成confirm模式来实现;

        这篇博客我们讲解AMQP事务机制,下一篇再探讨channel的confirm模式

        首先,我们通过实例来看看AMQP的事务模式是怎么使用的:

        RabbitMQ中与事务机制有关的方法有三个,分别是Channel里面的txSelect(),txCommit()以及txRollback(),txSelect用于将当前Channel设置成是transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定是到达broker了,如果在txCommit执行之前broker异常奔溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了;

       具体实例:

  1. public class ProducerTest {  
  2.     public static void main(String[] args) {  
  3.         String exchangeName = "confirmExchange";  
  4.         String queueName = "confirmQueue";  
  5.         String routingKey = "confirmRoutingKey";  
  6.         String bindingKey = "confirmRoutingKey";  
  7. ;  
  8.           
     
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("172.16.151.74");  
  11.         factory.setUsername("test");  
  12.         factory.setPassword("test");  
  13. );  
  14.           
     
  15.         //创建生产者  
  16.         Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
  17.         producer.run();  
  18.     }  
  19. }  
  20.   
     
  21. class Sender  
  22. {  
  23.     private ConnectionFactory factory;  
  24.     private int count;  
  25.     private String exchangeName;  
  26.     private String  queueName;  
  27.     private String routingKey;  
  28.     private String bindingKey;  
  29.       
     
  30.     public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
  31.         this.factory = factory;  
  32.         this.count = count;  
  33.         this.exchangeName = exchangeName;  
  34.         this.queueName = queueName;  
  35.         this.routingKey = routingKey;  
  36.         this.bindingKey = bindingKey;  
  37.     }  
  38.       
     
  39.     public void run() {  
  40.         Channel channel = null;  
  41.         try {  
  42.             Connection connection = factory.newConnection();  
  43.             channel = connection.createChannel();  
  44.             //创建exchange  
  45.             channel.exchangeDeclare(exchangeName, "direct", truefalsenull);  
  46.             //创建队列  
  47.             channel.queueDeclare(queueName, truefalsefalsenull);  
  48.             //绑定exchange和queue  
  49.             channel.queueBind(queueName, exchangeName, bindingKey);  
  50.             //发送持久化消息  
  51. ;i < count;i++)  
  52.             {  
  53.                 //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
  54.                 //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
  55.                 //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
  56.                 //开启事务  
  57.                 channel.txSelect();  
  58. )+"条消息").getBytes());  
  59. )  
  60.                 {  
  61. ;  
  62.                 }  
  63.                 //提交事务  
  64.                 channel.txCommit();  
  65.             }  
  66.         } catch (Exception e) {  
  67.             try {  
  68.                 //回滚操作  
  69.                 channel.txRollback();  
  70.             } catch (IOException e1) {  
  71.                 e1.printStackTrace();  
  72.             }  
  73.             e.printStackTrace();  
  74.         }  
  75.     }  
  76. }  

       在第57行通过channel.txSelect方法开启事务,第64行通过channel.txCommit提交事务,为了模拟broker代理服务器异常奔溃或者发布过程中抛出异常,我们通过第61行除以0的操作来模拟(实际中第58行的basicPublish方法是有可能会抛出IOException异常),在捕获到异常之后,第69行调用了channel.txRollback进行事务回滚操作,运行整个程序你会发现在"confirmQueue"这个队列中只存储了一条消息,因为在59行i等于1的时候,抛出了异常,调用了第69行进行了事务回滚操作;在实际应用中,可以在回滚操作之后进行消息重发操作;
       我们来通过抓包看看程序执行过程中发出了哪些请求:

   

       1:第一条消息调用channel.txSelect开启事务

       2:第一条消息调用channel.txCommit提交事务

       3:第二条消息调用channel.txSelect开启事务

       4:因为除以0的操作程序抛出异常,执行catch语句中的channel.txRollback回滚事务

       从上面的分析中,我们知道使用事务确实能够解决发布者与broker代理服务器之间的消息确认,只有消息成功被broker接收事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,就拿上面的程序发送1000条消息,使用事务的话需要58244毫秒,而不使用事务的话仅仅需要89毫秒,因此在实际中使用事务会带来很大的性能损失,那么有没有更好的方法既能保证发布者知道消息已经正确到达,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法的,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

发送确认模式

采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式;

       生产者确认模式实现原理:

       生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

       confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;

       开启confirm模式的方法:

       生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,(注意一点,已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的),如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的,而且我也没找到我们自己能够设置no-wait标志的方法);

       生产者实现confiem模式有三种编程方式:

       (1):普通confirm模式,每发送一条消息,调用waitForConfirms()方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;

       (2):批量confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;

       
 

       讲完了基本的原理之后,代码级别我们该怎么设置channel信道为confirm模式呢?以及我们该怎么获取broker返回给我们的确认消息呢?

       测试1:普通confirm模式

       首先从最简单的开始,仅仅将channel设置成confirm模式,并且生产者每发送一条消息就等待broker回应确认消息,至于确认消息是什么我们不去做任何处理,为了测试方便,此处生产者只发送了5条消息,实现代码如下:

  1. public class ProducerTest {  
  2.     public static void main(String[] args) {  
  3.         String exchangeName = "confirmExchange";  
  4.         String queueName = "confirmQueue";  
  5.         String routingKey = "confirmRoutingKey";  
  6.         String bindingKey = "confirmRoutingKey";  
  7. ;  
  8.           
     
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("172.16.151.74");  
  11.         factory.setUsername("test");  
  12.         factory.setPassword("test");  
  13. );  
  14.           
     
  15.         //创建生产者  
  16.         Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
  17.         producer.run();  
  18.     }  
  19. }  
  20.   
     
  21. class Sender  
  22. {  
  23.     private ConnectionFactory factory;  
  24.     private int count;  
  25.     private String exchangeName;  
  26.     private String  queueName;  
  27.     private String routingKey;  
  28.     private String bindingKey;  
  29.       
     
  30.     public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
  31.         this.factory = factory;  
  32.         this.count = count;  
  33.         this.exchangeName = exchangeName;  
  34.         this.queueName = queueName;  
  35.         this.routingKey = routingKey;  
  36.         this.bindingKey = bindingKey;  
  37.     }  
  38.       
     
  39.     public void run() {  
  40.         Channel channel = null;  
  41.         try {  
  42.             Connection connection = factory.newConnection();  
  43.             channel = connection.createChannel();  
  44.             //创建exchange  
  45.             channel.exchangeDeclare(exchangeName, "direct", truefalsenull);  
  46.             //创建队列  
  47.             channel.queueDeclare(queueName, truefalsefalsenull);  
  48.             //绑定exchange和queue  
  49.             channel.queueBind(queueName, exchangeName, bindingKey);  
  50.             channel.confirmSelect();  
  51.             //发送持久化消息  
  52. ;i < count;i++)  
  53.             {  
  54.                 //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
  55.                 //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
  56.                 //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
  57. )+"条消息").getBytes());  
  58.                 if(channel.waitForConfirms())  
  59.                 {  
  60.                     System.out.println("发送成功");  
  61.                 }  
  62.             }  
  63.             final long start = System.currentTimeMillis();  
  64.             System.out.println("执行waitForConfirmsOrDie耗费时间: "+(System.currentTimeMillis()-start)+"ms");  
  65.         } catch (Exception e) {  
  66.             e.printStackTrace();  
  67.         }  
  68.     }  
  69. }  

       在第50行调用Channel信道的confirmSelect方法将当前信道设置成了confirm模式,第57行通过for循环调用Channel的basicPublish方法发送了5条消息到消息队列中,第58行调用waitForConfirms方法等待broker服务端返回ack或者nack消息,这种模式每发送一条消息就会等待broker代理服务器返回消息,这点我们可以从抓包的角度观察结果:

  

       可以看到上面生产者通过Confirm.Select将当前Channel信道设置成confirm模式,broker代理服务器收到之后回传Confirm.Select-Ok同一将当前Channel设置成confirm模式,此外看到返回5条Basic.Ack消息;

        测试2:批量confirm模式

        这种模式生产者不是每发送一条就等待broker确认,而是发送一批,实现代码见下:

  1. public class ProducerTest {  
  2.     public static void main(String[] args) {  
  3.         String exchangeName = "confirmExchange";  
  4.         String queueName = "confirmQueue";  
  5.         String routingKey = "confirmRoutingKey";  
  6.         String bindingKey = "confirmRoutingKey";  
  7. ;  
  8.           
     
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("172.16.151.74");  
  11.         factory.setUsername("test");  
  12.         factory.setPassword("test");  
  13. );  
  14.           
     
  15.         //创建生产者  
  16.         Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
  17.         producer.run();  
  18.     }  
  19. }  
  20.   
     
  21. class Sender  
  22. {  
  23.     private ConnectionFactory factory;  
  24.     private int count;  
  25.     private String exchangeName;  
  26.     private String  queueName;  
  27.     private String routingKey;  
  28.     private String bindingKey;  
  29.       
     
  30.     public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
  31.         this.factory = factory;  
  32.         this.count = count;  
  33.         this.exchangeName = exchangeName;  
  34.         this.queueName = queueName;  
  35.         this.routingKey = routingKey;  
  36.         this.bindingKey = bindingKey;  
  37.     }  
  38.       
     
  39.     public void run() {  
  40.         Channel channel = null;  
  41.         try {  
  42.             Connection connection = factory.newConnection();  
  43.             channel = connection.createChannel();  
  44.             //创建exchange  
  45.             channel.exchangeDeclare(exchangeName, "direct", truefalsenull);  
  46.             //创建队列  
  47.             channel.queueDeclare(queueName, truefalsefalsenull);  
  48.             //绑定exchange和queue  
  49.             channel.queueBind(queueName, exchangeName, bindingKey);  
  50.             channel.confirmSelect();  
  51.             //发送持久化消息  
  52. ;i < count;i++)  
  53.             {  
  54.                 //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
  55.                 //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
  56.                 //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
  57. )+"条消息").getBytes());  
  58.             }  
  59.             long start = System.currentTimeMillis();  
  60.             channel.waitForConfirmsOrDie();  
  61.             System.out.println("执行waitForConfirmsOrDie耗费时间: "+(System.currentTimeMillis()-start)+"ms");  
  62.         } catch (Exception e) {  
  63.             e.printStackTrace();  
  64.         }  
  65.     }  
  66. }  

       第50行调用channel.confirmSelect将当前channel信道设置成confirm模式,接着在第57行通过for循环发送了100条消息,第60行调用了channel的waitForConfirmsOrDie,从waitForConfirmsOrDie方法的注释上可以看出,该方法会等到最后一条消息得到确认或者得到nack才会结束,也就是说在waitForConfirmsOrDie处会造成当前程序的阻塞,以测试1程序发送100条消息为例,阻塞时间是135ms,我们再来看看对测试1的抓包情况:

   

       从红色箭头的标号1出可以看到:首先是24向74发送了Confirm.Select消息表示请求将当前信道设置为confirm模式,接着74向24回送了Confirm.Select-Ok消息表示同意将信道设置成confirm模式,从红色标号2处NoWait字段的值为false也印证了我们如果直接调用Channel信道的confirmSelect()方法的话,实际上默认是开启broker回传Confirm.Select-Ok确认消息的;  

       接下来我们看看broker回传给客户端的确认消息数据包是什么样子的呢?同样通过抓包看看结果:

  

       你会发现,在上面测试1中我们通过for循环发送了100条消息,但是在抓包的时候我们仅仅看到有两个Basic.Ack确认消息回传回来,原因在于上面截图的标号3处,你会发现Multiple域的值是True的,之前我们已经讲过broker可以设置Multiple域表示broker已经收到当前确认消息的Delivery-Tag域之前标号的消息,以上面截图为例的话表示broker告诉发送者编号4之前的消息已经全部收到了,从这点我们看出broker端默认情况下是进行批量回复的,并不是针对每条消息都发送一条ack消息;

       测试2:

       测试1我们仅仅是测试发送者能够收到broker的确认消息以及知道了broker对消息默认是采用批量回复方式的,那么在程序中我们该怎么获取到broker回传回来的确认消息呢,假如我们有时候需要在收到确认消息之后做一些提示性操作该怎么办呢?测试1中,我们采用的是Channel信道的waitForConfirmsOrDie等待broker端回传回ack确认消息的,但我们没法拿到这个ack消息进行后期操作,要想拿到ack消息的话,我们可以给当前Channel信道绑定监听器,具体来说就是调用Channel信道的addConfirmListener方法进行设置,Channel信道在收到broker的ack消息之后会回调设置在该信道监听器上的handleAck方法,在收到nack消息之后会回调设置在该信道监听器上的handleNack方法。

       实现代码:

  1. public class ProducerTest {  
  2.     public static void main(String[] args) {  
  3.         String exchangeName = "confirmExchange";  
  4.         String queueName = "confirmQueue";  
  5.         String routingKey = "confirmRoutingKey";  
  6.         String bindingKey = "confirmRoutingKey";  
  7. ;  
  8.           
     
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("172.16.151.74");  
  11.         factory.setUsername("test");  
  12.         factory.setPassword("test");  
  13. );  
  14.           
     
  15.         //创建生产者  
  16.         Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
  17.         producer.run();  
  18.     }  
  19. }  
  20.   
     
  21. class Sender  
  22. {  
  23.     private ConnectionFactory factory;  
  24.     private int count;  
  25.     private String exchangeName;  
  26.     private String  queueName;  
  27.     private String routingKey;  
  28.     private String bindingKey;  
  29.       
     
  30.     public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
  31.         this.factory = factory;  
  32.         this.count = count;  
  33.         this.exchangeName = exchangeName;  
  34.         this.queueName = queueName;  
  35.         this.routingKey = routingKey;  
  36.         this.bindingKey = bindingKey;  
  37.     }  
  38.       
     
  39.     public void run() {  
  40.         Channel channel = null;  
  41.         try {  
  42.             Connection connection = factory.newConnection();  
  43.             channel = connection.createChannel();  
  44.             //创建exchange  
  45.             channel.exchangeDeclare(exchangeName, "direct", truefalsenull);  
  46.             //创建队列  
  47.             channel.queueDeclare(queueName, truefalsefalsenull);  
  48.             //绑定exchange和queue  
  49.             channel.queueBind(queueName, exchangeName, bindingKey);  
  50.             channel.confirmSelect();  
  51.             //发送持久化消息  
  52. ;i < count;i++)  
  53.             {  
  54.                 //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
  55.                 //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
  56.                 //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
  57. )+"条消息").getBytes());  
  58.             }  
  59.             long start = System.currentTimeMillis();  
  60.             channel.addConfirmListener(new ConfirmListener() {  
  61.                   
     
  62.                 @Override  
  63.                 public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
  64.                     System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);  
  65.                 }  
  66.                   
     
  67.                 @Override  
  68.                 public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
  69.                     System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);  
  70.                 }  
  71.             });  
  72.             System.out.println("执行waitForConfirmsOrDie耗费时间: "+(System.currentTimeMillis()-start)+"ms");  
  73.         } catch (Exception e) {  
  74.             e.printStackTrace();  
  75.         }  
  76.     }  
  77. }  

       第60行我们调用了Channel信道的addConfirmListener设置了监听器,并且在监听器的handleAck和handleNack方法中打印了信息,运行程序查看输出:

  

       可以看到,虽然我们还是发送了100条消息,同样我们并没有收到100个ack消息 ,只收到两个ack消息,并且这两个ack消息的multiple域都为true,这点和测试1是相同的,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;

       也就是我们通过信道Channel的waitForConfirmsOrDie方法或者为信道设置监听器都可以保证发送者收到broker回传的ack或者nack消息,那么这两种方式有什么区别呢?从测试一的第61行代码以及测试2的第72行代码处你就能找到答案啦,测试1中调用waitForConfirmsOrDie方法发送100条消息并且全部收到确认需要135ms,测试2中通过监听器的方式仅仅需要1ms,说明调用waitForConfirmsOrDie会造成程序的阻塞,通过监听器并不会造成程序的阻塞

 

如果把上面的发送消息部分改为:

for(int
i = 0;i < count;i++)

{

 

channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());

}

TimeUnit.SECONDS.sleep(1);

 

for(int
i = 0;i < count;i++)

{

channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());

}

最终ConfirmListener的打印为:

ack: deliveryTag = 1 multiple: false

ack: deliveryTag = 100 multiple: true

ack: deliveryTag = 106 multiple: true

ack: deliveryTag = 200 multiple: true

我的理解如下:

条投递成功
至少已经持久化在交换器部分

ack: deliveryTag = 100 multiple: true:第2-100条投递成功
至少已经持久化在交换器部分

ack: deliveryTag = 106 multiple: true:第101-106条投递成功
至少已经持久化在交换器部分

ack: deliveryTag = 200 multiple: true:第107-200条投递成功
至少已经持久化在交换器部分