背景
之前写的 Redis Cluster部署、管理和测试 和 Redis 5.0 redis-cli --cluster help说明 已经比较详细的介绍了如何安装和维护Cluster。但关于Cluster各个节点的通信和原理没有说明,为了方便自己以后查阅,先做些记录。顺便对Redis 4.0和5.0的相关特性也做下说明。
Redis 4.0 新功能说明
Redis4.0版本增加了很多新的特性,如:
Redis Memeory Command:详细分析内存使用情况,内存使用诊断,内存碎片回收;
PSYNC2:解决failover和从实例重启不能部分同步;
LazyFree: 再也不用怕big key的删除引起集群故障切换;
LFU: 支持近似的LFU内存淘汰算法;
Active Memory Defragmentation:内存碎片回收效果很好(实验阶段);
Modules: Redis成为更多的可能(觉得像mongo/mysql引入engine的阶段);
一、Lazyfree
redis-4.0带来的Lazyfree机制可以避免del,flushdb/flushall,rename等命令引起的redis-server阻塞,提高服务稳定性。
① unlink
在redis-4.0之前,redis执行del命令会在释放掉key的所有内存以后才会返回OK,这在key比较大的时候(比如说一个hash里头有1000W条数据),其他连接可能要等待很久。为了兼容已有的del语义,redis-4.0引入unlink命令,效果以及用法和del完全一样,但内存释放动作放到后台线程中执行。
UNLINK key [key ...]
② flushdb/flushall
flushdb/flushall在redis-4.0中新引入了选项,可以指定是否使用Lazyfree的方式来清空整个内存。
FLUSHALL [ASYNC]
FLUSHDB [ASYNC]
③ rename
执行 rename oldkey newkey 时,如果newkey已经存在,redis会先删除,这也会引发上面提到的删除大key问题。
lazyfree-lazy-server-del yes/no
④ 其他场景
某些用户对数据设置过期时间,依赖redis的淘汰机制去删除已经过期的数据,这同样也存在上面提到的问题,淘汰某个大key会导致进程CPU出现抖动,redis-4.0提供了两个配置,可以让redis在淘汰或者逐出数据时也使用lazyfree的方式。
lazyfree-lazy-eviction yes/no
lazyfree-lazy-expire yes/no
二、memory
redis-4.0之前只能通过info memory来了解redis内部有限的内存信息,4.0提供了memory命令,帮助用户全面了解redis的内存状态。
127.0.0.1:> memory help
) "MEMORY DOCTOR - Outputs memory problems report"
) "MEMORY USAGE <key> [SAMPLES <count>] - Estimate memory usage of key"
) "MEMORY STATS - Show memory usage details"
) "MEMORY PURGE - Ask the allocator to release memory"
) "MEMORY MALLOC-STATS - Show allocator internal stats"
① memory usage
usage子命令可以查看某个key在redis内部实际占用多少内存,这里有两点需要说明:
1. 不光key, value需要占用内存,redis管理这些数据还需要一部分内存
2. 对于hash, list, set, sorted set这些类型,结果是采样计算的,可以通过SAMPLES 来控制采样数量
在redis 4.0之前,我们只能通过info memory查看redis实例的内存大体使用状况;而内存的使用细节,比如expire的消耗,client output buffer, query buffer等是很难直观显示的。 memory stats命令就是为展现redis内部内存使用细节。
> memory stats
1) "peak.allocated" # Redis消耗的峰值内存(以字节为单位,请参阅INFO的used_memory_peak)
2) (integer) 4861848
3) "total.allocated" # Redis使用其分配器分配的字节总数(请参阅INFO的used_memory)
4) (integer) 4824184
5) "startup.allocated" # Redis在启动时消耗的初始内存量(以字节为单位)(请参阅INFO的used_memory_startup)
6) (integer) 1449744
7) "replication.backlog" # 复制积压的大小(以字节为单位)(请参阅INFO的repl_backlog_active)
8) (integer) 1048576
9) "clients.slaves" # 所有副本开销(输出和查询缓冲区,连接上下文)的总大小(以字节为单位)
10) (integer) 16922
11) "clients.normal" # 所有客户端开销(输出和查询缓冲区,连接上下文)的总大小(以字节为单位)
12) (integer) 49694
13) "aof.buffer" # 当前和重写AOF缓冲区的总大小(以字节为单位)(分别参见INFO的aof_buffer_length和aof_rewrite_buffer_length)
14) (integer) 0
15) "lua.caches" # Lua脚本的缓存开销的总大小(以字节为单位)
16) (integer) 0
17) "db.0" # 对于服务器的每个数据库,主字典和到期字典的开销(分别是overhead.hashtable.main和header.hashtable.expires)以字节为单位报告
18) 1) "overhead.hashtable.main"
2) (integer) 152
3) "overhead.hashtable.expires"
4) (integer) 0
19) "overhead.total" # 分配用于管理其内部数据结构的所有开销的总字节数,包括startup.allocated, replication.backlog, clients.slaves, clients.normal, aof.buffer及用于管理Redis键空间的内部数据结构的总和(请参阅INFO的used_memory_overhead)
20) (integer) 2565088
21) "keys.count" # 服务器中所有数据库中存储的key总数
22) (integer) 3
23) "keys.bytes-per-key" # 净内存使用量(总内存分配量减去启动内存分配量)与keys.count之间的比率
24) (integer) 1124813
25) "dataset.bytes" # 数据集的大小,以字节为单位,即从total.allocated中减去的总开销(请参阅INFO的used_memory_dataset)
26) (integer) 2259096
27) "dataset.percentage" # 净内存使用量中dataset.bytes的百分比
28) "66.947288513183594"
29) "peak.percentage" # peak.allocated占total.allocated的百分比
30) "99.225318908691406"
31) "allocator.allocated" # 分配器相关
32) (integer) 5244456
33) "allocator.active"
34) (integer) 5582848
35) "allocator.resident"
36) (integer) 12320768
37) "allocator-fragmentation.ratio"
38) "1.0645236968994141"
39) "allocator-fragmentation.bytes"
40) (integer) 338392
41) "allocator-rss.ratio"
42) "2.2068965435028076"
43) "allocator-rss.bytes"
44) (integer) 6737920
45) "rss-overhead.ratio" # 内存比率,内存占用物理内存比例。
46) "0.73503988981246948"
47) "rss-overhead.bytes" # 使用的物理内存的字节数
48) (integer) -3264512
49) "fragmentation" # 碎片率,请参阅INFO的mem_fragmentation_ratio
50) "1.8933625221252441"
51) "fragmentation.bytes" # 碎片大小,字节为单位。
52) (integer) 4273096
③ memory doctor
主要用于给一些诊断建议,提前发现潜在问题。
memory purge命令通过调用jemalloc内部命令,进行内存释放,尽量把redis进程占用但未有效使用内存,即常说的内存碎片释放给操作系统。只适用于使用jemalloc作为allocator的实例。
⑤ memory malloc-stats
用于打印allocator内部的状态,目前只支持jemalloc。
三、LFU
redis-4.0新增了 allkey-lfu 和 volatile-lfu 两种数据逐出策略,同时还可以通过object命令来获取某个key的访问频度。
object freq user_key
基于LFU机制,用户可以使用 scan + object freq 来发现热点key,当然redis也一起发布了更好用的 :
redis-cli --hotkeys
四、psync2
Redis4.0新特性psync2(partial resynchronization version2)部分重新同步(partial resync)增加版本;主要解决Redis运维管理过程中,从实例重启和主实例故障切换等场景带来的全量重新同步(full resync)问题。
五、持久化
redis有两种持久化的方式——RDB和AOF其中RDB是一份内存快照AOF则为可回放的命令日志他们两个各有特点也相互独立。4.0开始允许使用RDB-AOF混合持久化的方式结合了两者的优点通过aof-use-rdb-preamble配置项可以打开混合开关。
Redis 5.0 新功能说明
Redis5.0版是Redis产品的重大版本发布,它的最新特点:
新的流数据类型(Stream data type) https://redis.io/topics/streams-intro
新的 Redis 模块 API:定时器、集群和字典 API(Timers, Cluster and Dictionary APIs)
RDB 增加 LFU 和 LRU 信息
集群管理器从 Ruby (redis-trib.rb) 移植到了redis-cli 中的 C 语言代码
新的有序集合(sorted set)命令:ZPOPMIN/MAX 和阻塞变体(blocking variants)
升级 Active defragmentation 至 v2 版本
增强 HyperLogLog 的实现
更好的内存统计报告
许多包含子命令的命令现在都有一个 HELP 子命令
客户端频繁连接和断开连接时,性能表现更好
许多错误修复和其他方面的改进
升级 Jemalloc 至 5.1 版本
引入 CLIENT UNBLOCK 和 CLIENT ID
新增 LOLWUT 命令 http://antirez.com/news/123
在不存在需要保持向后兼容性的地方,弃用 "slave" 术语
网络层中的差异优化
Lua 相关的改进
引入动态的 HZ(Dynamic HZ) 以平衡空闲 CPU 使用率和响应性
对 Redis 核心代码进行了重构并在许多方面进行了改进
Redis Cluster总览
一、简介
在官方文档Cluster Spec中,作者详细介绍了Redis集群为什么要设计成现在的样子。最核心的目标有三个:
性能:增加集群功能后不能对性能产生太大影响,所以Redis采取了P2P而非Proxy方式、异步复制、客户端重定向等设计。
水平扩展:文档中称可以线性扩展到1000结点。
可用性:在Cluster推出之前,可用性要靠Sentinel保证。有了集群之后也自动具有了Sentinel的监控和自动Failover能力。
如果需要全面的了解,那一定要看官方文档Cluster Tutorial。
Redis Cluster是一个高性能高可用的分布式系统。由多个Redis实例组成的整体,数据按照Slot存储分布在多个Redis实例上,通过Gossip协议来进行节点之间通信。功能特点如下:
所有的节点相互连接
集群消息通信通过集群总线通信,集群总线端口大小为客户端服务端口+10000(固定值)
节点与节点之间通过二进制协议进行通信
客户端和集群节点之间通信和通常一样,通过文本协议进行
集群节点不会代理查询
数据按照Slot存储分布在多个Redis实例上
集群节点挂掉会自动故障转移
可以相对平滑扩/缩容节点
关于Cluster相关的源码可以见:src/cluster.c 和 src/cluster.h
二、通信
2.1 CLUSTER MEET
需要组建一个真正的可工作的集群,我们必须将各个独立的节点连接起来,构成一个包含多个节点的集群。连接各个节点的工作使用CLUSTER MEET命令来完成。
CLUSTER MEET <ip> <port>
CLUSTER MEET命令实现:
节点A会为节点B创建一个 clusterNode 结构,并将该结构添加到自己的 clusterState.nodes 字典里面。
节点A根据CLUSTER MEET命令给定的IP地址和端口号,向节点B发送一条MEET消息。
节点B接收到节点A发送的MEET消息,节点B会为节点A创建一个clusterNode结构,并将该结构添加到自己的clusterState.nodes字典里面。
节点B向节点A返回一条PONG消息。
节点A将受到节点B返回的PONG消息,通过这条PONG消息节点A可以知道节点B已经成功的接收了自己发送的MEET消息。
节点A将向节点B返回一条PING消息。
节点B将接收到的节点A返回的PING消息,通过这条PING消息节点B可以知道节点A已经成功的接收到了自己返回的PONG消息,握手完成。
节点A会将节点B的信息通过Gossip协议传播给集群中的其他节点,让其他节点也与节点B进行握手,最终,经过一段时间后,节点B会被集群中的所有节点认识。
2.2 消息处理 clusterProcessPacket
更新接收消息计数器
查找发送者节点并且不是handshake节点
更新自己的epoch和slave的offset信息
处理MEET消息,使加入集群
从goosip中发现未知节点,发起handshake
对PING,MEET回复PONG
根据收到的心跳信息更新自己clusterState中的master-slave,slots信息
对FAILOVER_AUTH_REQUEST消息,检查并投票
处理FAIL,FAILOVER_AUTH_ACK,UPDATE信息
2.3 定时任务clusterCron
对handshake节点建立Link,发送Ping或Meet
向随机节点发送Ping
如果是从查看是否需要做Failover
统计并决定是否进行slave的迁移,来平衡不同master的slave数
判断所有pfail报告数是否过半数
2.4 心跳数据
集群中的节点会不停(每几秒)的互相交换ping、pong包,ping和pong包具有相同的结构,只是类型不同,ping、pong包合在一起叫做心跳包。通常节点会发送ping包并接收接收者返回的pong包,不过这也不是绝对,节点也有可能只发送pong包,而不需要让接收者发送返回包。
节点间通过ping保持心跳以及进行gossip集群状态同步,每次心跳时,节点会带上多个clusterMsgDataGossip消息体,经过多次心跳,该节点包含的其他节点信息将同步到其他节点。
ping和pong包的内容可以分为header和gossip消息两部分:
- 发送消息头信息Header
- 所负责slots的信息
- 主从信息
- ip, port信息
- 状态信息
包含的信息:
NODE ID是一个160bit的伪随机字符串,它是节点在集群中的唯一标识
currentEpoch和configEpoch字段
node flag,标识节点是master还是slave,另外还有一些其他的标识位,如PFAIL和FAIL。
节点提供服务的hash slot的bitmap
发送者的TCP端口
发送者认为的集群状态(down or ok)
如果是slave,则包含master的NODE ID
- 发送其他节点Gossip信息。包含了该节点认为的其他节点的状态,不过不是集群的全部节点(随机)
- ping_sent, pong_received
- ip, port信息
- 状态信息,比如发送者认为该节点已经不可达,会在状态信息中标记其为PFAIL或FAIL
包含的信息:
NODE ID
节点的IP和端口
NODE flags
clusterMsg结构的currentEpoch、sender、myslots等属性记录了发送者自身的节点信息,接收者会根据这些信息,在自己的clusterState.nodes字典里找到发送者对应的结构,并对结构进行更新。
Redis集群中的各个节点通过ping来心跳,通过Gossip协议来交换各自关于不同节点的状态信息,其中Gossip协议由MEET、PING、PONG三种消息实现,这三种消息的正文都由两个clusterMsgDataGossip结构组成。
每次发送MEET、PING、PONG消息时,发送者都从自己的已知节点列表中随机选出两个节点(可以是主节点或者从节点),并将这两个被选中节点的信息分别保存到两个结构中。当接收者收到消息时,接收者会访问消息正文中的两个结构,并根据自己是否认识clusterMsgDataGossip结构中记录的被选中节点进行操作:
如果被选中节点不存在于接收者的已知节点列表,那么说明接收者是第一次接触到被选中节点,接收者将根据结构中记录的IP地址和端口号等信息,与被选择节点进行握手。
如果被选中节点已经存在于接收者的已知节点列表,那么说明接收者之前已经与被选中节点进行过接触,接收者将根据clusterMsgDataGossip结构记录的信息,对被选中节点对应的clusterNode结构进行更新。
2.5 数据结构
clusterNode 结构保存了一个节点的当前信息, 如记录了节点负责处理那些槽、创建时间、节点的名字、节点当前的配置纪元、节点的 IP 和端口等:
slots:位图,由当前clusterNode负责的slot为1
salve, slaveof:主从关系信息
ping_sent, pong_received:心跳包收发时间
clusterLink *link:节点间的连接
list *fail_reports:收到的节点不可达投票
clusterState 结构记录了在当前节点的集群目前所处的状态还有所有槽的指派信息:
myself:指针指向自己的clusterNode
currentEpoch:当前节点的最大epoch,可能在心跳包的处理中更新
nodes:当前节点记录的所有节点的字典,为clusterNode指针数组
slots:slot与clusterNode指针映射关系
migrating_slots_to,importing_slots_from:记录slots的迁移信息
failover_auth_time,failover_auth_count,failover_auth_sent,failover_auth_rank,failover_auth_epoch:Failover相关信息
clusterLink 结构保存了连接节点的有关信息, 比如套接字描述符, 输入缓冲区和输出缓冲区。
三、数据分布及槽信息
3.1 槽(slot)概念
Redis Cluster中有一个16384长度的槽的概念,他们的编号为0、1、2、3……16382、16383。这个槽是一个虚拟的槽,并不是真正存在的。正常工作的时候,Redis Cluster中的每个Master节点都会负责一部分的槽,当有某个key被映射到某个Master负责的槽,那么这个Master负责为这个key提供服务,至于哪个Master节点负责哪个槽,这是可以由用户指定的,也可以在初始化的时候自动生成。在Redis Cluster中,只有Master才拥有槽的所有权,如果是某个Master的slave,这个slave只负责槽的使用,但是没有所有权。
3.2 数据分片
在Redis Cluster中,拥有16384个slot,这个数是固定的,存储在Redis Cluster中的所有的键都会被映射到这些slot中。数据库中的每个键都属于这 16384 个哈希槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽,其中 CRC16(key) 用于计算键 key 的 CRC16 校验和,集群中的每个节点负责处理一部分哈希槽。
3.3 节点的槽指派信息
clusterNode结构的slots属性和numslot属性记录了节点负责处理那些槽:
struct clusterNode { //… unsignedchar slots[/]; };
Slots属性是一个二进制位数组(bitarray),这个数组的长度为16384/8=2048个字节,共包含16384个二进制位。Master节点用bit来标识对于某个槽自己是否拥有。比如对于编号为1的槽,Master只要判断序列的第二位(索引从0开始)是不是为1即可。时间复杂度为O(1)。
3.4 集群所有槽的指派信息
通过将所有槽的指派信息保存在clusterState.slots数组里面,程序要检查槽i是否已经被指派,又或者取得负责处理槽i的节点,只需要访问clusterState.slots[i]的值即可,复杂度仅为O(1)。
3.5 请求重定向
由于每个节点只负责部分slot,以及slot可能从一个节点迁移到另一节点,造成客户端有可能会向错误的节点发起请求。因此需要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不同的重定向场景:
a) MOVED错误
请求的key对应的槽不在该节点上,节点将查看自身内部所保存的哈希槽到节点 ID 的映射记录,节点回复一个 MOVED 错误。
需要客户端进行再次重试。
b) ASK错误
请求的key对应的槽目前的状态属于MIGRATING状态,并且当前节点找不到这个key了,节点回复ASK错误。ASK会把对应槽的IMPORTING节点返回给你,告诉你去IMPORTING的节点查找。
客户端进行重试 首先发送ASKING命令,节点将为客户端设置一个一次性的标志(flag),使得客户端可以执行一次针对 IMPORTING 状态的槽的命令请求,然后再发送真正的命令请求。
不必更新客户端所记录的槽至节点的映射。
四、数据迁移
当槽x从Node A向Node B迁移时,Node A和Node B都会有这个槽x,Node A上槽x的状态设置为MIGRATING,Node B上槽x的状态被设置为IMPORTING。
MIGRATING状态
如果key存在则成功处理
如果key不存在,则返回客户端ASK,客户端根据ASK首先发送ASKING命令到目标节点,然后发送请求的命令到目标节点
当key包含多个:
如果都存在则成功处理
如果都不存在,则返回客户端ASK
如果一部分存在,则返回客户端TRYAGAIN,通知客户端稍后重试,这样当所有的key都迁移完毕的时候客户端重试请求的时候回得到ASK,然后经过一次重定向就可以获取这批键
此时不刷新客户端中node的映射关系
IMPORTING状态
如果key不在该节点上,会被MOVED重定向,刷新客户端中node的映射关系
如果是ASKING则命令会被执行,key不在迁移的节点已经被迁移到目标的节点
Key不存在则新建
Key迁移的命令:
DUMP:在源(migrate)上执行
RESTORE:在目标(importing)上执行
DEL:在源(migrate)上执行
经过上面三步可以将键迁移,然后再将处于MIGRATING和IMPORTING状态的槽变为常态,完成整个重新分片的过程,具体的信息可见Redis Cluster部署、管理和测试 。
4.1 读写请求
槽里面的key还未迁移,并且槽属于迁移中。
假如槽x在Node A,需要迁移到Node B上,槽x的状态为migrating,其中的key1还没轮到迁移。此时访问key1则先计算key1所在的Slot,存在key1则直接返回。
4.2 MOVED请求
槽里面的key已经迁移过去,并且槽属于迁移完。
假如槽x在Node A,需要迁移到Node B上,迁移完成。此时访问key1则先计算key1所在的Slot,因为已经迁移至Node B上,Node A上不存在,则返回 moved slotid IP:PORT,再根据返回的信息去Node B访问key1,此时更新slot和node的映射。
4.3 ASK请求
槽里面的key已经迁移完,并且槽属于迁移中的状态。
假如槽x在Node A,需要迁移到Node B上,迁移完成,但槽x的状态为migrating。此时访问key1则先计算key1所在的Slot,不存在key1则返回ask slotid IP:PORT,再根据ask返回的信息发送asking请求到Node B,没问题后则最后再去Node B*问key1,此时不更新slot和node的映射。
五、通信故障
5.1 故障检测
集群中的每个节点都会定期地向集群中的其他节点发送PING消息,以此交换各个节点状态信息,检测各个节点状态:在线状态、疑似下线状态PFAIL、已下线状态FAIL。
当主节点A通过消息得知主节点B认为主节点D进入了疑似下线(PFAIL)状态时,主节点A会在自己的clusterState.nodes字典中找到主节点D所对应的clusterNode结构,并将主节点B的下线报告(failure report)添加到clusterNode结构的fail_reports链表中。
struct clusterNode {
//...
//记录所有其他节点对该节点的下线报告
list*fail_reports;
//...
};
如果集群里面,半数以上的主节点都将主节点D报告为疑似下线,那么主节点D将被标记为已下线(FAIL)状态,将主节点D标记为已下线的节点会向集群广播主节点D的FAIL消息,所有收到FAIL消息的节点都会立即更新nodes里面主节点D状态标记为已下线。
将 node 标记为 FAIL 需要满足以下两个条件:
有半数以上的主节点将 node 标记为 PFAIL 状态。
当前节点也将 node 标记为 PFAIL 状态。
5.2 多个从节点选主
选新主的过程基于Raft协议选举方式来实现的:
当从节点发现自己的主节点进行已下线状态时,从节点会广播一条CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求所有收到这条消息,并且具有投票权的主节点向这个从节点投票
如果一个主节点具有投票权,并且这个主节点尚未投票给其他从节点,那么主节点将向要求投票的从节点返回一条,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示这个主节点支持从节点成为新的主节点
每个参与选举的从节点都会接收CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,并根据自己收到了多少条这种消息来统计自己获得了多少主节点的支持
如果集群里有N个具有投票权的主节点,那么当一个从节点收集到大于等于集群N/+1张支持票时,这个从节点就成为新的主节点
如果在一个配置纪元没有从能够收集到足够的支持票数,那么集群进入一个新的配置纪元,并再次进行选主,直到选出新的主节点为止
5.3 故障转移
错误检测用于识别集群中的不可达节点是否已下线,如果一个master下线,会将它的slave提升为master,在gossip消息中,NODE flags的值包括两种PFAIL和FAIL。
PFAIL flag:
如果一个节点发现另外一个节点不可达的时间超过NODE_TIMEOUT ,则会将这个节点标记为PFAIL,即Possible failure(可能下线)。节点不可达是说一个节点发送了ping包,但是等待了超过NODE_TIMEOUT时间仍然没有收到回应(NODE_TIMEOUT必须大于一个网络包来回的时间)。
FAIL flag:
PFAIL标志只是一个节点本地的信息,为了使slave提升为master,需要将PFAIL升级为FAIL。PFAIL升级为FAIL需要满足一些条件:
A节点将B节点标记为PFAIL
A节点通过gossip消息收集其他大部分master节点标识的B节点的状态
大部分master节点在NODE_TIMEOUT * FAIL_REPORT_VALIDITY_MULT(2s)时间段内,标识B节点为PFAIL或FAIL
如果满足以上条件,A节点会将B节点标识为FAIL并且向所有节点发送B节点FAIL的消息。收到消息的节点也都会将B标为FAIL。
注意:FAIL状态是单向的,只能从PFAIL升级为FAIL,而不能从FAIL降为PFAIL。
清除FAIL状态:
- 节点重新可达,并且是slave节点
- 节点重新可达,并且是master节点,但是不提供任何slot服务
- 节点重新可达,并且是master节点,但是长时间没有slave被提升为master来顶替它
PFAIL提升到FAIL使用的是一种弱协议:
- 节点收集的状态不在同一时间点,会丢弃时间较早的报告信息,但是也只能保证节点的状态在一段时间内大部分master达成了一致
- 检测到一个FAIL后,需要通知所有节点,但是没有办法保证每个节点都能成功收到消息
当从节点发现自己的主节点变为已下线(FAIL)状态时,便尝试进Failover,成为新的主。以下是故障转移的执行步骤:
在下线主节点的所有从节点中选中一个从节点
被选中的从节点执行SLAVEOF NO NOE命令,成为新的主节点
新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己
新的主节点对集群进行广播PONG消息,告知其他节点已经成为新的主节点
新的主节点开始接收和处理槽相关的请求
相关结构
clusterNode:
typedef struct clusterNode {
mstime_t ctime; // 该node创建时间
char name[CLUSTER_NAMELEN]; // 40位的node名字
// node 状态标识通过投CLUSTER_NODE 定义。
// 包括 maser slave self pfail fail handshake noaddr meet migrate_to null_name 这些状态
int flags;
// 本节点最新epoch
uint64_t configEpoch;
// 当前node负责的slot 通过bit表示
unsigned char slots[CLUSTER_SLOTS/];
int numslots;
int numslaves;
struct clusterNode **slaves;
// 如果该node为从 则指向master节点
struct clusterNode *slaveof;
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */ clusterLink *link;
// 将该节点标记为失败的node list
// 节点收到gossip消息后,如果gossip里标记该节点为pfail则加入改list
// 比如:节点a向b发送gossip,消息包含了 c 节点且出于pfail,则a将被加入c的link。
list *fail_reports;
} clusterNode;
clusterMsgData:节点间通讯的数据结构 包含了 ping、fail、publish、update四种类型
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[];
} ping;
节点间通过ping保持心跳以及进行gossip集群状态同步,每次心跳时,节点会带上多个clusterMsgDataGossip(其他节点)消息体,经过多次心跳,该节点包含的其他节点信息将同步到其他节点。
clusterState:定义了完整的集群信息
struct clusterState{
// 集群最新的epoch,为64位的自增序列
uint64_t currentEpoch;
// 包含的所有节点信息
dict *nodes;
// 每个slot所属于的节点,包括处于migrating和importinng状态的slot
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
// 当前节点所包含的key 用于在getkeysinslot的时候返回key信息
zskiplist *slots_to_keys;
...
}
redis启动,判断是否允许cluster模式,如果允许,则调用clusterInit进行cluster信息的初始化。clusterState被初始化为初始值。在后续节点meet及ping过程逐步更新clusterState信息。
Send Ping:
节点创建成功后,节点会向已知的其他节点发送ping消息保持心跳,ping消息体同时会携带已知节点的信息,并通过gossip同步到集群的其他节点。
node的ping由clusterCron负责调用,服务启动时,在serverCron内部会注册clusterCron,该函数每秒执行10次,在clusterCron内部,维护着static变量iteration记录该函数被执行的次数:通过if (!(iteration % 10)){}的判断,使得各节点每秒发送一次心跳。ping节点选择的代码逻辑如下:
clusterCron:
void clusterCron(void)
{
// ...
// 如果没有设置handshake超时,则默认超时未1s
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < )
handshake_timeout = ; // 遍历nodes列表
while ((de = dictNext(di)) != NULL)
{
// 删除handshake超时的节点
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout)
{
clusterDelNode(node);
continue;
}
// 如果该节点的link为空,则为该节点新建连接,并且初始化ping初始时间
if (node->link == NULL)
{
// 如果该节点处于meet状态,则直接发送meet让节点加入集群
// 否则发送向该节点发送ping
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
}
// 函数每被调动10次,则发送一次ping,因此ping间隔为1s
if (!(iteration % ))
{
int j;
for (j = ; j < ; j++)
{
// 随机选取节点并过滤link为空的以及self
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this->link == NULL || this->ping_sent != )
continue;
if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE))
continue; // 挑选距离上次pong间隔最久的节点
// redis会尽量选择距离上次ping间隔最久的节点,
// 以此防止随机不均匀导致某些节点一直收不到ping
if (min_pong_node == NULL || min_pong > this->pong_received)
{
min_pong_node = this;
min_pong = this->pong_received;
}
}
}
}
发送ping的时候,会优先给新加入的节点发送ping,其实再选择最久没被更新的节点,通过对旧节点选择的加权,尽可能地保证了集群最新状态的一致。
每次ping请求,node会从已知的nodes列表里随机选取n个节点(n=1/10*len(nodes)&& n>=3),一段时间后,该节点已知的nodes将被同步到集群的其他节点,集群状态信息达成最终一致。具体实现代码如下(只列出部分代码,完整代码见cluster.c/clusterSendPing)
clusterSendPing:
void clusterSendPing(clusterLink *link, int type)
{
// 选取1/10 的节点数并且要求大于3.
// 1/10是个魔数,为啥是1/10在源码里有解释
int freshnodes = dictSize(server.cluster->nodes) - ;
wanted = floor(dictSize(server.cluster->nodes) / );
if (wanted < )
wanted = ;
if (wanted > freshnodes)
wanted = freshnodes;
while (freshnodes > && gossipcount < wanted && maxiterations--)
{
// 通过随机函数随机选择一个节点,保证所有节点尽可能被同步到整个集群
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
// 为了保证失败的节点尽可能快地同步到集群其他节点,
// 优先选取处于pfail以及fail状态的节点
if (maxiterations > wanted * &&
!(this->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
continue;
}
// 如果被选中的节点处于
// 1.handshake 并且noaddr状态
// 2.其他节点没有包含该节点的信息,并且该节点没有拥有slot
// 则跳过该节点并且将可用的节点数减1,以较少gossip数据同步的开销
if (this->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == ))
{
freshnodes--; /* Tecnically not correct, but saves CPU. */
continue;
}
}
通过随机选取合适数量的节点,以及对节点状态的过滤,保证了尽可能快的达成最终一致性的同时,减少gossip的网络开销。
cluster监听cluster端口,并通过clusterAcceptHandler接受集群节点发起的连接请求,通过aeCreateFileEvent将clusterReadHandler注册进事件回调,读取node发送的数据包。clusterReadHandler读取到完整的数据包后,调用clusterProcessPacket处理包请求。clusterProcessPacket包含收到数据包后完整的处理逻辑。
clusterProcessPacket:
int clusterProcessPacket(clusterLink *link)
{
// 判断是否为ping请求并校验数据包长度
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
uint32_t explen; /* expected length of this packet */ explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip) * count);
if (totlen != explen)
return ;
}
// ... // 是否为已知节点
sender = clusterLookupNode(hdr->sender);
if (sender && !nodeInHandshake(sender))
{
// 比较epoch并更新为最大的epoch
if (senderCurrentEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = senderCurrentEpoch;
/* Update the sender configEpoch if it is publishing a newer one. */
if (senderConfigEpoch > sender->configEpoch)
{
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_FSYNC_CONFIG);
}
}
// 回复pong数据包
clusterSendPing(link, CLUSTERMSG_TYPE_PONG); // 获取gossip消息并处理gossip请求
if (sender)
clusterProcessGossipSection(hdr, link);
}
clusterProcessGossipSection 读取携带的gossip node内容,并判断这些node是否failover:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link)
{
// ... if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL))
{
if (clusterNodeAddFailureReport(node, sender))
{
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
markNodeAsFailingIfNeeded(node);
}
else
{
// 如果该node并非出于fail状态,则从fail link里删除该node
if (clusterNodeDelFailureReport(node, sender))
{
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender)
{
while ((ln = listNext(&li)) != NULL)
{
fr = ln->value;
if (fr->node == sender)
break;
}
if (!ln)
return ;
// 如果之前被标记为失败,则从失败list里删除
listDelNode(l, ln);
}
cluster会根据收到的gossip包里的msgdata来更新集群的状态信息,包括epoch,以及其余节点的状态。如果node被标记为pfail或fail,则被加入fail_reports,当fail_reports长度超过半数节点数量时,该节点及被标记为failover。
总结:
Redis Cluster是去中心化的结构,集群元数据信息分布在每个节点上,主备切换依赖于多个节点协商选主。采用无中心节点方式实现,无需proxy代理,客户端直接与redis集群的每个节点连接,根据同样的hash算法计算出key对应的slot,然后直接在slot对应的Redis上执行命令。从CAP定理来看,Cluster支持了AP(Availability&Partition-Tolerancy),这样让Redis从一个单纯的NoSQL内存数据库变成了分布式NoSQL数据库。