package cn.de.common.dubbo; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Map; import java.util.Random; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import cn.de.AppConstant; import cn.de.user.utils.MpqqTokenSecurityUtils; import cn.de.user.utils.UserTokenSecurityUtils; import cn.de.DataAccessReceiveService; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance; /** * 根据用户Id一致性hash负载均衡,用户未登录则随机选择 * <pre> * http://www.zsythink.net/archives/1182 * https://www.cnblogs.com/xrq730/p/5186728.html * </pre> * * @author mac * * 日期2017年5月12日 */ public class UserIdConsistentHashLoadBalance extends AbstractLoadBalance { /** 选择器集合。key为方法全路径名称,value为调用者集合 */ private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); /** 随机数 */ private final Random random = new Random(); /** * 使用一致性哈希选择一个调用者进行调用 * <pre> * invokers: * 192.168.0.1:8001 * 192.168.0.2:8001 * 192.168.0.3:8001 * * url: * cn.de.Bill.findBillById * * key: * userId * * </pre> * * @param invokers 某一接口的调用者集合 * @param url 接口 * @param invocation */ @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String userAuthToken = null == attrs ? null : attrs.get(AppConstant.RPC_TOKEN_PARM); String reqIp = null == attrs ? null : attrs.get(AppConstant.RPC_REQ_IP); String hashKey = null; if (StringUtils.isNotEmpty(userAuthToken)) { try { hashKey = UserTokenSecurityUtils.getUserIdByTokenStr(userAuthToken); } catch (Exception e) { } } //如果用户id不存在则取请求ip if (null == hashKey) { hashKey = reqIp; } //如果未获取得到用户id,随机取一个调用者 if (null == hashKey) { int length = invokers.size(); // 总个数 return invokers.get(random.nextInt(length)); } else { //cn.de.User:0.0.1.findUserById String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); //生成一个哈希码 int identityHashCode = System.identityHashCode(invokers); //获取某个方法的所有调用者 ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); //不存在,则初始化选择器集合,并选择一个调用者 if (selector == null || selector.getIdentityHashCode() != identityHashCode) { //初始化选择器(哈希环) selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } //存在,则选择一个调用者 return selector.select(hashKey); } } /** * 一致性hash选择 * * * 日期2017年5月12日 */ private static final class ConsistentHashSelector<T> { /** 虚拟节点,key为哈希值,value为虚拟节点。解决哈希环倾斜问题,实现将n个节点尽可能多的、均匀的分布在哈希环上 */ private final TreeMap<Long, Invoker<T>> virtualInvokers; /** 虚拟节点数 */ private final int replicaNumber; /** 唯一HashCode */ private final int identityHashCode; public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = System.identityHashCode(invokers); URL url = invokers.get(0).getUrl(); /* * 默认节点数不能过小也不能过大。 * 过小导致map中元素过少,根据hash值选择调用者时,选择同一个调用者概率较大,不能起到负载均衡效果; * 过大导致map存储空间增大,且根据hash值选择调用者时,增加了将hash值与map中元素比较的次数; * * 节点增加时,invokers增加,前n-1个调用者的hash值不变,第n个调用者的部分hash值添加到哈希环中, * 如果请求不变,选择出的调用者可能变化,也可能不变。因为在哈希环中随机地添加了一些元素,再次查找比指定 * 哈希值大的元素时,查找结果可能变化,也可能不变。 * * 节点减少时,invokers减少,前n-1个调用者的hash值不变,第n个调用者的部分hash值从哈希环中移除, * 如果请求不变,选择出的调用者可能变化,也可能不变。因为在哈希环中随机地移除了一些元素,再次查找比指定 * 哈希值大的元素时,查找结果可能变化,也可能不变。 */ this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); for (Invoker<T> invoker : invokers) { for (int i = 0; i < replicaNumber / 4; i++) { //进行一次md5编码,使value分布均匀 byte[] digest = md5(invoker.getUrl().toFullString() + i); for (int h = 0; h < 4; h++) { //进行一次哈希编码,使value分布均匀 long m = hash(digest, h); //key为0-max之间的随机的n个数 virtualInvokers.put(m, invoker); } } } } public int getIdentityHashCode() { return identityHashCode; } /** * 根据用户id一致性选择的核心算法 * * @param invocation * @return */ public Invoker<T> select(String userId) { String key = userId; byte[] digest = md5(key); //哈希范围为0-max long hash = hash(digest, 0); Invoker<T> invoker = selectForKey(hash); return invoker; } /** * 根据哈希值选择一个调用者 * * @param hash * @return */ private Invoker<T> selectForKey(long hash) { Invoker<T> invoker; Long key = hash; //map中不存在指定的哈希,大概率是这样,因为map中元素个数很少,hash()生成的值范围很大 if (!virtualInvokers.containsKey(key)) { //查找key值>=指定哈希的元素 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); if (tailMap.isEmpty()) { //不存在,则取key值最小的元素。指定哈希大于map中最大的key时,出现该现象。形成一个哈希环 key = virtualInvokers.firstKey(); } else { //存在,取剩余元素的key最小的元素 key = tailMap.firstKey(); } } invoker = virtualInvokers.get(key); return invoker; } /** * 计算哈希值,为0-max之间的一个数 * <pre> * max为2^32-1 ? * 不使用jdk提供的字符串的hashcode方法,其可能产生负数,且对于ip字符串,生成的hash值比较集中 * </pre> * * @param digest * @param number * @return */ private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = null; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } } }
1 代码入口函数为doSelect,理解代码逻辑后可阅读类注释中的两篇博客,了解代码中未提到的一些问题。
2 代码为组内一个同事写的,其参考网上部分文章实现。