一致性hash算法实现

时间:2021-05-01 16:45:20

前几天在看redis的集群方案,在redis3.0以后支持的服务器端的集群方案。不过,在客户端也有成熟的redis集群。实现思想是采用一致性hash算法,将redis节点散列,将存取的key也进行散列,从而找到该从哪个节点上操作数据。下面先来了解下一致性hash算法。

使用场景

现在我们假设有100台redis data服务器,一份数据101进来的时候,以散列公式hash(i)&100,计算所存放的服务器,假设hash(i) = i,那么数据被散列到标号为1的服务器,然后这个时候服务器新增了一台,然后散列公式为hash(i)%101,这个时候请求访问数据101的时候,被分配至0号服务器,但是其实这个时候数据是在1号服务器的。

所以这个时候大量的数据失效了(访问不到了)。

所以这个时候,我们假设是新增了服务器,如果是持久化存储的,我们可以让服务器集群对数据进行重新散列,进行数据迁移,然后进行恢复,但是这个时候就意味着每次增减服务器的时候,集群就需要大量的通信,进行数据迁移,这个开销是非常大的。如果只是缓存,那么缓存就都失效了。所以这个时候怎么办?

我们可以看到,关键问题在于,服务器数量变动的时候,要能够保证旧的数据能够按照老的算法,计算到数据所在的服务器,而新的数据能够按照新的散列算法,计算出数据所在的服务器。

一致性hash算法实现

如上图,我们有ABCD四台服务器,这四台服务器被分配至0~232 的一个环上,比如0~230的存储在A服务器,230 +1~231 存储到B服务器上.....CD按照这样的进行均分。将我们的散列空间也划为0~232 ,然后数据进来后对232 取模,得到一个值K1,我们根据K1在环上所处的位置,得到所分配到的服务器,如图,K1被分配到B服务器。 这个时候,我们有一台服务器B失效了。

一致性hash算法实现

 

 

我们可以看到,如果是B失效了,那么如果有持久化存储的,需要做数据恢复,将B的数据迁移至C即可,对于原本散列在A和D的数据,不需要做任何改变。 同理,如果我们是新增了服务器,那么只需要对一台服务器的数据迁移一部分至新加的服务器即可。

一致性哈希,简单的说在移除 / 添加一个 cache 时,它能够尽可能小的改变已存在 key 映射关系,尽可能的满足单调性的要求。

--------------------------------------------------------------------------------- 代码实现

代码实现:将节点的hash值和节点信息(比如节点的ip)放入一个有序的以集合中(选择treeMap做示例),根据要操作的key的hash找到比它大的第一个节点,若没有找到,则找第一个节点。(模拟环形)

 

package com.cw.demo.yizhixing.hash;

import java.util.HashMap;

/**
 * 服务器节点
 * Created by chenwei01 on 2017/1/18.
 */
public class ServerNode {
    //节点hash值
    private Integer index;
    //节点地址    
    private String serverAddr;
    //模拟数据存储
    private HashMap data=new HashMap();

    public ServerNode(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    public ServerNode(int index, String serverAddr) {
        this.index = index;
        this.serverAddr = serverAddr;
    }

    public Integer getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }

    public HashMap getData() {
        return data;
    }

    public void setData(HashMap data) {
        this.data = data;
    }

    public String getServerAddr() {
        return serverAddr;
    }

    public void setServerAddr(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    @Override
    public String toString() {
        return "ServerNode{" +
                "index=" + index +
                ", serverAddr='" + serverAddr + '\'' +
                ", data=" + data +
                '}';
    }
}
package com.cw.demo.yizhixing.hash;

/**
* hash 工具类
* Created by chenwei01 on 2017/1/17.
*/
public class HashUtil {

/**
* 通过key计算下标
* 采用FNV算法
* @param key 节点值
* @return
*/
public static int hash(String key){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i=0;i<key.length();i++){
hash = (hash ^ key.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
hash= hash<0?Math.abs(hash):hash;
//对2的32次方取余,实际上这里有些疑惑,为什么是这个数,是为了增大离散吗?
return (int)(hash % Math.pow(2,32));
}


}

 

package com.cw.demo.yizhixing.hash;

import org.apache.commons.lang.StringUtils;

import java.util.*;

/**
 * 一致性hash算法demo
 * 分布式系统中常用的策略
 * Created by chenwei01 on 2017/1/18.
 */
public class YiZhiXingDemo {

    public static void main(String[] args) {


        //将各个节点的hash计算后放入 集合中排序
        ServerNode node1=new ServerNode("192.168.21.58");
        ServerNode node2=new ServerNode("192.168.1.9");
        ServerNode node3=new ServerNode("192.168.82.220");
        ServerNode node4=new ServerNode("192.168.72.125");
        ServerNode node5=new ServerNode("192.168.12.112");
        ServerNode node6=new ServerNode("192.168.3.48");

        node1.setIndex(HashUtil.hash(node1.getServerAddr()));
        node2.setIndex(HashUtil.hash(node2.getServerAddr()));
        node3.setIndex(HashUtil.hash(node3.getServerAddr()));
        node4.setIndex(HashUtil.hash(node4.getServerAddr()));
        node5.setIndex(HashUtil.hash(node5.getServerAddr()));
        node6.setIndex(HashUtil.hash(node6.getServerAddr()));

        TreeMap<Integer,ServerNode> map=new TreeMap<Integer,ServerNode>();
        map.put(node1.getIndex(), node1);
        map.put(node2.getIndex(), node2);
        map.put(node3.getIndex(), node3);
        map.put(node4.getIndex(), node4);
        map.put(node5.getIndex(), node5);
        map.put(node6.getIndex(), node6);

        Integer Serverkey=0;
        String keyStr="";
       for (int i=97;i<123;i++){
           keyStr= String.valueOf ((char)i);
           Integer index= HashUtil.hash(keyStr);
           // 获取 大于key的hash值的第一台服务器 ,若取不到,则取 第一台服务器
           SortedMap<Integer ,ServerNode> sortedMap = map.tailMap(index);
           if (sortedMap==null||sortedMap.size()==0){
               Serverkey=map.firstKey();
           }else {
               Serverkey = sortedMap.firstKey();
           }
           ServerNode node =map.get(Serverkey);

           node.getData().put(keyStr, StringUtils.upperCase(keyStr));
           System.out.println("key 为 " + keyStr + " ,路由到下标为" +Serverkey+ "索引为" + node.getIndex() + "的节点上存储");
       }
        System.out.println(map);

        //----------------------------------模拟 宕机的情况
        map.remove(node1.getIndex());
        System.out.println(map.size());
        for (int i=97;i<123;i++){
            keyStr= String.valueOf ((char)i);
            Integer index= HashUtil.hash(keyStr);
            // 获取 大于key的hash值的第一台服务器 ,若取不到,则取 第一台服务器 --start
            SortedMap<Integer ,ServerNode> sortedMap = map.tailMap(index);
            if (sortedMap==null||sortedMap.size()==0){
                Serverkey=map.firstKey();
            }else {
                Serverkey = sortedMap.firstKey();
            }
            ServerNode node =map.get(Serverkey);
            System.out.println("key 为 "+keyStr+" 索引到下标为 "+Serverkey+"的server, 取得的结果为 "+node.getData().get(keyStr));
        }
    }
}

 

结果如图

一致性hash算法实现

可以看出一致性hash算法的优点:如果服务器数量发生改变,并不是所有的缓存都会失效,而只是部分会失效,而不至于所有压力都集中到后端数据库层面。

还有个虚拟节点的方案,用来平衡节点路由的不平衡问题,主要手段就是建立虚拟节点与真实节点的映射,比如虚拟节点名为:  xuni_0_192.168.0.1,xuni_2_192.168.0.1,xuni_3_192.168.0.1然后路由到这个虚拟节点后,真实节点就是 192.168.0.1。