通过应用监控redis主从切换确保数据一致性

时间:2024-10-20 13:52:08

redis服务提供了两种方式保证数据不丢失:

  1. rdb持久化,通过将Redis在内存中的数据集快照保存到磁盘上来实现redis重启数据不丢失,开启方式:
# 1小时有1条修改命令 5分钟有100条修改命令 1分钟有10000条修改命令
save 3600 1 300 100 60 10000

# 数据文件目录
dir /home/redis-7.2.1/datas/

# rdb文件名
dbfilename dump.rdb

# 是否对RDB文件压缩,开启后会使用LZF压缩
rdbcompression yes

这种持久化方式有可能导致数据丢失,一般在生产环境不配置这种持久化,客户端可以通过向Redis服务器发送save或bgsave命令让服务器生成rdb文件,主从复制时如果从节点比主节点落后数据非常多时会使用rdb方式更新数据,由于它是数据快照方式持久化,所以恢复数据性能非常好。

  1. aof持久化,通过保存服务器收到的每一个写操作命令到文件来进行数据持久化的。当Redis重启时,可以通过重新执行这些命令来恢复数据到内存中。
# 开启持久化
appendonly yes

# append追加文件名
appendfilename "appendonly.aof"

# 文件目录
appenddirname "appendonlydir"

# 追加模式支持3种:always每条命令都保存、everysec每分钟保存一次、no不指定让操作系统决定持久化时间
# appendfsync always
appendfsync everysec
# appendfsync no

# 自动触发aof文件重写条件
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

如果同时开启rdb和aof持久化方式,redis重启时优先使用aof持久化文件,因为它可以确保丢失更少的数据,随着处理命令的增加,可以通过手动执行bgrewriteaof重写aof文件,避免文件增大过快。
上面介绍了两种持久化数据的方式,尽管aof方式提供了 always 模式保证数据一条不丢失,但一般在生产中都是集群部署,而redis在主从复制时是异步的,这就导致在主从切换时存在数据丢失的风险,如果程序使用redis时不允许数据丢失,那就需要在代码中保证数据一致性,而前面介绍的 通过redis实现高性能扣费 就是不允许数据丢失的使用场景,下面介绍一下我是如何确保主从切换时的数据一致性的。
要确保数据不丢失,就需要程序能够感知到redis主从切换,当redis产生了主从切换,通过加载本地已经持久化的数据补偿丢失的数据。可以通过本地启动一个定时任务,定时监控redis集群节点信息,发现master节点改变就重新加载本地数据:
要感知redis集群节点角色改变,我们要存储节点的角色信息,然后通过定时任务不断的获取集群当前角色与存储的历史角色比较,不一致就发生了变更。
首先定义一个数据结构保存节点信息:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * redis集群节点数据
 *
 * @Author xingo
 * @Date 2024/9/18
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ClusterNode {

    /**
     * 集群是否为主节点
     */
    private boolean master;

    /**
     * 节点索引值
     */
    private short idx;

    /**
     * 节点主机和端口
     */
    private String hostAndPort;
}

再定义一个服务类处理节点数据,节点的历史状态保存到一个哈希集合中:

import io.lettuce.core.cluster.SlotHash;
import org.springframework.stereotype.Service;
import org.xingo.entity.ClusterNode;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * redis集群节点数据
 *
 * @Author xingo
 * @Date 2024/9/18
 */
@Service
public class RedisNodeSlotService {

    /**
     * 节点信息
     */
    private Map<String, ClusterNode> nodeMap = new HashMap<>();
    /**
     * hash槽信息
     */
    private Map<Integer, Short> slotMap = new HashMap<>();

    /**
     * 增加节点信息
     * @param hostAndPort
     * @param node
     */
    public void addNode(String hostAndPort, ClusterNode node) {
        nodeMap.put(hostAndPort, node);
    }

    /**
     * 获取节点信息
     * @param hostAndPort
     * @return
     */
    public ClusterNode getNode(String hostAndPort) {
        return nodeMap.get(hostAndPort);
    }

    /**
     * 返回节点集合
     * @return
     */
    public Collection<ClusterNode> allNodes() {
        return Collections.unmodifiableCollection(nodeMap.values());
    }

    /**
     * 清空节点信息集合
     */
    public void clearNodes() {
        nodeMap.clear();
    }

    /**
     * 获取节点的下一个索引值
     * @return
     */
    public short nextIdx() {
        short maxIdx = 0;
        if(nodeMap.isEmpty()) {
            maxIdx = 0;
        } else {
            for (ClusterNode node : nodeMap.values()) {
                if(node.getIdx() > maxIdx) {
                    maxIdx = node.getIdx();
                }
            }
        }
        maxIdx += 1;
        return maxIdx;
    }

    /**
     * 添加一个hash槽所在的节点索引值
     * @param slot
     * @param idx
     */
    public void addSlot(Integer slot, Short idx) {
        slotMap.put(slot, idx);
    }

    /**
     * 获取某个hash槽所在的节点索引值
     * @param slot
     */
    public void getSlotNodeIdx(Integer slot) {
        slotMap.get(slot);
    }

    /**
     * 获取键的hash槽
     * @param key
     * @return
     */
    public Integer getKeySlot(String key) {
        if(key == null || "".equals(key.trim())) {
            return null;
        }

        return SlotHash.getSlot(key);
    }

    /**
     * 获取键的节点
     * @param key
     * @return
     */
    public Short getKeyNode(String key) {
        Integer slot = getKeySlot(key);
        return slot != null ? slotMap.get(slot) : null;
    }
}

上面分析时已经阐述了要通过一个定时任务不断监听集群的当前状态,通过比较历史状态是否一致来判断是否发生变化:

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.RedisServer;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.entity.ClusterNode;
import org.xingo.front.service.impl.LoadDataService;
import org.xingo.front.service.impl.RedisNodeSlotService;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

/**
 * @Author xingo
 * @Date 2024/9/18
 */
@Slf4j
@Component
@EnableScheduling
public class CheckRedisNodeSlotJob {

    @Autowired
    private RedisNodeSlotService redisNodeSlotService;
    @Autowired
    private LoadDataService loadDataService;
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Scheduled(cron="0/5 * * * * ?")
    public void run() {
//        JedisConnectionFactory
        LettuceConnectionFactory factory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();

        if(factory.getClusterConfiguration() != null) {     // 集群模式
            RedisClusterConnection conn = factory.getClusterConnection();
            Iterable<RedisClusterNode> nodes = conn.clusterGetNodes();
            RedisConnectionUtils.releaseConnection(conn, factory);
            Iterator<RedisClusterNode> iterator = nodes.iterator();
            boolean changeStatus = false;
            while (iterator.hasNext()) {
                RedisClusterNode next = iterator.next();
                String hostAndPort = next.getHost() + ":" + next.getPort();

                // redis节点集合
                if (redisNodeSlotService.getNode(hostAndPort) == null) {
                    ClusterNode clusterNode = ClusterNode.builder().master(next.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                    redisNodeSlotService.addNode(hostAndPort, clusterNode);
                }
                ClusterNode node = redisNodeSlotService.getNode(hostAndPort);
                // 从节点升级为主节点
                if(next.isMaster() && !node.isMaster()) {
                    log.error("redis集群发生主从切换|{}|{}|{}", hostAndPort, node.isMaster(), next.isMaster());
                    changeStatus = true;
                }
            }
            if(changeStatus) {
                // redis集群状态发生变化通知所有应用变更本地缓存的redis节点状态并且检查最近一段时间的数据
                this.init();
                loadDataService.checkCache();
            }
        } else if(factory.getSentinelConfiguration() != null) {     // 哨兵模式
            RedisSentinelConnection conn = factory.getSentinelConnection();
            Collection<RedisServer> masters = conn.masters();
            try {
                conn.close();
            } catch (IOException e) {
                log.error("关闭连接异常", e);
            }
            for (RedisServer master : masters) {
                String hostAndPort = master.getHost() + ":" + master.getPort();
                if(redisNodeSlotService.getNode(hostAndPort) == null) {
                    // redis主节点发生变化通知所有应用变更本地缓存的redis节点状态并且检查最近一段时间的数据
                    log.error("redis哨兵主节点切换|{}|{}", JacksonUtils.toJSONString(redisNodeSlotService.allNodes()), hostAndPort);
                    this.init();
                    loadDataService.checkCache();
                }
            }
        } else {    // 单机模式
            System.out.println("======== 单机模式 ========");
        }
    }

    /**
     * 服务启动时初始化集群信息
     */
    @PostConstruct
    public void init() {
        LettuceConnectionFactory factory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();

        if(factory.getClusterConfiguration() != null) {     // 集群模式
            RedisClusterConnection conn = factory.getClusterConnection();
            Iterable<RedisClusterNode> nodes = conn.clusterGetNodes();
            RedisConnectionUtils.releaseConnection(conn, factory);
            Iterator<RedisClusterNode> iterator = nodes.iterator();
            while (iterator.hasNext()) {
                RedisClusterNode next = iterator.next();
                String hostAndPort = next.getHost() + ":" + next.getPort();

                // redis节点集合
                if (redisNodeSlotService.getNode(hostAndPort) == null) {
                    ClusterNode clusterNode = ClusterNode.builder().master(next.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                    redisNodeSlotService.addNode(hostAndPort, clusterNode);
                } else {
                    redisNodeSlotService.getNode(hostAndPort).setMaster(next.isMaster());
                }
                short idx = redisNodeSlotService.getNode(hostAndPort).getIdx();

                // redis槽集合
                RedisClusterNode.SlotRange slotRange = next.getSlotRange();
                Set<Integer> slots = slotRange.getSlots();
                if (!slots.isEmpty()) {
                    for (Integer slot : slots) {
                        redisNodeSlotService.addSlot(slot, idx);
                    }
                }
            }
        } else if(factory.getSentinelConfiguration() != null) {     // 哨兵模式
            redisNodeSlotService.clearNodes();
            RedisSentinelConnection conn = factory.getSentinelConnection();
            Collection<RedisServer> masters = conn.masters();
            try {
                conn.close();
            } catch (IOException e) {
                log.error("关闭连接异常", e);
            }
            for (RedisServer master : masters) {
                String hostAndPort = master.getHost() + ":" + master.getPort();
                ClusterNode clusterNode = ClusterNode.builder().master(master.isMaster()).hostAndPort(hostAndPort).idx(redisNodeSlotService.nextIdx()).build();
                redisNodeSlotService.addNode(hostAndPort, clusterNode);
            }
        } else {    // 单机模式
            System.out.println("======== 单机模式 ========");
        }
    }
}

截止到当前,监听redis集群状态的代码已经全部完成,接下来就是当监听到集群状态变化时的处理逻辑,我们这里模拟的是当集群状态改变时重新加载本地数据到redis确保数据不丢失。业务数据采用的是日志先行WAL(Write-Ahead Logging)方式确保数据不丢失,所以也可以通过这个日志恢复数据到redis。还是用前面的扣费逻辑数据举例:

2024-09-19 10:30:52.542|9|1836593772545314816|100|1726713052295

规定使用“|”分隔日志内容,第一列是时间戳、第二列是用户ID、第三列是订单ID、第四列是扣费金额、第五列是扣费时间戳,日志文件名是deduct.log,通过解析这个日志文件拆分字段,比较订单ID是否存在,把那些不在redis中的数据重新加载到redis:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.xingo.common.RedisKeyUtils;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import