1. Introduction:
Consistent hashing defines four criteria for judging how well a hash algorithm behaves in a dynamically changing cache environment:
1. Balance
2. Monotonicity
3. Spread
4. Load
An ordinary hash algorithm (also called hard hashing) places keys on machines by simple modulo arithmetic. This gives satisfactory results while the set of caches is fixed, but once the cache environment changes dynamically, static modulo hashing clearly fails the monotonicity requirement: adding or removing a single machine forces almost all stored content to be rehashed to different caches. The sketch below illustrates the churn.
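As a minimal sketch of that churn (the server counts and key set are made up for illustration), the following counts how many keys keep their server when a fifth machine joins a four-machine cluster under modulo placement:

public class ModuloChurnDemo {
    public static void main(String[] args) {
        int keys = 100000;
        int before = 4;   // servers before the change
        int after = 5;    // one server added
        int unchanged = 0;
        for (int k = 0; k < keys; k++) {
            int h = ("key-" + k).hashCode() & 0x7fffffff; // non-negative hash
            if (h % before == h % after) {
                unchanged++; // key stays on the same server
            }
        }
        // with hash % N placement, only about 1/(N+1) of the keys survive a resize in place
        System.out.println("unchanged: " + 100.0 * unchanged / keys + "%");
    }
}

Consistent hashing exists precisely to keep the moved fraction proportional to the capacity change instead.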
The basic idea of consistent hashing is to map both machine nodes and keys onto a ring of values from 0 to 2^32 using the same hash function. When a request to write to the cache arrives, compute the hash Hash(k) of the key k. If that value lands exactly on some machine node's hash, write to that node directly; otherwise search clockwise for the next node and write there. If the search passes 2^32 without finding a node, it wraps around to 0, since the structure is a ring.
Implementation 1:
First, a device class that defines the machine name and IP address:
public class Cache {
    public String name;
    public String ipAddress;
}
The main implementation:
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class Shard<T> {
    // The hash alone does not guarantee balance: with few caches, objects
    // may not be mapped evenly onto them, so each real node gets virtual nodes.
    private TreeMap<Long, T> nodes;  // the ring: virtual-node hash -> real node
    private List<T> shards;          // the real nodes
    private final int NODE_NUM = 10; // virtual nodes per real machine node

    public Shard(List<T> shards) {
        this.shards = shards;
        init();
    }

    private void init() {
        nodes = new TreeMap<Long, T>();
        for (int i = 0; i < shards.size(); i++) { // iterate over the real nodes
            final T shardInfo = shards.get(i);
            for (int n = 0; n < NODE_NUM; n++) {
                // associate virtual nodes with the real node; the real node is the VALUE
                nodes.put((long) Hash("SHARD-" + i + "-NODE-" + n), shardInfo);
            }
            System.out.println(shardInfo);
        }
    }

    public T getShardInfo(String key) {
        SortedMap<Long, T> tail = nodes.tailMap((long) Hash(key));
        if (tail.size() == 0) {
            return nodes.get(nodes.firstKey());
        }
        // the nearest virtual node in the clockwise direction
        return tail.get(tail.firstKey());
    }

    /**
     * A modified 32-bit FNV hash with good dispersion.
     *
     * @param str the string to hash
     * @return an int hash value
     */
    public static int Hash(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (byte b : str.getBytes())
            hash = (hash ^ b) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash;
    }
}
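One quirk worth noting: Hash returns a signed int, so after the cast to long roughly half of the ring positions end up negative. Lookups still work because keys and nodes are hashed identically; the ring simply spans the full signed 32-bit range rather than 0~2^32.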
Test:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Test {
    public static void main(String[] args) {
        List<Cache> myCaches = new ArrayList<Cache>();
        Cache cache1 = new Cache();
        cache1.name = "COMPUTER1";
        Cache cache2 = new Cache();
        cache2.name = "COMPUTER2";
        myCaches.add(cache1);
        myCaches.add(cache2);

        Shard<Cache> myShard = new Shard<Cache>(myCaches);
        Cache currentCache = myShard.getShardInfo("info1");
        System.out.println(currentCache.name);

        // for (int i = 0; i < 20; i++) {
        //     String object = getRandomString(20); // random 20-character string
        //     Cache currentCache = myShard.getShardInfo(object);
        //     System.out.println(currentCache.name);
        // }
    }

    public static String getRandomString(int length) { // length of the generated string
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(base.length());
            sb.append(base.charAt(number));
        }
        return sb.toString();
    }
}
The implementation above is fairly simple; the hash it uses is a modified form of the 32-bit FNV algorithm.
The FNV hash is a high-dispersion hash function, particularly well suited to hashing very similar strings such as URLs, IP addresses, hostnames, and file names (a sketch of the standard variant follows the list below).
Services that use FNV include:
1. calc
2. DNS
3. the mdbm key/value lookup function
4. database index hashing
5. mainstream web search/index engines
6. high-performance email services
7. message-ID lookup functions
8. anti-spam filters
9. NFS implementations (e.g. FreeBSD 4.3, Linux NFS v4)
10. the Cohesia MASS project
11. the Ada 95 spellchecker
12. the open-source x86 assembler flat assembler (user-defined symbol hashtree)
13. PowerBASIC
14. text resources on the PS2 and Xbox
15. non-cryptographic fingerprinting of graphics files
16. FRET
17. Symbian DASM
18. the hash_map implementation in VC++ 2005
19. libketama in memcache
20. PHP 5.x
21. Twitter, to reduce cache fragmentation
22. the BSD IDE project
23. the deliantra game server
24. Leprechaun
25. IPv6 flow labels
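For comparison, here is a minimal sketch of the standard 32-bit FNV-1a algorithm, using the same prime and offset basis as above; the Hash method in implementation 1 is this core loop followed by a few extra shift/xor mixing rounds that are not part of FNV itself:

public final class Fnv1a32 {
    private static final int FNV_PRIME = 16777619;
    private static final int FNV_OFFSET_BASIS = (int) 2166136261L;

    // standard FNV-1a: xor the next byte in, then multiply by the prime
    public static int hash(byte[] data) {
        int h = FNV_OFFSET_BASIS;
        for (byte b : data) {
            h ^= (b & 0xFF); // mask so the byte is treated as unsigned
            h *= FNV_PRIME;
        }
        return h;
    }
}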
Implementation 2:
HashFunction:
package ha;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashFunction {
    private MessageDigest md5 = null;

    public long hash(String key) {
        if (md5 == null) {
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("no md5 algorithm found");
            }
        }
        md5.reset();
        md5.update(key.getBytes());
        byte[] bKey = md5.digest();
        // build an unsigned 32-bit value from the first four bytes of the digest
        long res = ((long) (bKey[3] & 0xFF) << 24)
                 | ((long) (bKey[2] & 0xFF) << 16)
                 | ((long) (bKey[1] & 0xFF) << 8)
                 | (long) (bKey[0] & 0xFF);
        return res & 0xffffffffL;
    }
}
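Note that hash() lazily caches a MessageDigest and mutates it on every call, so a HashFunction instance is not thread-safe; concurrent callers should each use their own instance.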
The main implementation:
package ha;

import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHash<T> {
    private final HashFunction hashFunction;
    private final int numberOfReplicas; // virtual nodes per real node
    // maps virtual-node hash values to real nodes
    private final SortedMap<Long, T> circle = new TreeMap<Long, T>();

    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;
        for (T node : nodes) {
            add(node);
        }
    }

    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hash(node.toString() + i), node);
        }
    }

    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hash(node.toString() + i));
        }
    }

    /**
     * Hashes the given key and returns the real node behind the nearest
     * virtual node in the clockwise direction.
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        long hash = hashFunction.hash((String) key);
        if (!circle.containsKey(hash)) {
            // view of the map whose keys are >= hash
            SortedMap<Long, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

    public long getSize() {
        return circle.size();
    }
}
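A quick way to see monotonicity with this class is to record where a set of keys lands, add one node, and count how many keys moved. A minimal usage sketch (the node addresses and key count are made up, and it assumes the same ha package):

package ha;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MigrationDemo {
    public static void main(String[] args) {
        ConsistentHash<String> ring = new ConsistentHash<String>(
                new HashFunction(), 160,
                Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"));

        // remember the node each key maps to before the change
        Map<String, String> before = new HashMap<String, String>();
        for (int i = 0; i < 10000; i++) {
            before.put("key-" + i, ring.get("key-" + i));
        }

        ring.add("10.0.0.4"); // bring one more node into the ring

        int moved = 0;
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (!ring.get(e.getKey()).equals(e.getValue())) moved++;
        }
        // with 4 nodes, roughly 1/4 of the keys should move -- and only onto the new node
        System.out.println("moved: " + moved + " of " + before.size());
    }
}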
Test:
package ha;

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class MainApp {
    public static void main(String[] args) {
        Set<String> nodes = new HashSet<String>();
        nodes.add("127.0.0.1");
        nodes.add("192.144.111");
        nodes.add("127.0.0.3");

        ConsistentHash<String> consistentHash =
                new ConsistentHash<String>(new HashFunction(), 160, nodes);
        consistentHash.add("127.0.0.4");
        // consistentHash.add("E"); ... further test nodes "F" through "W" omitted

        System.out.println(consistentHash.getSize()); // 4 nodes x 160 replicas = 640
        System.out.println(consistentHash.get("127.0.0.1#111123"));
        System.out.println(consistentHash.get("127.0.0.33#111123"));

        Random random = new Random();
        int s = random.nextInt(100);
        System.out.println(s);
        int t = random.nextInt() * 100; // a random key
        System.out.println(consistentHash.get(t + "#"));

        for (int i = 0; i < 20; i++) {
            String object = getRandomString(20); // random 20-character string
            System.out.println(consistentHash.get(object));
        }
    }

    public static String getRandomString(int length) { // length of the generated string
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(base.length());
            sb.append(base.charAt(number));
        }
        return sb.toString();
    }
}
Implementation 3:
KetamaNodeLocator:
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * The nodes on this ring are ordered, so the TreeMap key type must
 * implement Comparable (Long does).
 */
public final class KetamaNodeLocator {
    // all virtual server nodes; Long keys because Long implements Comparable
    private TreeMap<Long, Node> ketamaNodes;
    private HashAlgorithm hashAlg;
    // virtual nodes generated per real server node, 160 by default
    private int numReps = 160;

    public KetamaNodeLocator(List<Node> nodes, HashAlgorithm alg, int nodeCopies) {
        hashAlg = alg;
        ketamaNodes = new TreeMap<Long, Node>();
        numReps = nodeCopies;
        // generate numReps virtual nodes for every real node
        for (Node node : nodes) {
            // virtual nodes are created four at a time -- the reason is explained below
            for (int i = 0; i < numReps / 4; i++) {
                // a unique name for this group of four virtual nodes
                byte[] digest = hashAlg.computeMd5(node.getName() + i);
                /*
                 * An MD5 digest is a 16-byte array. Split it into four 4-byte
                 * chunks, one per virtual node -- this is why the virtual
                 * nodes are grouped in fours.
                 */
                for (int h = 0; h < 4; h++) {
                    // each 4-byte chunk becomes the long that keys this virtual node on the ring
                    long m = hashAlg.hash(digest, h);
                    ketamaNodes.put(m, node);
                }
            }
        }
    }

    /**
     * Hashes the given key and walks clockwise on the ring to the nearest
     * virtual node, returning the real server node behind it.
     */
    public Node getPrimary(final String k) {
        byte[] digest = hashAlg.computeMd5(k);
        // any of the chunks 0..3 would do, as long as the choice stays fixed
        return getNodeForKey(hashAlg.hash(digest, 0));
    }

    Node getNodeForKey(long hash) {
        Long key = hash;
        // if no virtual node sits exactly at this hash, take the next one clockwise
        if (!ketamaNodes.containsKey(key)) {
            // submap whose keys are >= key; its first key is the nearest clockwise node
            SortedMap<Long, Node> tailMap = ketamaNodes.tailMap(key);
            key = tailMap.isEmpty() ? ketamaNodes.firstKey() : tailMap.firstKey();
            // on JDK 1.6+ the same lookup is:
            // key = ketamaNodes.ceilingKey(key);
            // if (key == null) key = ketamaNodes.firstKey();
        }
        return ketamaNodes.get(key);
    }
}
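The Node class is referenced but never shown in these listings. A minimal sketch that makes the locator and the tests below compile (the name field and name-based equality are assumptions):

public class Node {
    private final String name;

    public Node(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return name;
    }

    // equality by name, so nodes rebuilt for a new locator still compare equal
    @Override
    public boolean equals(Object o) {
        return o instanceof Node && name.equals(((Node) o).name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}

Name-based equals/hashCode matters below: HashAlgorithmPercentTest builds a fresh node list for each locator and compares the nodes a key landed on across runs.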
HashAlgorithm:
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hash algorithm backed by MD5. MD5 turns a key into a 16-byte digest; we
 * cut the digest into four 4-byte chunks and use one chunk as the hash
 * value. When generating virtual server nodes, the four chunks serve as
 * four distinct virtual-node identifiers, i.e. four hash values.
 */
public enum HashAlgorithm {
    /**
     * MD5-based hash algorithm used by ketama.
     */
    KETAMA_HASH;

    public long hash(byte[] digest, int nTime) {
        long rv = ((long) (digest[3 + nTime * 4] & 0xFF) << 24)
                | ((long) (digest[2 + nTime * 4] & 0xFF) << 16)
                | ((long) (digest[1 + nTime * 4] & 0xFF) << 8)
                | (digest[0 + nTime * 4] & 0xFF);
        /*
         * Only the low 32 bits are needed. A long is returned because Long
         * implements Comparable, and the nodes on the ring are ordered, so
         * the key type must be comparable.
         */
        return rv & 0xffffffffL; /* truncate to 32 bits */
    }

    /**
     * Get the MD5 digest of the given key.
     */
    public byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = k.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + k, e);
        }
        md5.update(keyBytes);
        return md5.digest();
    }
}
Test:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Distribution-evenness test.
 */
public class HashAlgorithmTest {
    static Random ran = new Random();

    // number of keys; in a real client the keys come from the values to be stored
    private static final Integer EXE_TIMES = 100000;
    // number of server nodes
    private static final Integer NODE_COUNT = 5;
    // virtual nodes generated per server node
    private static final Integer VIRTUAL_NODE_COUNT = 160;

    /**
     * Simulates EXE_TIMES clients choosing a cache server, records how many
     * values each server node stores, and reports the distribution across
     * nodes. A well-behaved algorithm should distribute the keys evenly.
     */
    public static void main(String[] args) {
        HashAlgorithmTest test = new HashAlgorithmTest();

        // how many keys each server node received
        Map<Node, Integer> nodeRecord = new HashMap<Node, Integer>();

        // mock NODE_COUNT server nodes
        List<Node> allNodes = test.getNodes(NODE_COUNT);
        // expand each server node into VIRTUAL_NODE_COUNT virtual nodes and lay them
        // out on the hash ring (really a search tree), handled by KetamaNodeLocator
        KetamaNodeLocator locator = new KetamaNodeLocator(allNodes,
                HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);

        // random keys of up to 50 characters
        List<String> allKeys = test.getAllStrings();
        for (String key : allKeys) {
            // find the server node for this key on the ring
            Node node = locator.getPrimary(key);

            // tally keys per server node
            Integer times = nodeRecord.get(node);
            nodeRecord.put(node, times == null ? 1 : times + 1);
        }

        // print the distribution
        System.out.println("Nodes count : " + NODE_COUNT + ", Keys count : " + EXE_TIMES
                + ", Normal percent : " + (float) 100 / NODE_COUNT + "%");
        System.out.println("-------------------- boundary ----------------------");
        for (Map.Entry<Node, Integer> entry : nodeRecord.entrySet()) {
            System.out.println("Node name :" + entry.getKey() + " - Times : "
                    + entry.getValue() + " - Percent : "
                    + (float) entry.getValue() / EXE_TIMES * 100 + "%");
        }
    }

    /**
     * Builds the requested number of mock nodes.
     */
    private List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>();
        for (int k = 1; k <= nodeCount; k++) {
            nodes.add(new Node("node" + k));
        }
        return nodes;
    }

    /**
     * All the keys.
     */
    private List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);
        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }
        return allStrings;
    }

    /**
     * Generates a random string; chars between 32 and 127 are printable.
     */
    private String generateRandomString(int length) {
        StringBuffer sb = new StringBuffer(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }
        return sb.toString();
    }
}
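For reference, the ideal share with 5 nodes is 20% per node; with 160 virtual nodes per server, runs of this test typically land within a few percentage points of that figure.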
HashAlgorithmPercentTest:
package ConsistentHashing;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Node addition/removal test.
 */
public class HashAlgorithmPercentTest {
    static Random ran = new Random();

    /** key count */
    private static final Integer EXE_TIMES = 100000;
    private static final Integer NODE_COUNT = 50;
    private static final Integer VIRTUAL_NODE_COUNT = 160;

    static List<String> allKeys = null;

    static {
        allKeys = getAllStrings();
    }

    public static void main(String[] args) {
        Map<String, List<Node>> map = generateRecord();

        List<Node> allNodes = getNodes(NODE_COUNT);
        System.out.println("Normal case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT + 8);
        System.out.println("Added case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT - 10);
        System.out.println("Reduced case : nodes count : " + allNodes.size());
        call(allNodes, map);

        int addCount = 0;
        int reduceCount = 0;
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            List<Node> list = entry.getValue();
            if (list.size() == 3) {
                if (list.get(0).equals(list.get(1))) {
                    addCount++;    // key stayed on the same node after adding servers
                }
                if (list.get(0).equals(list.get(2))) {
                    reduceCount++; // key stayed on the same node after removing servers
                }
            } else {
                System.out.println("It's wrong size of list, key is " + entry.getKey()
                        + ", size is " + list.size());
            }
        }

        System.out.println(addCount + " --- " + reduceCount);
        System.out.println("Same percent in added case : "
                + (float) addCount * 100 / EXE_TIMES + "%");
        System.out.println("Same percent in reduced case : "
                + (float) reduceCount * 100 / EXE_TIMES + "%");
    }

    private static void call(List<Node> nodes, Map<String, List<Node>> map) {
        KetamaNodeLocator locator = new KetamaNodeLocator(nodes,
                HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            Node node = locator.getPrimary(entry.getKey());
            if (node != null) {
                entry.getValue().add(node);
            }
        }
    }

    private static Map<String, List<Node>> generateRecord() {
        Map<String, List<Node>> record = new HashMap<String, List<Node>>(EXE_TIMES);
        for (String key : allKeys) {
            if (!record.containsKey(key)) {
                record.put(key, new ArrayList<Node>());
            }
        }
        return record;
    }

    /**
     * Builds the requested number of mock nodes.
     */
    private static List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>();
        for (int k = 1; k <= nodeCount; k++) {
            nodes.add(new Node("node" + k));
        }
        return nodes;
    }

    /**
     * All the keys.
     */
    private static List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);
        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }
        return allStrings;
    }

    /**
     * Generates a random string; chars between 32 and 127 are printable.
     */
    private static String generateRandomString(int length) {
        StringBuffer sb = new StringBuffer(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }
        return sb.toString();
    }
}
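As a sanity check on the output: in theory, adding 8 nodes to 50 should leave about 50/58, roughly 86%, of the keys on their original node, and removing 10 of 50 should leave about 80% in place, so the printed percentages should come out close to those values.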