改造ServiceStack.Redis支持Redis3.x集群模式

时间:2022-01-30 17:25:40

Redis我一直都在使用,但是目前的Redis3.*版本横空出世,竟然支持了Redis集群,这可解决了我们很久以来的Redis扩展的难题。但是通过测试让我又喜又悲,我一直都是在使ServiceStack.Redis进行Redis的相关操作,现有的dll版本已经无法满足Redis3.0下的集群模式,终于现在有了时间,所以可以开搞了.

那么如何让ServiceStack.Redis支持集群我们也将陆续的开展工作,通过阅读redis cluster的官方文档找到了答案,其实很简单.

  1. 了解Redis3.0的集群模式
  2. 了解Hash Slot的计算方式
  3. 了解ServiceStack.Redis
  4. 寻找切入点
  5. 代码实现

一、Redis3.0集群模式

目前Redis3.0主要模式为主从设计,并且在主节点的算法上采用少数服从多数,每一个主节点都会产生N-1个副本进行主从复制,如果当其中一个主节点失败,集群将自动将其中一个从节点提升为主继续维持整个集群的正常工作.

二、Hash Slot

Redis3.0在Hash Slot上固定设定为16384个.并且各个节点都会平均的分布这些slot.

如:

Node A contains hash slots from 0 to 5500.

Node B contains hash slots from 5501 to 11000.

Node C contains hash slots from 11001 to 16384.

当我们要获取一个指定的Key,那么我将通过Slot的算法进行计算出所要操作的Key存在哪一个主节点内,Slot的计算公式为: Slot = CRC16(KEY)% 16384.

如 key=a 那么他的slot计算得出为 15495

通过比对A\B\C三个节点的slots则可以断定,这个key应该写入到节点C.

三、ServiceStack.Redis

类图结构:
改造ServiceStack.Redis支持Redis3.x集群模式

通过上面的类图,我们可以关注一下string client包含的这些接口,从类图上看它将封装了Redis操作的所有功能,并以接口对象的形式来实现具体操作的代码编写。

打开源代码分析哈,RedisClient类为操作具体实现类,使用partial class特性来使用多个文件合并类。

PooledRedisClientManager 类将作为RedisClient的管理类进行Client分配和一些管理操作,如故障转移.

至少我是是那么认为这两个类是这样搭配使用的.

四、切入点

是到了寻找如何下手的时候啦,看到前面所涉及的一些内容,可以设想一下当根据PooledRedisClientManager对象进行了RedisClient初始化,并当我们每次根据Key进行操作获取RedisClient时的无参函数GetClient(),扩展为GetClient(key)来根据key的Slot选择该返回的RedisClient对象,这样就每次都能够正确的节点地址进行连接操作啦,同理读取操作也是一样的.

思路步骤:

  1. 为ServiceStack.Redis增加集群模式的属性,确定开启和关闭集群模式
  2. 增加Cluster扩展类并支持一些Cluter模式下的公共方法,此类同样partial class RedisClient,提供查询Cluster Nodes信息功能、Hash Slot计算方法以及节点信息的实体属性类.
  3. 增加带参的GetClient(key)方法,通过计算key的slot来返回对应的RedisClient.

就是那么简单!

五、代码实现

在修改代码的同时为了不破坏原有的代码结构,我们还是使用if else来分隔我们的代码域,防止由于我们的疏忽而导致以前的程序出现问题,当然如果有足够的把握,也可以直接把这个函数重写啦!

修改PooledRedisClientManage

/// <summary>
/// wanglei add
/// open redis 3.0 cluster mode
/// </summary>
public bool? OpenCluster { get; set; }

寻找到private void InitClient(RedisClient client)方法,加入初始化判断.

    private void InitClient(RedisClient client)
    {
        if (this.ConnectTimeout != null)
            client.ConnectTimeout = this.ConnectTimeout.Value;
        if (this.SocketSendTimeout.HasValue)
            client.SendTimeout = this.SocketSendTimeout.Value;
        if (this.SocketReceiveTimeout.HasValue)
            client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
        if (this.IdleTimeOutSecs.HasValue)
            client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
        if (this.ForceReconnectInIdle.HasValue)
            client.ForceReconnectInIdle = this.ForceReconnectInIdle.Value;
        else
            client.ForceReconnectInIdle = false;
        if (this.NamespacePrefix != null)
            client.NamespacePrefix = NamespacePrefix;
        if (Db != null && client.Db != Db) //Reset database to default if changed
            client.ChangeDb(Db.Value);

        //wanglei add
        //close the cluster mode default.
        if (OpenCluster.HasValue)
        {
            client.OpenCluster = this.OpenCluster.HasValue;
        }
        else
        {
            client.OpenCluster = false;
            this.OpenCluster = false;
        }
        //wanglei add
        if (this.OpenCluster.Value)
        {
            InitClusterInfo(client);
        }
    }

    public List<ClusterNode> ClusterNodes;//wanglei add
    private bool _loadClusterNodes = false;//wanglei add

    /// <summary>
    /// init cluster info
    /// wanglei add
    /// </summary>
    /// <param name="client"></param>
    public void InitClusterInfo(RedisClient client)
    {
        if (_loadClusterNodes) return;
        ClusterNodes = new List<ClusterNode>();
        string host = string.Format("{0}:{1}", client.Host, client.Port);
        string nodes = client.GetClusterInfo();
        using (var reader = new StringReader(nodes))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                if (string.IsNullOrWhiteSpace(line)) continue;
                ClusterNodes.Add(new ClusterNode(line));
            }
        }

        client.FillSlot(ClusterNodes);

        //according cluster nodes to reset the read or write client
        //1. the slave node as read from the client
        //2. the master node as write from the client            
        lock (readClients)
        {
            readClients = new RedisClient[0];
            ReadOnlyHosts = ClusterNodes.Where(w => w.Slave == true).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
                .ToRedisEndPoints();
        }

        lock (writeClients)
        {
            writeClients = new RedisClient[0];
            ReadWriteHosts = ClusterNodes.Where(w => w.Slave == false).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
                .ToRedisEndPoints();
        }


        _loadClusterNodes = true;
    }

无参函数到有参的过程所涉及到要修改的函数:

//获取写入客户端

public IRedisClient GetClient()

//得到一个写入角色的客户端

private RedisClient GetInActiveWriteClient() –此函数在修改中被注释掉由private RedisClient GetInActiveWriteClient(string key)代替.

//获取读客户端

public virtual IRedisClient GetReadOnlyClient()

//获取一个读角色的客户端

private RedisClient GetInActiveReadClient() –此函数在修改中被注释掉由private RedisClient GetInActiveReadClient(string key)代替.

以上四个函数为Manage类主要的入口点,需要利用函数重载为他们扩展一个参数key来支持slot计算.

    /// <summary>
    /// get a readwriteclient by key slot
    /// wanglei add
    /// </summary>
    /// <param name="key">the redis keyword</param>
    /// <returns></returns>
    public IRedisClient GetClient(string key)
    {
        lock (writeClients)
        {
            if (!OpenCluster.Value) //open cluster mode wanglei add
            {
                AssertValidReadWritePool();
            }

            RedisClient inActiveClient;
            while ((inActiveClient = GetInActiveWriteClient(key)) == null)
            {
                if (PoolTimeout.HasValue)
                {
                    // wait for a connection, cry out if made to wait too long
                    if (!Monitor.Wait(writeClients, PoolTimeout.Value))
                        throw new TimeoutException(PoolTimeoutError);
                }
                else
                    Monitor.Wait(writeClients, RecheckPoolAfterMs);
            }

            WritePoolIndex++;
            inActiveClient.Active = true;

            InitClient(inActiveClient);

            return inActiveClient;
        }
    }

    /// <summary>
    /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
    /// </summary>
    /// <returns></returns>
    public IRedisClient GetClient()
    {   
        //将代码移植到有参函数并且调用.
        return this.GetClient(string.Empty);
    }

    /// <summary>
    /// Called within a lock
    /// wanglei add
    /// </summary>
    /// <returns></returns>
    private RedisClient GetInActiveWriteClient(string key)
    {
        //if key is empty then will think not cluster mode (default).
        int slot = -1;
        if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
        {
            slot = RedisClient.HashSlot(key);
        }
        if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
        {
            ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave == false);
            if (clusterNode == null)
            {
                return null;
            }
            RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f => 
                    f.Host == clusterNode.Host && f.Port == clusterNode.Port
                );
            if (rePoint == null)
            {
                return null;
            }
            return InitNewClient(rePoint);
        }
        else
        {
            var desiredIndex = WritePoolIndex % writeClients.Length;
            //this will loop through all hosts in readClients once even though there are 2 for loops
            //both loops are used to try to get the prefered host according to the round robin algorithm
            for (int x = 0; x < ReadWriteHosts.Count; x++)
            {
                var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count;
                RedisEndpoint nextHost = ReadWriteHosts[nextHostIndex];
                for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
                {

                    if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
                        return writeClients[i];
                    else if (writeClients[i] == null || writeClients[i].HadExceptions)
                    {
                        if (writeClients[i] != null)
                            writeClients[i].DisposeConnection();

                        var client = InitNewClient(nextHost);
                        writeClients[i] = client;

                        return client;
                    }

                }
            }
        }
        return null;
    }

    /// <summary>
    /// get a readonlyclient by key slot
    /// wanglei add
    /// </summary>
    /// <param name="key"></param>
    /// <returns></returns>
    public IRedisClient GetReadOnlyClient(string key)
    {           
        lock (readClients)
        {
            if (!OpenCluster.Value) //wanglei add
            {
                AssertValidReadOnlyPool();
            }
            RedisClient inActiveClient;
            while ((inActiveClient = GetInActiveReadClient(key)) == null)
            {
                if (PoolTimeout.HasValue)
                {
                    // wait for a connection, cry out if made to wait too long
                    if (!Monitor.Wait(readClients, PoolTimeout.Value))
                        throw new TimeoutException(PoolTimeoutError);
                }
                else
                    Monitor.Wait(readClients, RecheckPoolAfterMs);
            }

            ReadPoolIndex++;
            inActiveClient.Active = true;
            InitClient(inActiveClient);
            return inActiveClient;
        }
    }



    /// <summary>
    /// Returns a ReadOnly client using the hosts defined in ReadOnlyHosts.
    /// </summary>
    /// <returns></returns>
    public virtual IRedisClient GetReadOnlyClient()
    {
        return GetReadOnlyClient(string.Empty);//wanglei add
    }



    /// <summary>
    /// According to the key value Called within a lock 
    /// wanglei add
    /// </summary>
    /// <returns></returns>
    private RedisClient GetInActiveReadClient(string key)
    {
        //if key is empty then will think not cluster mode (default).
        int slot = -1;
        if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
        {
            slot = RedisClient.HashSlot(key);
        }
        if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
        {
            ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave==false);
            if (clusterNode == null)
            {
                return null;
            }
            RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f =>
                    f.Host == clusterNode.Host && f.Port == clusterNode.Port
                );
            if (rePoint == null)
            {
                return null;
            }
            return InitNewClient(rePoint);
        }
        else
        {
            var desiredIndex = ReadPoolIndex % readClients.Length;
            //this will loop through all hosts in readClients once even though there are 2 for loops
            //both loops are used to try to get the prefered host according to the round robin algorithm
            for (int x = 0; x < ReadOnlyHosts.Count; x++)
            {
                var nextHostIndex = (desiredIndex + x) % ReadOnlyHosts.Count;
                var nextHost = ReadOnlyHosts[nextHostIndex];
                for (var i = nextHostIndex; i < readClients.Length; i += ReadOnlyHosts.Count)
                {
                    if (readClients[i] != null && !readClients[i].Active && !readClients[i].HadExceptions)
                        return readClients[i];
                    else if (readClients[i] == null || readClients[i].HadExceptions)
                    {
                        if (readClients[i] != null)
                            readClients[i].DisposeConnection();

                        var client = InitNewClient(nextHost);
                        readClients[i] = client;

                        return client;
                    }
                }
            }
        }
        return null;
    }

新增类

/// <summary>
/// redis cluster client extension 
/// wanglei add
/// </summary>
public partial class RedisClient
    : IRedisClient
{
    internal ClusterNode _clusterNode = null;
    internal const int NoSlot = -1;
    internal const int RedisClusterSlotCount = 16384;

    /// <summary>
    /// Get Cluster Nodes Info
    /// wanglei add
    /// </summary>
    internal string GetClusterInfo()
    {
        return SendExpectString(Commands.Cluster, "NODES".ToUtf8Bytes());
    }

    /// <summary>
    /// Fill Slot Range To all
    /// </summary>
    /// <param name="clusterNodes"></param>
    internal void FillSlot(List<ClusterNode> clusterNodes)
    {
        List<ClusterNode> nodes = clusterNodes.Where(s => s.ParentNodeId != "-").ToList();
        foreach (var node in nodes)
        {
            ClusterNode n = clusterNodes.FirstOrDefault(f => f.NodeID == node.ParentNodeId);
            if(n != null)
            {
                node.Slot1 = n.Slot1;
                node.Slot2 = n.Slot2;
            }
        }
    }

    internal static unsafe int IndexOf(byte* ptr, byte value, int start, int end)
    {
        for (int offset = start; offset < end; offset++)
            if (ptr[offset] == value) return offset;
        return -1;
    }

    static readonly ushort[] crc16tab =
        {
            0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
            0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
            0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
            0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
            0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
            0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
            0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
            0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
            0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
            0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
            0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
            0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
            0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
            0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
            0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
            0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
            0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
            0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
            0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
            0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
            0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
            0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
            0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
            0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
            0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
            0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
            0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
            0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
            0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
            0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
            0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
            0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
        };


    /// <summary>
    /// get slot number by key
    /// </summary>
    /// <param name="key"></param>
    /// <returns></returns>
    internal static unsafe int HashSlot(string key)
    {
        if (string.IsNullOrEmpty(key)) return NoSlot;
        unchecked
        {
            var blob = System.Text.Encoding.UTF8.GetBytes(key);
            fixed (byte* ptr = blob)
            {
                int offset = 0, count = blob.Length, start, end;
                if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
                    && (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
                    && --end != start)
                {
                    offset = start + 1;
                    count = end - start; // note we already subtracted one via --end
                }

                uint crc = 0;
                for (int i = 0; i < count; i++)
                    crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ ptr[offset++]) & 0x00FF]) & 0x0000FFFF;
                return (int)(crc % RedisClusterSlotCount);
            }
        }
    }
}

在获取客户端的时候,ServiceStack.Redis的设计者为了能够更好地获取RedisClient使用了轮询算法来依次获取每一个Client Pool 内的对象,但是切换到了集群模式则不同于单例模式,他的key slot完全阻碍了轮询的路径,所以为了能最少的影响代码我使用了if else来完全的切割了代码。

当获取只读客户端时候,集群模式也不同于单例,它还是依靠主节点来达到目的,所以我也是使用的ReadWriteClient来返回想要的Client对象.

所有代码已经修改完成.可以测试啦!

六、测试结果

测试代码:

PooledRedisClientManager prcm = new PooledRedisClientManager(RedisConfig.ReadWriteHost, RedisConfig.ReadOnlyHost,
                                            new RedisClientManagerConfig
                                            {
                                                MaxWritePoolSize = RedisConfig.MaxWritePoolSize,
                                                MaxReadPoolSize = RedisConfig.MaxReadPoolSize,
                                                AutoStart = RedisConfig.AutoStart
                                            });

prcm.OpenCluster = true; //我们为它开启集群模式.

string key_tmp = "key";
string value = "wanglei_test_value";
//测试写
for (int i = 1; i <= 10; i++)
{
    string key = key_tmp + i.ToString();
    using (IRedisClient irc = prcm.GetClient(key))
    {
         irc.SetEntry(key, value+i.ToString());
    }
}
//测试读
for (int i = 1; i <= 10; i++)
{
    string key = key_tmp + i.ToString();
    using (IRedisClient irc = prcm.GetReadOnlyClient(key))
    {
         Console.WriteLine(irc.GetValue(key));
    }
}

写入数据分布情况:

改造ServiceStack.Redis支持Redis3.x集群模式

七、一些问题

经过测试,大部分的key操作目前已经完全的兼容Redis3.0,但是我发现还有一些key操作没有解决,如set中的sunion操作,如果在集群模式下使用,两个key不在同一个节点下就会出现找不到key slot这样的问题,原生的redis-cli -c的集群模式也会有同样的问题,所以目前这类问题还没有得到解决,不知以后redis的作者会不会对这种操作有所改善.