一致性哈希Java源码分析

时间:2022-09-21 10:00:25

  首次接触一致性哈希是在学习memcached的时候,为了解决分布式服务器的负载均衡或者说选路的问题,一致性哈希算法不仅能够使memcached服务器被选中的概率(数据分布)更加均匀,而且使得服务器的增加和减少对整个分布式存储的影响也较小,也就是说不会引起大范围的数据迁移。

  关于一致性哈希算法的原理和应用我就不多说了,网上一抓一大把,可以看这里这里、或者这里等等。直接上代码:

 1 /**
2 * 在这个环中,节点之间是存在顺序关系的,
3 * 所以TreeMap的key必须实现Comparator接口
4 * @author 5 */
6 public final class KetamaNodeLocator {
7
8 private TreeMap<Long, Node> ketamaNodes; // 记录所有虚拟服务器节点,为什么是Long类型,因为Long实现了Comparable接口
9 private HashAlgorithm hashAlg;
10 private int numReps = 160; // 每个服务器节点生成的虚拟服务器节点数量,默认设置为160
11
12 public KetamaNodeLocator(List<Node> nodes, HashAlgorithm alg, int nodeCopies) {
13 hashAlg = alg;
14 ketamaNodes = new TreeMap<Long, Node>();
15
16 numReps = nodeCopies;
17
18 // 对所有节点,生成numReps个虚拟结点
19 for (Node node : nodes) {
20 // 每四个虚拟结点为一组,为什么这样?下面会说到
21 for (int i = 0; i < numReps / 4; i++) {
22 // 为这组虚拟结点得到惟一名称
23 byte[] digest = hashAlg.computeMd5(node.getName() + i);
24 /**
25 * Md5是一个16字节长度的数组,将16字节的数组每四个字节一组,
26 * 分别对应一个虚拟结点,这就是为什么上面把虚拟结点四个划分一组的原因
27 */
28 for (int h = 0; h < 4; h++) {
29 // 对于每四个字节,组成一个long值数值,做为这个虚拟节点的在环中的惟一key
30 long m = hashAlg.hash(digest, h);
31
32 ketamaNodes.put(m, node);
33 }
34 }
35 }
36 }
37
38 /**
39 * 根据一个key值在Hash环上顺时针寻找一个最近的虚拟服务器节点
40 * @param k
41 * @return
42 */
43 public Node getPrimary(final String k) {
44 byte[] digest = hashAlg.computeMd5(k);
45 Node rv = getNodeForKey(hashAlg.hash(digest, 0)); // 为什么是0?猜测:0、1、2、3都可以,但是要固定
46 return rv;
47 }
48
49 Node getNodeForKey(long hash) {
50 final Node rv;
51 Long key = hash;
52 //如果找到这个节点,直接取节点,返回
53 if (!ketamaNodes.containsKey(key)) {
54 //得到大于当前key的那个子Map,然后从中取出第一个key,就是大于且离它最近的那个key
55 SortedMap<Long, Node> tailMap = ketamaNodes.tailMap(key);
56 if (tailMap.isEmpty()) {
57 key = ketamaNodes.firstKey();
58 } else {
59 key = tailMap.firstKey();
60 }
61 // For JDK1.6 version
62 // key = ketamaNodes.ceilingKey(key);
63 // if (key == null) {
64 // key = ketamaNodes.firstKey();
65 // }
66 }
67
68 rv = ketamaNodes.get(key);
69 return rv;
70 }
71 }
  KetamaNodeLocator类是实现一致性哈希环的类,记录了所有的服务器节点(虚拟服务器)在环上的位置,以及服务器节点本身的信息(存放在Node中),同时还提供了一个根据key值在Hash环上顺时针寻找一个最近的虚拟服务器节点的方法。
  在多数博客上都有对虚拟服务器节点的使用做出解释,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。就是说在物理服务器很少的时候,可能出现服务器节点通过Hash算法集中映射在环的某一部分,导致数据在映射的时候都分布到某一台或几台服务器上,无法达到负载均衡的目的,这也是违背我们使用分布式系统的初衷的。通过将一个物理节点虚拟成多个虚拟节点的方法,能够使得服务器(虚拟的)在Hash环上分布很均匀,避免出现以上的情况。在这里我还要补充一点使用虚拟服务器节点的作用,当一个分布式的集群在正常负载均衡的情况下所有服务器都饱和工作、达到极限值时,我们需要通过增加物理机器的方法来扩展整个分布式系统的性能,让新加入的服务器分担整个分布式系统上的数据压力。假如不使用虚拟节点,新加入的服务器经过Hash算法映射到环上的某一点,它只对顺时针方向上的下一个服务器产生影响,也就是说它只能分担一个服务器上的数据压力,对于其他的服务器,情况仍不容乐观。而使用虚拟节点,我们就能很好的解决这个问题。
  以上hash映射是通过MD5算法实现,MD5算法会产生一个16字节的数组,通过将其切成4段,每一段作为一个hash值生成唯一的标识。下面是hash算法的源码:
 1 /**
2 * hash算法,通过MD5算法实现
3 * MD5算法根据key生成一个16字节的序列,我们将其切成4段,将其中一段作为得到的Hash值
4 * 在生成虚拟服务器节点中,我们将这四段分别作为四个虚拟服务器节点的唯一标识,即四个hash值
5 * @author XXX
6 */
7 public enum HashAlgorithm {
8
9 /**
10 * MD5-based hash algorithm used by ketama.
11 */
12 KETAMA_HASH;
13
14 public long hash(byte[] digest, int nTime) {
15 long rv = ((long) (digest[3+nTime*4] & 0xFF) << 24)
16 | ((long) (digest[2+nTime*4] & 0xFF) << 16)
17 | ((long) (digest[1+nTime*4] & 0xFF) << 8)
18 | (digest[0+nTime*4] & 0xFF);
19
20 /**
21 * 实际我们只需要后32位即可,为什么返回一个long类型?
22 * 因为Long实现了Comparable接口
23 * Hash环上的节点之间是存在顺序关系的,必须实现Comparable接口
24 */
25 return rv & 0xffffffffL; /* Truncate to 32-bits */
26 }
27
28 /**
29 * Get the md5 of the given key.
30 */
31 public byte[] computeMd5(String k) {
32 MessageDigest md5;
33 try {
34 md5 = MessageDigest.getInstance("MD5");
35 } catch (NoSuchAlgorithmException e) {
36 throw new RuntimeException("MD5 not supported", e);
37 }
38 md5.reset();
39 byte[] keyBytes = null;
40 try {
41 keyBytes = k.getBytes("UTF-8");
42 } catch (UnsupportedEncodingException e) {
43 throw new RuntimeException("Unknown string :" + k, e);
44 }
45
46 md5.update(keyBytes);
47 return md5.digest();
48 }
49 }

测试:

  1 /**
2 * 分布平均性测试
3 * @author 4 */
5 public class HashAlgorithmTest {
6
7 static Random ran = new Random();
8
9 // key的数量,key在实际客户端中是根据要存储的值产生的hash序列?
10 private static final Integer EXE_TIMES = 100000;
11 // 服务器节点的数量
12 private static final Integer NODE_COUNT = 5;
13 // 每个服务器节点生成的虚拟节点数量
14 private static final Integer VIRTUAL_NODE_COUNT = 160;
15
16 /**
17 * 模拟EXE_TIMES个客户端数据存储时选择缓存服务器的情况,
18 * 得到每个服务器节点所存储的值的数量,从而计算出值在服务器节点的分布情况
19 * 判断该算法的"性能",正常情况下要求均匀分布
20 * @param args
21 */
22 public static void main(String[] args) {
23 HashAlgorithmTest test = new HashAlgorithmTest();
24
25 // 记录每个服务器节点所分布到的key节点数量
26 Map<Node, Integer> nodeRecord = new HashMap<Node, Integer>();
27
28 // 模拟生成NODE_COUNT个服务器节点
29 List<Node> allNodes = test.getNodes(NODE_COUNT);
30 // 将服务器节点根据Hash算法扩展成VIRTUAL_NODE_COUNT个虚拟节点布局到Hash环上(实际上是一棵搜索树)
31 // 由KetamaNodeLocator类实现和记录
32 KetamaNodeLocator locator =
33 new KetamaNodeLocator(allNodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);
34
35 // 模拟生成随机的key值(由长度50以内的字符组成)
36 List<String> allKeys = test.getAllStrings();
37 for (String key : allKeys) {
38 // 根据key在Hash环上找到相应的服务器节点node
39 Node node = locator.getPrimary(key);
40
41 // 记录每个服务器节点分布到的数据个数
42 Integer times = nodeRecord.get(node);
43 if (times == null) {
44 nodeRecord.put(node, 1);
45 } else {
46 nodeRecord.put(node, times + 1);
47 }
48 }
49
50 // 打印分布情况
51 System.out.println("Nodes count : " + NODE_COUNT + ", Keys count : " + EXE_TIMES + ", Normal percent : " + (float) 100 / NODE_COUNT + "%");
52 System.out.println("-------------------- boundary ----------------------");
53 for (Map.Entry<Node, Integer> entry : nodeRecord.entrySet()) {
54 System.out.println("Node name :" + entry.getKey() + " - Times : " + entry.getValue() + " - Percent : " + (float)entry.getValue() / EXE_TIMES * 100 + "%");
55 }
56
57 }
58
59
60 /**
61 * Gets the mock node by the material parameter
62 *
63 * @param nodeCount
64 * the count of node wanted
65 * @return
66 * the node list
67 */
68 private List<Node> getNodes(int nodeCount) {
69 List<Node> nodes = new ArrayList<Node>();
70
71 for (int k = 1; k <= nodeCount; k++) {
72 Node node = new Node("node" + k);
73 nodes.add(node);
74 }
75
76 return nodes;
77 }
78
79 /**
80 * All the keys
81 */
82 private List<String> getAllStrings() {
83 List<String> allStrings = new ArrayList<String>(EXE_TIMES);
84
85 for (int i = 0; i < EXE_TIMES; i++) {
86 allStrings.add(generateRandomString(ran.nextInt(50)));
87 }
88
89 return allStrings;
90 }
91
92 /**
93 * To generate the random string by the random algorithm
94 * <br>
95 * The char between 32 and 127 is normal char
96 *
97 * @param length
98 * @return
99 */
100 private String generateRandomString(int length) {
101 StringBuffer sb = new StringBuffer(length);
102
103 for (int i = 0; i < length; i++) {
104 sb.append((char) (ran.nextInt(95) + 32));
105 }
106
107 return sb.toString();
108 }
109 }

==========================神奇的分割线=========================

                           源码请猛戳{ 这里

===========================================================

 

参考资料:

一致性哈希算法及其在分布式系统中的应用

memcached全面剖析–4. memcached的分布式算法

大型网站技术架构:核心原理与案例分析