四种途径提高RabbitMQ传输消息数据的可靠性(一)

时间:2021-08-15 03:11:46

前言

RabbitMQ虽然有对队列及消息等的一些持久化设置,但其实光光只是这一个是不能够保障数据的可靠性的,下面我们提出这样的质疑:

(1)RabbitMQ生产者是不知道自己发布的消息是否已经正确达到服务器呢,如果中间发生网络异常等情况呢?消息必然会丢失!

(2)RabbitMQ如果没有设置队列持久化,RabbitMQ服务器重后队列的元数据会丢失,消息自然也会丢失!

(3)RabbitMQ如果消费者设置自动确认,即autoAck为true,那么不管消费者发生什么情况,该消息会自动从队列中移除,实际上消费者有可能挂掉,消息必然会丢失!

(4)RabbitMQ中的消息如果没有匹配到队列时,那么消息也会丢失!

本文其实也就是结合以上四个方面进行讲解的,主要参考《RabbitMQ实战指南》(有需要PDF电子书的可以评论或者私信我),本文截图也来自其中,另外可以对一些RabbitMQ的概念的认识可以参考我的另外两篇博文认识RabbitMQ交换机模型RabbitMQ是如何运转的?


一、设置mandotory参数、AE备份交换器

针对前言中的第(4)个问题,我们可以通过设置mandotory参数与AE备份交换器来解决

1、mandotory参数

  1)当为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,此时RabbitMQ会调用Basic.Return命令将消息返回给生产者,消息将不会丢失

  2)当为false时,消息将会被直接丢弃。

  3)RabbitMQ通过addReturnListener添加ReturnLisener监听器监听获取没有被正确路由到合适队列的消息

channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener(){
  public void handleReturn(int replyCode, String replyText,
                String exchange, String routingKey,
                AMQP.BasicProperties basicProperties,
                byte[] body) throws IOException {
    String message = new String(body);
    System.out.println("Basic.Return 返回的结果是: " + message);
  }
});

2、AE备份交换器

  Alternate Exchange,简称AE,不设置mandatory参数,那么消息将会被丢失,设置mandatory参数的话,需要添加ReturnListner监听器,增加复杂代码,如果既不想增加代码又不想消息丢失,则使用AE,将没有被路由的消息存储于RabbitMQ中。当mandatory参数用AE一起使用时,mandatory将失效。在介绍AE之前,也认识RabbitMQ对于消息的过期时间TTL设置以及队列的过期时间TTL设置

2.1 TTL过期时间设置

  可以对队列设置TTL与消息设置TTL,其中消息设置TTL经常用于死信队列、延迟队列等高级应用中。

  1)设置消息TTL

  设置TTL过期时间一般有两种当时:一是通过队列属性,对队列中所有消息设置相同的TTL。二就是对消息本身单独设置,每条消息TTL不同。如果一起使用时候,TTL小的为准,当一旦超过设置的TTL时间时,就会变成“死信”。

  方式一:针对每条消息设置TTL是通过增加expiration的属性参数实现的,不可能像方式二一样扫描整个队列再判断是否过期,只有当该消息即将被消费时再判定是否过期即可删除,也就是消息即使已经过期,但不一定立马被删除!

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 持久化消息
builder deliveryMode(2);
// 设置 TTL=60000ms
builder expiration( 60000 );
AMQP.BasicProperties properties = builder. build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

  方式二:通过队列属性设置消息TTL是增加x-message-ttl参数实现的,只需要扫描整个队列头部即可立即删除,也就是消息一旦过期就会被删除!

Map<String, Object> argss = new HashMap<String , Object>();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;

  2)设置队列TTL

  通过在队列中添加参数x-message-ttl参数实现设置队列被自动删除前处于未被使用状态的时间,注意是队列的使用状态,并不是消息是否被消费的状态

  设置ttl=30min的队列,时间一到RabbitMQ会保证队列被删除,但是不会保证删除的速度有多快。

Map<String, Object> args = new HashMap<String, Object>{);
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

2.2 AE备份交换器的使用

  声明交换器的时候,添加alternate-exchange参数实现,或通过策略实现。前者优先级高。从代码角度需要以下三个步骤,具体代码如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("a1ternate-exchange", "myAe");
channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args);
channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ;
channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11);
channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key");
channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);

  1)声明normalExchange类型为direct的交换器、类型为fanout的myAe备份交换器;并且normalExchange的备份交换器为myAe(备份交换器建议使用fanout类型交换器)

  2)声明normalQueue队列,声明unrouteQueue队列;

  3)通过路由键normalKey绑定normalExchange与normalQueue,不适用路由键绑定unrouteQueue与myAe

        四种途径提高RabbitMQ传输消息数据的可靠性(一)

二、消费者手动确认

针对前言中第(3)个问题,我们需要在消费者消费完消息后手动进行确认,保证消息数据不丢失!

1、autoAck参数设置

  1) 当autoAck参数为false时,手动确认:

  RabbitMQ会等待消费者显式地回复确认信号后从内存中移去消息(实际上是先标示删除标记,之后再删除),这是一般推荐使用的方式,因为使用手动确认有足够的时间处理消息,不需要担心消费者进程挂掉之后消息丢失问题。此时的消息就会分为两个部分:一是等待投递给消费者的消息;二是已经投递给消费者但还没有收到消费者确认信号的消息。

  2) 当autoAck为true时,自动确认:

  RabbitMQ会自动隐式地回复确认信号后从内存中移去消息, RabbitMQ不需要管消费者是否真正消费了这些消息,RabbitMQ会自动把发送出去的消息置为确认,然后直接从内存中删除。

2、重新投递

  问:如果选择手动确认,即autoAck为false时,消费者由于某些原因断开了,那么消息的确认会受到影响,那么此时的消息会丢失吗?

 这也就是一开始提出来的问题,其实是不必担心消息会被丢失,因为RabbitMQ如果一直没收到消费者的确认信号,并且消费此消息的消费者已经断开,则RabbitMQ会重新安排消息进入队列等待给下一个消费者。也就是RabbitMQ不会设置消息的过期时间(当然也可以设置过期时间,但与之有关系方式消息丢失的特性是死信队列),它只判断是否需要重新安排入队列重新投递,而判断的唯一标准是消费此消息的消费者连接是否已经断开,即RabbitMQ会允许消费一条消息的时间很久很久。

3、消费者拒绝消息

  1)使用channel.basicReject方法,但只能拒绝一条。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

  deliveryTag:消息的唯一标识

  requeue:表示是否可以拒绝的消息重新存入队列

  2)使用channel.basicNack。不同于前者,此方法可以批量拒绝。

void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;

  multiple:设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。  

  3)问:关键在于,消费者拒绝消费消息后怎么处理?是丢弃,还是重新回到队列呢?

当参数requeue设置为true时候,可以重新进入队列,投递给下一个消费者。如果为false,消息就会把队列中消息立马移除,再结合启用“死信队列”,防止消息丢失并且可以分析异常情况的发生。


  最后,由于剩下的两种方式涉及的内容较多,所以在此将分成两篇继续介绍,请看下篇四种途径提高RabbitMQ传输数据的可靠性(二)

四种途径提高RabbitMQ传输消息数据的可靠性(一)的更多相关文章

  1. 四种途径提高RabbitMQ传输数据的可靠性(二)

    前言 上一篇四种途径提高RabbitMQ传输消息数据的可靠性(一)已经介绍了两种方式提高数据可靠性传输的方法,本篇针对上一篇中提出的问题(1)与问题(2)提出解决常用的方法. 本文其实也就是结合以上四 ...

  2. 四种途径提升RabbitMQ传输数据的可靠性

    前言 RabbitMQ虽然有对队列及消息等的一些持久化设置,但其实光光只是这一个是不能够保障数据的可靠性的,下面我们提出这样的质疑: (1)RabbitMQ生产者是不知道自己发布的消息是否已经正确达到 ...

  3. 四种途径将HTML5 web应用变成android应用

    作为下一代的网页语言,HTML5拥有很多让人期待已久的新特性.HTML5的优势之一在于能够实现跨平台游戏编码移植,现在已经有很多公司在移动 设备上使用HTML5技术.随着HTML5跨平台支持的不断增强 ...

  4. 四种数据库随机获取N条数据的方法

    1.SQL Server: SELECT TOP  n  *  FROM  tableName ORDER BY NEWID(); 2.ORACLE: SELECT * FROM (SELECT * ...

  5. Javascript 中使用Json的四种途径

    1.jQuery插件支持的转换方式: 复制代码代码如下: $.parseJSON( jsonstr ); //jQuery.parseJSON(jsonstr),可以将json字符串转换成json对象 ...

  6. 小白鼠排队(map容器插入数据的四种方法)

    题目描述 N只小白鼠(1 <= N <= 100),每只鼠头上戴着一顶有颜色的帽子.现在称出每只白鼠的重量,要求按照白鼠重量从大到小的顺序输出它们头上帽子的颜色.帽子的颜色用“red”,“ ...

  7. USB的四种传输类型与端点

    1.事务 在介绍USB传输类型之前,请允许我先简答介绍一下USB事务. 事务一般由令牌包.数据包(可选).握手包组成. 令牌包:用来启动一个事务,总是由主机发送. 数据包:可以从主机到设备,也可以由设 ...

  8. 160624、Spark读取数据库&lpar;Mysql&rpar;的四种方式讲解

    目前Spark支持四种方式从数据库中读取数据,这里以Mysql为例进行介绍. 一.不指定查询条件 这个方式链接MySql的函数原型是: 1 def jdbc(url: String, table: S ...

  9. RabbitMQ系列&lpar;四&rpar;--消息如何保证可靠性传输以及幂等性

    一.消息如何保证可靠性传输 1.1.可能出现消息丢失的情况 1.Producer在把Message发送Broker的过程中,因为网络问题等发生丢失,或者Message到了Broker,但是出了问题,没 ...

随机推荐

  1. MySQL学习笔记五:数据类型

    MySQL支持多种数据类型,大致可以分为数值,日期/时间和字符类型. 数值类型 MySQL支持所有标准SQL数值数据类型,包括严格数值数据类型(INTEGER.SMALLINT.DECIMAL和NUM ...

  2. 微信小程序购物商城系统开发系列-目录结构

    上一篇我们简单介绍了一下微信小程序的IDE(微信小程序购物商城系统开发系列-工具篇),相信大家都已经蠢蠢欲试建立一个自己的小程序,去完成一个独立的商城网站. 先别着急我们一步步来,先尝试下写一个自己的 ...

  3. MVC代码中如何调用api接口

    关于代码解释,为了方便读者浏览时更好理解代码的含义,我把注释都写在代码里面了.因为一开始我只考虑到功能上的实现并没有考虑代码的优化所以代码我就全写在一个页面了.至于那些生成扑克牌类.计算类等代码优化方 ...

  4. php 常用数组操作

    php常用的数组操作函数,包括数组的赋值.拆分.合并.计算.添加.删除.查询.判断.排序等 array_combine 功能:用一个数组的值作为新数组的键名,另一个数组的值作为新数组的值 <?p ...

  5. WBS练习

    我们把这次团队程序设计分成了6个模块,让每一个同学都能参与其中,然后让每一个人选一个自己喜欢的模块,最后数据库设计这个部分就大家一起来做. Everybody's task allocation is ...

  6. 方格取数&lpar;1&rpar;(HDU 1565状压dp)

    题意: 给你一个n*n的格子的棋盘,每个格子里面有一个非负数. 从中取出若干个数,使得任意的两个数所在的格子没有公共边,就是说所取的数所在的2个格子不能相邻,并且取出的数的和最大.   分析:直接枚举 ...

  7. C&num; DataTable的詳細使用方法

    在项目中经经常使用到DataTable,假设DataTable使用得当,不仅能使程序简洁有用,并且可以提高性能,达到事半功倍的效果,现对DataTable的使用技巧进行一下总结. 一.DataTabl ...

  8. &lbrack;每日一题&rsqb; OCP1z0-047 &colon;2013-08-28 DELETE&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;&period;160

    转载请注明出处:http://blog.csdn.net/guoyjoe/article/details/10475707 正确答案:ACD 根据题库,操作如下: A答案能删除: oe@OCM> ...

  9. 201521123015《Java程序设计》第1周学习总结

    1.本周学习总结 知道了JAVA语言的发展历史和目前使用的版本,还有什么是JDK(Java Development Kit).JRE (Java Runtime Environment).JVM(Ja ...

  10. linux解决病毒系列之一,删除十字符libudev&period;so病毒文件

    前两天被服务器商通知服务器带宽流量增加,我想了想我们服务走的内网,没有什么大的带宽占用,于是我马上登录服务器. 用top命令查看运行情况,我擦,有一个进程吃了很高的cup,于是我赶紧用kill -9 ...