Distributed Caching with Redis, Part 8: A JedisCluster Source Walkthrough (Cluster Initialization, Slot Allocation, Reading and Writing Values)

Date: 2021-11-16 17:25:17

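In a Redis Cluster environment, the client obtains connections and operates on the Redis servers through JedisCluster. The previous post, Distributed Caching with Redis, Part 7: Integrating JedisCluster with Spring, briefly showed how to use JedisCluster with Spring. This post looks at how JedisCluster works at the source-code level.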

I. Cluster initialization

1. Start with a unit test:

    @Autowired
    private JedisCluster jedisCluster;

    @Test
    public void testJedisCluster() {
        jedisCluster.set("name", "啊芝");
        String val = jedisCluster.get("name");
        System.out.println(val);
    }

2. The test injects JedisCluster jedisCluster, which comes from the bean registered in the Spring configuration file:

    <bean id="jedisCluster" class="com.jsun.service.redis.impl.JedisClusterFactory">
        <property name="addressConfig">
            <value>classpath:redis.properties</value>
        </property>
        <property name="addressKeyPrefix" value="cluster" /> <!-- prefix of the node keys in the properties file -->

        <property name="timeout" value="300000" />
        <property name="maxRedirections" value="6" />
        <property name="genericObjectPoolConfig" ref="genericObjectPoolConfig" />
    </bean>

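3. JedisClusterFactory:

JedisClusterFactory implements the InitializingBean interface and its afterPropertiesSet() method. Spring calls this method after it has set the bean's properties. The method calls parseHostAndPort() to read the node list and then creates the JedisCluster. The test injects a JedisCluster rather than a JedisClusterFactory, so the factory presumably also implements Spring's FactoryBean<JedisCluster>; that part of the class is not shown. The code: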

    @Override
    public void afterPropertiesSet() throws Exception {
        // Read all configured nodes
        Set<HostAndPort> haps = this.parseHostAndPort();

        // Initialize the cluster client
        jedisCluster = new JedisCluster(haps, timeout, maxRedirections, genericObjectPoolConfig);
    }
    private Set<HostAndPort> parseHostAndPort() throws Exception {
        try {
            // Read the configuration file (redis.properties) and collect every configured node into haps
            Properties prop = new Properties();
            prop.load(this.addressConfig.getInputStream());

            Set<HostAndPort> haps = new HashSet<HostAndPort>();
            for (Object key : prop.keySet()) {
                // Only keys starting with addressKeyPrefix ("cluster") describe nodes
                if (!((String) key).startsWith(addressKeyPrefix)) {
                    continue;
                }

                String val = (String) prop.get(key);

                // p is a java.util.regex.Pattern field of the class (not shown) that validates "ip:port"
                boolean isIpPort = p.matcher(val).matches();

                if (!isIpPort) {
                    throw new IllegalArgumentException("invalid ip or port");
                }
                String[] ipAndPort = val.split(":");

                HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
                haps.add(hap);
            }

            return haps;
        } catch (IllegalArgumentException ex) {
            throw ex;
        } catch (Exception ex) {
            throw new Exception("failed to parse the jedis configuration file", ex);
        }
    }
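To experiment without Spring or a running cluster, the parsing step can be reproduced in plain Java. This is a minimal sketch, not the original class, and several parts of it are assumptions:
- the class name SeedNodeParser;
- the validation regex, since the original field p is not shown;
- the use of "host:port" strings in place of Jedis's HostAndPort;
- the shape of redis.properties, taken to contain keys such as cluster1 and cluster2 that start with the configured prefix.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

final class SeedNodeParser {
    // Assumed stand-in for the original field p: "host:port" with a 1-5 digit port
    private static final Pattern HOST_PORT = Pattern.compile("[^:\\s]+:\\d{1,5}");

    private SeedNodeParser() {
    }

    // Returns "host:port" for every property whose key starts with prefix.
    // Like the original, it collects into a HashSet, so the iteration order is unspecified.
    static Set<String> parse(String propertiesText, String prefix) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> nodes = new HashSet<>();
        for (String key : prop.stringPropertyNames()) {
            if (!key.startsWith(prefix)) {
                continue; // not a node entry
            }
            String val = prop.getProperty(key).trim();
            if (!HOST_PORT.matcher(val).matches()) {
                throw new IllegalArgumentException("invalid ip or port: " + val);
            }
            String[] ipAndPort = val.split(":");
            nodes.add(ipAndPort[0] + ":" + Integer.parseInt(ipAndPort[1]));
        }
        return nodes;
    }
}
```

The result is a HashSet, and Properties is itself a hash table. So the order in which the nodes come back is unrelated to their order in the file.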

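4. Following the cluster initialization code down the call chain (in Jedis 2.x the JedisCluster constructor ends up in JedisClusterConnectionHandler), we reach: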

    private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
        // Iterate over the nodes read from the configuration file
        for (HostAndPort hostAndPort : startNodes) {
            Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
            try {
                cache.discoverClusterNodesAndSlots(jedis);

                // Key point: once one node has returned the cluster topology, break exits the loop
                break;
            } catch (JedisConnectionException e) {
                // try next nodes
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }

        for (HostAndPort node : startNodes) {
            cache.setNodeIfNotExist(node);
        }
    }


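Note the break inside the for loop above:
it exits the loop as soon as discoverClusterNodesAndSlots succeeds, so the topology is read from a single node, the first one that can be reached. A node that throws JedisConnectionException is skipped by the catch block, and the next node is tried. "First" does not mean first in the properties file: both Properties and the HashSet built by parseHostAndPort iterate in an unspecified order.


So a down or unreachable node in the configuration does not by itself make the cluster unusable, as long as another configured node answers. One reachable node is enough to discover the whole cluster, but configuring several nodes gives the client fallbacks. The second loop then also adds every configured node to the nodes map.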
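The control flow of initializeSlotsCache can be sketched without Jedis. In this hypothetical SeedLoopSketch:
- ConnectionFailed stands in for JedisConnectionException;
- the discover callback stands in for cache.discoverClusterNodesAndSlots(jedis);
- returning from inside the loop plays the role of the break.

```java
import java.util.List;
import java.util.function.Function;

final class SeedLoopSketch {
    // Hypothetical stand-in for JedisConnectionException
    static final class ConnectionFailed extends RuntimeException {
        ConnectionFailed(String message) {
            super(message);
        }
    }

    private SeedLoopSketch() {
    }

    // Asks each seed in turn for the cluster topology; discover is a hypothetical stand-in
    // for cache.discoverClusterNodesAndSlots(jedis).
    static <T> T discoverFromSeeds(List<String> seeds, Function<String, T> discover) {
        for (String seed : seeds) {
            try {
                return discover.apply(seed); // first success ends the loop, like the break
            } catch (ConnectionFailed e) {
                // unreachable seed: fall through to the next one, like "try next nodes"
            }
        }
        return null; // no seed answered (the original just leaves the slot cache empty)
    }
}
```

With seeds a, b and c where a is down, discover is called for a and b only, and the topology comes from b.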

5. discoverClusterNodesAndSlots: fetch the cluster's node and slot information from a Redis server

    // Client-side cache: node "host:port" -> JedisPool, and slot index -> JedisPool
    private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
    private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

    public void discoverClusterNodesAndSlots(Jedis jedis) {
        // w is the write lock of the cache's read/write lock
        w.lock();

        try {
            this.nodes.clear();
            this.slots.clear();

            String localNodes = jedis.clusterNodes();
            for (String nodeInfo : localNodes.split("\n")) {
                ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(nodeInfo, new HostAndPort(
                    jedis.getClient().getHost(), jedis.getClient().getPort()));

                HostAndPort targetNode = clusterNodeInfo.getNode();
                setNodeIfNotExist(targetNode);
                assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
            }
        } finally {
            w.unlock();
        }
    }


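Key line: String localNodes = jedis.clusterNodes();
Calling clusterNodes() on the Jedis instance runs the CLUSTER NODES command on the Redis server. For the author's cluster of three masters and three slaves, the output was: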

    2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 119.254.166.136:7031 myself,master - 0 0 1 connected 0-5460
    6f7119b06bb3316119f0bed3f793c2ce87983566 119.254.166.136:7036 slave 6fd9a873b29f5e9a61756606ececa4a953a11db7 0 1479278388635 6 connected
    1b42b9d25779ccd5555fb804d01ddfbdd20635bf 119.254.166.136:7032 master - 0 1479278389637 2 connected 5461-10922
    e22f9b0d7cb3fdb932926a1b4d9c3140e70255eb 119.254.166.136:7034 slave 2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 0 1479278390639 4 connected
    cd480d1b437ad323ae3e15e548db8faf43a5d766 119.254.166.136:7035 slave 1b42b9d25779ccd5555fb804d01ddfbdd20635bf 0 1479278385630 5 connected
    6fd9a873b29f5e9a61756606ececa4a953a11db7 119.254.166.136:7033 master - 0 1479278386632 3 connected 10923-16383
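The field layout of these lines can be made concrete with a small parser. ClusterNodesLine is a hypothetical class, not part of Jedis. The field positions follow the CLUSTER NODES format of the Redis version used here, in which the address field is plain ip:port (Redis 4.0 and later append @cport to it).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ClusterNodesLine {
    // Field positions in one line of CLUSTER NODES output:
    // 0 id, 1 ip:port, 2 flags, 3 master id ("-" for a master),
    // 4 ping-sent, 5 pong-recv, 6 config-epoch, 7 link-state, 8.. slots
    static final int SLOTS_START = 8; // same value as SLOT_INFORMATIONS_START_INDEX

    final String id;
    final String address;
    final List<String> flags;
    final String masterId;
    final List<String> slotEntries;

    ClusterNodesLine(String line) {
        String[] parts = line.trim().split(" ");
        id = parts[0];
        address = parts[1];
        flags = Arrays.asList(parts[2].split(","));
        masterId = parts[3];
        slotEntries = parts.length > SLOTS_START
                ? Arrays.asList(Arrays.copyOfRange(parts, SLOTS_START, parts.length))
                : Collections.<String>emptyList();
    }

    boolean isMaster() {
        return flags.contains("master");
    }
}
```

Applied to the output above, the 7031 line yields one slot entry ("0-5460"). A slave line has exactly eight fields, so it yields no slot entries.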

6. CLUSTER NODES returns one line per node. The parse function splits each line and wraps the result in a ClusterNodeInformation object:

    public static final int SLOT_INFORMATIONS_START_INDEX = 8;
    public static final int HOST_AND_PORT_INDEX = 1;

    public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
        String[] nodeInfoPartArray = nodeInfo.split(" ");

        HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray, current);
        ClusterNodeInformation info = new ClusterNodeInformation(node);

        if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
            String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
            fillSlotInformation(slotInfoPartArray, info);
        }

        return info;
    }

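Key line: String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
This extracts the slot ranges assigned to the node, such as 0-5460. Each CLUSTER NODES line consists of space-separated fields:
- node ID;
- ip:port;
- flags;
- master ID (- for a master);
- ping-sent, pong-recv, config-epoch and link-state;
- from index 8 onward, the node's slots.

That is why SLOT_INFORMATIONS_START_INDEX is 8. Only masters have slot fields, so for the slave lines above the extracted array is empty. The code: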

    private String[] extractSlotParts(String[] nodeInfoPartArray) {
        String[] slotInfoPartArray = new String[nodeInfoPartArray.length
            - SLOT_INFORMATIONS_START_INDEX];
        for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
            slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
        }
        return slotInfoPartArray;
    }

The other key line in parse: fillSlotInformation(slotInfoPartArray, info);

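fillSlotInformation calls fillSlotInformationFromSlotRange for each slot entry. That function adds every slot index covered by an entry to the node's available slots through info.addAvailableSlot(). Slots in transition are written in brackets, for example [slot-<-nodeId] for a slot being imported and [slot->-nodeId] for one being migrated, and are recorded separately. The code: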

    private void fillSlotInformation(String[] slotInfoPartArray, ClusterNodeInformation info) {
        for (String slotRange : slotInfoPartArray) {
            fillSlotInformationFromSlotRange(slotRange, info);
        }
    }

    private void fillSlotInformationFromSlotRange(String slotRange, ClusterNodeInformation info) {
        if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
            // slot is in transition
            int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);

            if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
                // import
                info.addSlotBeingImported(slot);
            } else {
                // migrate (->-)
                info.addSlotBeingMigrated(slot);
            }
        } else if (slotRange.contains("-")) {
            // slot range
            String[] slotRangePart = slotRange.split("-");
            for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer.valueOf(slotRangePart[1]); slot++) {
                info.addAvailableSlot(slot);
            }
        } else {
            // single slot
            info.addAvailableSlot(Integer.valueOf(slotRange));
        }
    }
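The three cases handled by fillSlotInformationFromSlotRange can be tried out in isolation. In this sketch:
- the class SlotRangeExpander is hypothetical;
- the bracket markers follow the CLUSTER NODES format documented by Redis ([slot-<-nodeId] for importing, [slot->-nodeId] for migrating);
- the assumption that Jedis's SLOT_IN_TRANSITION_IDENTIFIER and SLOT_IMPORT_IDENTIFIER hold "[" and "-<-" is consistent with the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class SlotRangeExpander {
    final List<Integer> available = new ArrayList<>();
    final List<Integer> importing = new ArrayList<>();
    final List<Integer> migrating = new ArrayList<>();

    // Same cases as fillSlotInformationFromSlotRange:
    //   "[93-<-nodeId]" importing, "[77->-nodeId]" migrating, "0-5460" range, "100" single slot
    void add(String entry) {
        if (entry.startsWith("[")) {
            int slot = Integer.parseInt(entry.substring(1).split("-")[0]);
            if (entry.contains("-<-")) {
                importing.add(slot);
            } else {
                migrating.add(slot);
            }
        } else if (entry.contains("-")) {
            String[] bounds = entry.split("-");
            int from = Integer.parseInt(bounds[0]);
            int to = Integer.parseInt(bounds[1]); // parsed once instead of on every iteration
            for (int slot = from; slot <= to; slot++) {
                available.add(slot);
            }
        } else {
            available.add(Integer.parseInt(entry));
        }
    }
}
```

A range entry such as 0-5460 is inclusive at both ends, so it contributes 5461 slots.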

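7. The core of initialization does two things:
- it stores the current node in the nodes map, with the node's host:port as key and a JedisPool for it as value;
- it maps each slot index to that node's JedisPool in the slots map.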

    public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
        w.lock();
        try {
            JedisPool targetPool = nodes.get(getNodeKey(targetNode));

            if (targetPool == null) {
                setNodeIfNotExist(targetNode);
                targetPool = nodes.get(getNodeKey(targetNode));
            }

            for (Integer slot : targetSlots) {
                slots.put(slot, targetPool);
            }
        } finally {
            w.unlock();
        }
    }

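Key line: slots.put(slot, targetPool); maps each slot index to the current node's JedisPool. Slaves are added to nodes too, but they own no slots, so every entry in slots points at a master.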
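The two maps can be modelled with plain strings standing in for JedisPool objects. SlotTableSketch is an illustration only: it drops the write lock and the real connection pools, and the pool(...) strings are placeholders. The slot ranges used below are those of the CLUSTER NODES output above.

```java
import java.util.HashMap;
import java.util.Map;

final class SlotTableSketch {
    static final int SLOT_COUNT = 16384;

    // node "host:port" -> pool (a placeholder string instead of a JedisPool)
    final Map<String, String> nodes = new HashMap<>();
    // slot index -> pool of the master that owns the slot
    final Map<Integer, String> slots = new HashMap<>();

    void setNodeIfNotExist(String hostPort) {
        nodes.putIfAbsent(hostPort, "pool(" + hostPort + ")");
    }

    // Like assignSlotsToNode, but takes an inclusive range instead of a list of slots
    void assignSlotsToNode(int from, int to, String hostPort) {
        setNodeIfNotExist(hostPort);
        String pool = nodes.get(hostPort);
        for (int slot = from; slot <= to; slot++) {
            slots.put(slot, pool);
        }
    }
}
```

After the three master ranges are assigned, each of slots 0 to 16383 has exactly one entry in slots. For example, slot 12182 maps to the 7033 pool. A slave added only through setNodeIfNotExist appears in nodes but never in slots.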

II. Reading and writing values with JedisCluster

1. Again, start with a unit test:

    @Autowired
    private JedisCluster jedisCluster;

    // Check availability while a master is taken down
    @Test
    public void testJedisCluster1() throws InterruptedException {
        for (int i = 0; i < 100; i++) {

            try {
                jedisCluster.set("name" + i, "hello world=" + i);
                String val = jedisCluster.get("name" + i);
                System.out.println(val);

            // Retries exceeded maxRedirections
            } catch (JedisClusterMaxRedirectionsException m) {
                System.err.println("start Too many Cluster redirections?");
                //Thread.sleep(2000);
                i = i - 1; // repeat the same key on the next iteration
                System.err.println("continue Too many Cluster redirections?");
                continue;
            } catch (JedisClusterException jce) {
                // Failover has not finished yet (or has failed): no slave has replaced the dead
                // master, so requests for its slots get "CLUSTERDOWN The cluster is down"
                System.out.println("start CLUSTERDOWN the cluster is down");
                //Thread.sleep(2000);
                i = i - 1;
                System.out.println("continue CLUSTERDOWN the cluster is down");
                continue;
            }
            // Changing this sleep time seems to change which exception type is thrown;
            // this observation is not well verified, so do not rely on it
            Thread.sleep(1000);
        }
    }

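2. jedisCluster.set() creates an anonymous subclass of JedisClusterCommand whose execute() method performs the actual command, then calls run(key). run() works out the key's slot, looks up the JedisPool for that slot, and takes a Jedis connection from it: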

    @Override
    public String set(final String key, final String value) {
        return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
            @Override
            public String execute(Jedis connection) {
                return connection.set(key, value);
            }
        }.run(key);
    }
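The anonymous-subclass pattern in set() is a template method. JedisClusterCommand fixes the routing in run(), and each command only fills in execute(). This dependency-free sketch uses hypothetical stand-ins:
- ClusterCommandSketch replaces JedisClusterCommand;
- a slot function and a connection function replace the CRC16 lookup and the connection handler;
- a String plays the role of a Jedis connection;
- retries are left out.

```java
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

// Hypothetical, dependency-free stand-in for JedisClusterCommand<T>
abstract class ClusterCommandSketch<T> {
    private final ToIntFunction<String> slotOf;          // key -> slot (JedisClusterCRC16.getSlot in Jedis)
    private final IntFunction<String> connectionForSlot; // slot -> connection (a node name here)

    ClusterCommandSketch(ToIntFunction<String> slotOf, IntFunction<String> connectionForSlot) {
        this.slotOf = slotOf;
        this.connectionForSlot = connectionForSlot;
    }

    // The only part each command supplies: what to do with a connection
    abstract T execute(String connection);

    // The shared part: key -> slot -> connection -> execute (retries omitted)
    T run(String key) {
        return execute(connectionForSlot.apply(slotOf.applyAsInt(key)));
    }

    // Mirrors JedisCluster.set: an anonymous subclass captures key and value
    static String set(ToIntFunction<String> slotOf, IntFunction<String> connectionForSlot,
                      final String key, final String value) {
        return new ClusterCommandSketch<String>(slotOf, connectionForSlot) {
            @Override
            String execute(String connection) {
                return "SET " + key + " " + value + " via " + connection;
            }
        }.run(key);
    }
}
```

Keeping routing in the base class means that every command, whether set, get or del, is routed by the same code. A command only has to say what it does once it has a connection.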

3. run() calls runWithRetries():

    private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) {
        // redirections: attempts left, starting from maxRedirections
        if (redirections <= 0) {
            throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
        }

        Jedis connection = null;
        try {

            if (asking) {
                // TODO: Pipeline asking with the original command to make it
                // faster....
                connection = askConnection.get();
                connection.asking();

                // if asking success, reset asking flag
                asking = false;
            } else {

                // tryRandomNode is false on the first attempt
                if (tryRandomNode) {
                    connection = connectionHandler.getConnection();
                } else {
                    connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
                }
            }

            return execute(connection);
        } catch (JedisConnectionException jce) {
            if (tryRandomNode) {
                // maybe all connection is down
                throw jce;
            }

            releaseConnection(connection, true);
            connection = null;

            // retry with random connection
            // (recursive call with one attempt fewer)
            return runWithRetries(key, redirections - 1, true, asking);
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisAskDataException) {
                asking = true;
                askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
            } else if (jre instanceof JedisMovedDataException) {
                // it rebuilds cluster's slot cache
                // recommended by Redis cluster specification
                this.connectionHandler.renewSlotCache();
            } else {
                throw new JedisClusterException(jre);
            }

            releaseConnection(connection, false);
            connection = null;

            // Recursive call: retry via the ASK target or the refreshed slot cache
            return runWithRetries(key, redirections - 1, false, asking);
        } finally {
            releaseConnection(connection, false);
        }

    }
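The retry rules above can be isolated from Jedis in a small sketch. It rewrites the recursion as a loop and leaves out ASK handling. The exception classes and the Attempt interface are hypothetical stand-ins:
- ConnectionFailed for JedisConnectionException;
- Moved for JedisMovedDataException;
- TooManyRedirections for JedisClusterMaxRedirectionsException;
- Attempt for the execute() call.

```java
// Dependency-free sketch of runWithRetries, written as a loop instead of recursion (no ASK handling)
final class RetrySketch {
    static final class ConnectionFailed extends RuntimeException {
    }

    static final class Moved extends RuntimeException {
    }

    static final class TooManyRedirections extends RuntimeException {
        TooManyRedirections() {
            super("Too many Cluster redirections?");
        }
    }

    // One attempt; tryRandomNode says whether to use a random node instead of the slot's node
    interface Attempt<T> {
        T call(boolean tryRandomNode);
    }

    private RetrySketch() {
    }

    static <T> T runWithRetries(Attempt<T> attempt, int redirections, Runnable renewSlotCache) {
        boolean tryRandomNode = false;
        while (true) {
            if (redirections <= 0) {
                throw new TooManyRedirections();
            }
            try {
                return attempt.call(tryRandomNode);
            } catch (ConnectionFailed e) {
                if (tryRandomNode) {
                    throw e; // even the random node failed: give up, as the Jedis code does
                }
                tryRandomNode = true; // next attempt goes to a random node
            } catch (Moved e) {
                renewSlotCache.run(); // MOVED: rebuild the slot -> node cache
                tryRandomNode = false; // next attempt goes to the slot's node again
            }
            redirections--;
        }
    }
}
```

Consider an attempt that fails to connect on the slot's node and gets MOVED from every random node, which is the situation while a dead master has not yet been replaced. The sketch alternates between the two catch branches until the budget of 6 runs out, refreshes the slot cache three times along the way, and then throws TooManyRedirections.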

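Key parameter: redirections

This is the number of attempts left. It starts at maxRedirections, which is 6 in the Spring configuration above. That happens to equal the number of master and slave nodes in the author's cluster, but the code does not tie the two together.

Suppose a master has gone down and failover has not yet replaced it. A request for one of its slots then fails to connect, and the client retries on a random node. That node typically answers with MOVED, pointing back at the dead master. The client rebuilds the slot cache, which still has the same mapping, and the cycle repeats. Every retry decrements redirections, and when it reaches 0 a JedisClusterMaxRedirectionsException is thrown.

The original post attributes the replacement of the master to the sentinel mechanism. In Redis Cluster, however, failure detection and slave promotion are done by the cluster nodes themselves, without Redis Sentinel.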

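Key parameter: tryRandomNode

This flag says whether to send the command over a connection to a random node. It is false on the first attempt. After a JedisConnectionException (for example, because the master is down), the retry sets it to true. If the random node cannot be reached either, the JedisConnectionException is rethrown.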

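Key line: connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

1) JedisClusterCRC16.getSlot computes the CRC16 of the key modulo 16384, giving a slot number between 0 and 16383. A Redis cluster has 16384 slots in total.
2) The slot number is used to look up the JedisPool in the slots map built during initialization, and a Jedis connection is taken from that pool.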
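The key-to-slot calculation is defined by the Redis Cluster specification. It takes the CRC16 of the key (the XMODEM variant, polynomial 0x1021, initial value 0) modulo 16384. If the key contains a non-empty {...} section (a "hash tag"), only that section is hashed. The class name ClusterSlot is hypothetical, and the code is written from the specification rather than copied from JedisClusterCRC16.

```java
import java.nio.charset.StandardCharsets;

final class ClusterSlot {
    static final int SLOT_COUNT = 16384;

    private ClusterSlot() {
    }

    // CRC16-XMODEM: polynomial 0x1021, initial value 0, no reflection, no final XOR
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep 16 bits
            }
        }
        return crc;
    }

    // Hash tags: if the key contains "{...}" with a non-empty body, only the body is hashed
    static int getSlot(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) & (SLOT_COUNT - 1); // mod 16384
    }
}
```

Two published values serve as a sanity check, and the sketch reproduces both:
- the Redis Cluster specification gives 0x31C3 as the CRC16 of "123456789";
- the Redis cluster tutorial shows the key foo being redirected to slot 12182.

Keys that share a hash tag, such as {user1000}.following and {user1000}.followers, always land in the same slot.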

===== Update 2017-06-22 =====

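Key point: the two catch blocks
Together, the two catch blocks implement reconnection when a master goes down. This is what lets JedisCluster keep working through a failover and makes the cluster highly available from the client's side.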

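However, a master may go down and the client may retry before the cluster has detected the failure or finished failover, or failover may fail. In those cases the client still ends up with a JedisClusterMaxRedirectionsException or a JedisClusterException. The client program must handle this itself, for example by waiting and retrying more times; otherwise some requests fail. The unit test above shows one way to do this.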

===== Update 2017-06-22 =====

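Summary:

Initializing the cluster at application startup:

1) Read the node entries from the configuration file, masters and slaves alike, however many there are. Try them one at a time and use the first node that can be reached to get a Redis connection

2) Call clusterNodes() on that connection, which runs CLUSTER NODES on the server and returns the master/slave layout

3) Parse the layout and store every node in the nodes map, with the node's ip:port as key and its JedisPool as value

4) Parse the slot ranges assigned to each master and store them in the slots map, with each slot index as key and the JedisPool from step 3 as value

Together, these steps build the mapping from slot index to JedisPool. Each of these JedisPools points at a master node, so the client's slot-to-node mapping matches the server's.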

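Reading and writing values in the cluster:

1) Run the CRC16 algorithm on the key to get its slot

2) Use the slot to look up the JedisPool in the slots map

3) Take a Jedis instance from the JedisPool and use it to read or write the data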


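JedisCluster does not make the client highly available: with the initialization and access process above, if a master goes down while the client is running and the server side fails over to its slave, the program still fails.
Cluster initialization runs only once, at application startup, so the slot-to-master mapping initialized on the client is not updated when the master changes on the server.
When a value is read or written, the JedisPool obtained from the slot index still holds connections to the original, dead ip:port. So the cluster setup makes the client distributed, but not highly available.
Only restarting the service, which re-initializes the cluster, lets the program work again.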


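===== Update 2017-06-22 =====
Part of the conclusion above (shown in blue in the original post) is wrong. After re-testing, the conclusions are:
JedisCluster can make the client highly available. When a master goes down, the catch blocks in runWithRetries retry the command. The JedisMovedDataException branch calls renewSlotCache(), which reloads the slot-to-node mapping from the cluster, so after failover the slots point at the promoted slave.
However, replacing a dead master takes time: the cluster must first detect the failure and then promote a slave. The client program needs to handle the effects of this delay, for example by retrying after a delay or by catching specific JedisCluster exceptions and retrying, to stay fully available.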
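The delayed retry recommended above can be factored into a helper, in place of the i = i - 1 loop in the unit test. This is a minimal sketch under assumptions: the name ClientRetry.withRetry, the fixed delay and the attempt limit are choices made here, not part of Jedis.

```java
import java.util.function.Supplier;

final class ClientRetry {
    private ClientRetry() {
    }

    // Runs action up to maxAttempts times, sleeping delayMillis between attempts.
    // Only exceptions that are instances of one of retryOn are retried; others propagate at once.
    @SafeVarargs
    static <T> T withRetry(Supplier<T> action, int maxAttempts, long delayMillis,
                           Class<? extends RuntimeException>... retryOn) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!isRetryable(e, retryOn)) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    sleep(delayMillis);
                }
            }
        }
        throw last; // every attempt failed with a retryable exception
    }

    private static boolean isRetryable(RuntimeException e, Class<? extends RuntimeException>[] retryOn) {
        for (Class<? extends RuntimeException> type : retryOn) {
            if (type.isInstance(e)) {
                return true;
            }
        }
        return false;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```

With the Jedis 2.x exception classes from the unit test, it could be called as follows:

ClientRetry.withRetry(() -> jedisCluster.get("name"), 10, 2000L, JedisClusterMaxRedirectionsException.class, JedisClusterException.class)

The total wait should be long enough to cover the cluster's failure detection and failover time, which the server-side cluster-node-timeout setting governs.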