Jedis Cluster Source Code Analysis

Date: 2022-04-21 16:52:01

A recent project of mine uses the Jedis client, and I needed to modify it. I read through the Jedis Cluster source code and am writing down some notes.

First, the core idea. In the Jedis source, cluster support revolves around two important maps: nodes and slots.

nodes:  host:port  ---->  JedisPool

slots:  slot ----> JedisPool

nodes maps a host:port key to a JedisPool.

slots maps a slot number to a JedisPool.

Here, JedisPool is a pool of Jedis objects built on Apache Commons Pool, and a slot is obtained by taking the CRC16 of the key modulo 16384.
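To make the slot formula concrete, here is a minimal sketch of CRC16 modulo 16384 in plain Java. It assumes the XMODEM variant of CRC16 (polynomial 0x1021, initial value 0), which is the one the Redis Cluster specification describes. The class and method names are my own, and hash tags ("{...}" inside a key) are deliberately not handled, so this is not a drop-in replacement for JedisClusterCRC16.

```java
import java.nio.charset.StandardCharsets;

final class SlotSketch {
    // Total number of hash slots in a Redis Cluster.
    static final int SLOT_COUNT = 16384;

    // Bitwise CRC16, XMODEM variant (polynomial 0x1021, initial value 0).
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    // Simplified slot lookup: no hash-tag handling (an intentional omission).
    static int slotOf(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        System.out.println("slot of \"name\" = " + slotOf("name"));
    }
}
```

A table-driven CRC would be faster; the bitwise loop is used here only because it is short enough to check by eye.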

Here is a Jedis Cluster demo:

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Set;

/**
 * Created by guanxianseng on 2017/8/15.
 *
 * nodes: host:port -> JedisPool
 * slots: slot -> JedisPool
 */
public class TestCluster {
    public static void main(String[] args) {
        Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
        jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7340));
        jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7341));
        jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7342));
        JedisCluster jc = new JedisCluster(jedisClusterNodes);
        jc.set("name", "guanxianseng");
        System.out.println(jc.get("name"));
    }
}

Output:

guanxianseng

Process finished with exit code 0

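The IP here belongs to one of my virtual machines. I started two VMs and deployed a cluster of three masters and three replicas on them.

To begin, step into the JedisCluster constructor and follow the calls down until you reach this code: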

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
        final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
    initializeSlotsCache(nodes, poolConfig, password);
}

Step into the initializeSlotsCache method:

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
        Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
        if (password != null) {
            jedis.auth(password);
        }
        try {
            cache.discoverClusterNodesAndSlots(jedis);
            break;
        } catch (JedisConnectionException e) {
            // try next nodes
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
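The loop above is a "first reachable seed wins" pattern: discovery is attempted against each start node in turn, and the loop stops at the first one that succeeds. Below is a rough sketch of that control flow with no Jedis dependency. ConnectException, firstReachable, and the string node keys are hypothetical stand-ins I made up for illustration.

```java
import java.util.List;
import java.util.function.Consumer;

final class SeedFallbackSketch {
    // Hypothetical stand-in for JedisConnectionException.
    static final class ConnectException extends RuntimeException {
        ConnectException(String message) {
            super(message);
        }
    }

    // Runs discovery against each seed in order and returns the first seed
    // for which it succeeds, or null if every seed fails to connect.
    static String firstReachable(List<String> seeds, Consumer<String> discover) {
        for (String seed : seeds) {
            try {
                discover.accept(seed);
                return seed; // plays the role of the break in the original loop
            } catch (ConnectException e) {
                // try the next seed
            }
        }
        return null;
    }
}
```

One detail worth noticing in the original listing: jedis.auth(password) runs before the try block, so an exception thrown there would propagate out of the loop instead of falling through to the next node.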

Here a Jedis object is created for a cluster node. Next, step into discoverClusterNodesAndSlots(jedis):

public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();
    try {
        reset();
        List<Object> slots = jedis.clusterSlots();
        for (Object slotInfoObj : slots) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;
            if (slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }
            List<Integer> slotNums = getAssignedSlotArray(slotInfo);
            // hostInfos
            int size = slotInfo.size();
            for (int i = MASTER_NODE_INDEX; i < size; i++) {
                List<Object> hostInfos = (List<Object>) slotInfo.get(i);
                if (hostInfos.size() <= 0) {
                    continue;
                }
                HostAndPort targetNode = generateHostAndPort(hostInfos);
                setupNodeIfNotExist(targetNode);
                if (i == MASTER_NODE_INDEX) {
                    assignSlotsToNode(slotNums, targetNode);
                }
            }
        }
    } finally {
        w.unlock();
    }
}

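The jedis.clusterSlots() call near the top of the method simply executes the CLUSTER SLOTS command. Next, step into the getAssignedSlotArray method: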

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
            .intValue(); slot++) {
        slotNums.add(slot);
    }
    return slotNums;
}

This collects the slots assigned to the node.

Back in the caller, step into the generateHostAndPort method:

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
    return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
            ((Long) hostInfos.get(1)).intValue());
}

This yields the node's host and port.
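Judging from the casts in the two helpers above, each entry of the CLUSTER SLOTS reply looks like [startSlot, endSlot, [host, port, ...], ...], with integers arriving as Long and strings as byte[]. The sketch below rebuilds that shape by hand and parses it the same way. The sample address and ports are invented, and MASTER_NODE_INDEX = 2 is my assumption about the constant's value, since the listings do not show it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ClusterSlotsReplySketch {
    // Assumed value: the master entry follows the start and end slot numbers.
    static final int MASTER_NODE_INDEX = 2;

    // Expands the inclusive [start, end] range held at positions 0 and 1.
    static List<Integer> assignedSlots(List<Object> slotInfo) {
        int start = ((Long) slotInfo.get(0)).intValue();
        int end = ((Long) slotInfo.get(1)).intValue();
        List<Integer> slots = new ArrayList<>();
        for (int slot = start; slot <= end; slot++) {
            slots.add(slot);
        }
        return slots;
    }

    // Formats the master entry as a "host:port" key.
    @SuppressWarnings("unchecked")
    static String masterKey(List<Object> slotInfo) {
        List<Object> hostInfo = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
        String host = new String((byte[]) hostInfo.get(0), StandardCharsets.UTF_8);
        int port = ((Long) hostInfo.get(1)).intValue();
        return host + ":" + port;
    }

    // Hand-written sample entry with hypothetical addresses:
    // [0, 5460, [master host, master port], [replica host, replica port]]
    static List<Object> sampleEntry() {
        byte[] host = "127.0.0.1".getBytes(StandardCharsets.UTF_8);
        List<Object> master = Arrays.<Object>asList(host, 7000L);
        List<Object> replica = Arrays.<Object>asList(host, 7003L);
        return Arrays.<Object>asList(0L, 5460L, master, replica);
    }
}
```

Materializing every slot number into a list mirrors getAssignedSlotArray; only the master entry's slots end up being assigned, while replicas are merely registered as nodes.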

Back in the caller again, step into setupNodeIfNotExist(targetNode):

public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
        String nodeKey = getNodeKey(node);
        JedisPool existingPool = nodes.get(nodeKey);
        if (existingPool != null) return existingPool;

        JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
                connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
        nodes.put(nodeKey, nodePool);
        return nodePool;
    } finally {
        w.unlock();
    }
}

This populates the nodes map mentioned at the start: host:port ----> JedisPool.

Back in the caller once more, step into assignSlotsToNode(slotNums, targetNode):

public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
        JedisPool targetPool = setupNodeIfNotExist(targetNode);
        for (Integer slot : targetSlots) {
            slots.put(slot, targetPool);
        }
    } finally {
        w.unlock();
    }
}

This populates the slots map mentioned earlier: slot ----> JedisPool.

Initialization is now complete.
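The two maps and their locking can be condensed into a small stand-alone sketch. It assumes that w is the write half of a ReentrantReadWriteLock, which the listings above do not show, and it uses a hypothetical Pool class in place of JedisPool. The getSlotPool read path is likewise my guess at how lookups would be guarded.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ClusterCacheSketch {
    // Hypothetical stand-in for JedisPool.
    static final class Pool {
        final String nodeKey;

        Pool(String nodeKey) {
            this.nodeKey = nodeKey;
        }
    }

    private final Map<String, Pool> nodes = new HashMap<>();  // host:port -> pool
    private final Map<Integer, Pool> slots = new HashMap<>(); // slot -> pool
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    // Returns the existing pool for the node, creating one if needed.
    Pool setupNodeIfNotExist(String nodeKey) {
        w.lock();
        try {
            Pool existing = nodes.get(nodeKey);
            if (existing != null) {
                return existing;
            }
            Pool created = new Pool(nodeKey);
            nodes.put(nodeKey, created);
            return created;
        } finally {
            w.unlock();
        }
    }

    // Points every given slot at the node's pool.
    void assignSlotsToNode(List<Integer> targetSlots, String nodeKey) {
        w.lock();
        try {
            // Re-acquires w while already holding it; safe because the lock is reentrant.
            Pool pool = setupNodeIfNotExist(nodeKey);
            for (Integer slot : targetSlots) {
                slots.put(slot, pool);
            }
        } finally {
            w.unlock();
        }
    }

    // Read path: look up the pool that serves a slot (null if unassigned).
    Pool getSlotPool(int slot) {
        r.lock();
        try {
            return slots.get(slot);
        } finally {
            r.unlock();
        }
    }
}
```

Note that both maps share the same pool object per node, so a slot lookup and a host:port lookup for the same master return the identical pool.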

Executing the set command

@Override
public String set(final String key, final String value) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
        @Override
        public String execute(Jedis connection) {
            return connection.set(key, value);
        }
    }.run(key);
}

Step into the run(key) method:

public T run(String key) {
    if (key == null) {
        throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
    }

    return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
}
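Taken together, set and run follow a template-method shape: the anonymous subclass supplies only execute, while the base class decides which connection the command runs on. The stripped-down sketch below shows that shape. Connection and ClusterCommand are hypothetical stand-ins, and the connection is fixed at construction time rather than chosen by slot, so routing and retries are left out.

```java
import java.util.HashMap;
import java.util.Map;

final class CommandTemplateSketch {
    // Hypothetical stand-in for a Jedis connection, backed by an in-memory map.
    static final class Connection {
        private final Map<String, String> data = new HashMap<>();

        String set(String key, String value) {
            data.put(key, value);
            return "OK";
        }

        String get(String key) {
            return data.get(key);
        }
    }

    // Simplified counterpart of JedisClusterCommand<T>.
    abstract static class ClusterCommand<T> {
        private final Connection connection;

        ClusterCommand(Connection connection) {
            this.connection = connection;
        }

        // The only piece each command has to provide.
        abstract T execute(Connection connection);

        T run(String key) {
            if (key == null) {
                throw new IllegalArgumentException("No way to dispatch this command.");
            }
            // The real code selects the connection from the key's slot here.
            return execute(connection);
        }
    }

    // Mirrors the structure of the set method in the listing above.
    static String set(Connection connection, final String key, final String value) {
        return new ClusterCommand<String>(connection) {
            @Override
            String execute(Connection c) {
                return c.set(key, value);
            }
        }.run(key);
    }
}
```

Every command then costs one small anonymous class, and the dispatch logic stays in a single place.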

Step into runWithRetries():

private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {
        throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {
        if (asking) {
            // TODO: Pipeline asking with the original command to make it
            // faster....
            connection = askConnection.get();
            connection.asking();
            // if asking success, reset asking flag
            asking = false;
        } else {
            if (tryRandomNode) {
                connection = connectionHandler.getConnection();
            } else {
                connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
            }
        }
        return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {

The method is fairly long, so only the first part is shown here. The key line is:

 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

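Here the slot for the key is computed, and a Jedis object is obtained from slots.

That covers the essentials.

To sum up: initialization executes the CLUSTER SLOTS command and caches the host:port ----> JedisPool and slot ----> JedisPool mappings. Executing a command then follows key ----> slot ----> JedisPool ----> Jedis.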