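In a Redis cluster environment, clients use JedisCluster to obtain connections and talk to the Redis servers. The previous post, "Distributed Caching with Redis, Part 7: Integrating JedisCluster with Spring", gave a short introduction to using JedisCluster from Spring. This post looks at how JedisCluster works at the source-code level.
I. Cluster initialization
1. Start with a unit test: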
@Autowired
private JedisCluster jedisCluster;

@Test
public void testJedisCluster() {
    jedisCluster.set("name", "啊芝");
    String val = jedisCluster.get("name");
    System.out.println(val);
}
2. The unit test injects JedisCluster jedisCluster, which comes from this registration in the Spring configuration file:
<bean id="jedisCluster" class="com.jsun.service.redis.impl.JedisClusterFactory">
    <property name="addressConfig">
        <value>classpath:redis.properties</value>
    </property>
    <property name="addressKeyPrefix" value="cluster" /> <!-- key prefix used in the properties file -->
    <property name="timeout" value="300000" />
    <property name="maxRedirections" value="6" />
    <property name="genericObjectPoolConfig" ref="genericObjectPoolConfig" />
</bean>
3. JedisClusterFactory:
It implements the InitializingBean interface and overrides afterPropertiesSet(). Spring calls this method once the properties of the JedisClusterFactory bean have been set, and the method in turn calls parseHostAndPort(). The code:
@Override
public void afterPropertiesSet() throws Exception {
    // collect every configured node
    Set<HostAndPort> haps = this.parseHostAndPort();
    // initialize the cluster client
    jedisCluster = new JedisCluster(haps, timeout, maxRedirections, genericObjectPoolConfig);
}

private Set<HostAndPort> parseHostAndPort() throws Exception {
    try {
        // read the configuration file (redis.properties) and return all configured nodes as a Set<HostAndPort>
        Properties prop = new Properties();
        prop.load(this.addressConfig.getInputStream());
        Set<HostAndPort> haps = new HashSet<HostAndPort>();
        for (Object key : prop.keySet()) {
            if (!((String) key).startsWith(addressKeyPrefix)) {
                continue;
            }
            String val = (String) prop.get(key);
            // p is a Pattern field of the factory (its declaration is not shown here) that checks the ip:port format
            boolean isIpPort = p.matcher(val).matches();
            if (!isIpPort) {
                throw new IllegalArgumentException("invalid ip or port");
            }
            String[] ipAndPort = val.split(":");
            HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
            haps.add(hap);
        }
        return haps;
    } catch (IllegalArgumentException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new Exception("failed to parse the jedis configuration file", ex);
    }
}
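The parsing step can be tried on its own, without Spring or Jedis. What follows is only a minimal sketch, not the factory's real code: the class name AddressParser, the regular expression and the property key names are assumptions made for illustration (the original pattern p is not shown), and plain "host:port" strings stand in for HostAndPort.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical stand-in for the parsing step; not the factory's real code.
class AddressParser {
    // Assumed format check: dotted IPv4 address, a colon, then a 1-5 digit port.
    private static final Pattern IP_PORT =
            Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}:\\d{1,5}$");

    // Returns the "host:port" values of all properties whose key starts with keyPrefix.
    static Set<String> parse(Properties prop, String keyPrefix) {
        Set<String> nodes = new TreeSet<>();
        for (String key : prop.stringPropertyNames()) {
            if (!key.startsWith(keyPrefix)) {
                continue; // unrelated property
            }
            String val = prop.getProperty(key).trim();
            if (!IP_PORT.matcher(val).matches()) {
                throw new IllegalArgumentException("invalid ip or port: " + val);
            }
            nodes.add(val);
        }
        return nodes;
    }

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("cluster1.host.port", "119.254.166.136:7031"); // assumed key name
        prop.setProperty("cluster2.host.port", "119.254.166.136:7032"); // assumed key name
        prop.setProperty("pool.maxTotal", "100");                       // skipped: wrong prefix
        System.out.println(parse(prop, "cluster"));
    }
}
```

Only keys that start with the prefix are considered, which is why unrelated settings can live in the same properties file.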
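4. Following the cluster initialization call chain leads to this code:
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
    // iterate over the nodes read from the configuration file
    for (HostAndPort hostAndPort : startNodes) {
        Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
        try {
            cache.discoverClusterNodesAndSlots(jedis);
            // this break is the key point: once one node has answered, the loop exits immediately
            break;
        } catch (JedisConnectionException e) {
            // try next nodes
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    for (HostAndPort node : startNodes) {
        cache.setNodeIfNotExist(node);
    }
}
Note the break inside the for loop above:
It exits the loop as soon as discoverClusterNodesAndSlots succeeds, so only one reachable node is actually queried. The full topology comes from that node's cluster nodes reply, which means that listing one node or several nodes in the configuration file produces the same slot cache.
Listing several nodes still matters at startup. If a configured node is down or unreachable, the JedisConnectionException is caught and the loop moves on to the next configured node; the program cannot use the cluster only when none of the configured nodes can be reached. Also note that startNodes is a HashSet, so the node tried first is not necessarily the first one written in the configuration file.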
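The control flow of that loop can be checked in isolation: break is reached only after a node has answered, while a connection failure is swallowed by the catch block and the loop continues. The sketch below is an assumption-laden simulation, not Jedis code: SeedNodeLoop and firstReachable are hypothetical names, a set of "down" addresses plays the role of unreachable servers, and IllegalStateException stands in for JedisConnectionException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the seed-node loop; no real network access.
class SeedNodeLoop {
    // Returns the seed that answered, or null if no seed was reachable.
    // Every seed that was tried is appended to 'attempted'.
    static String firstReachable(Set<String> seeds, Set<String> down, List<String> attempted) {
        String answered = null;
        for (String seed : seeds) {
            attempted.add(seed);
            try {
                if (down.contains(seed)) {
                    // stands in for JedisConnectionException
                    throw new IllegalStateException("cannot connect to " + seed);
                }
                answered = seed; // stands in for a successful discoverClusterNodesAndSlots
                break;           // stop after the first node that answers
            } catch (IllegalStateException e) {
                // try next node
            }
        }
        return answered;
    }

    public static void main(String[] args) {
        Set<String> seeds = new LinkedHashSet<>(Arrays.asList(
                "119.254.166.136:7031", "119.254.166.136:7032", "119.254.166.136:7033"));
        Set<String> down = Collections.singleton("119.254.166.136:7031");
        List<String> attempted = new ArrayList<>();
        System.out.println("answered by: " + firstReachable(seeds, down, attempted));
        System.out.println("attempted:   " + attempted);
    }
}
```

With the first seed marked as down, the second seed answers and the third is never contacted.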
5. discoverClusterNodesAndSlots: fetch the cluster node information and the slot information from the Redis server
// node key (host:port) -> pool, and slot index -> pool
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();
    try {
        this.nodes.clear();
        this.slots.clear();
        String localNodes = jedis.clusterNodes();
        for (String nodeInfo : localNodes.split("\n")) {
            ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(nodeInfo, new HostAndPort(
                    jedis.getClient().getHost(), jedis.getClient().getPort()));
            HostAndPort targetNode = clusterNodeInfo.getNode();
            setNodeIfNotExist(targetNode);
            assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
        }
    } finally {
        w.unlock();
    }
}
Key line: String localNodes = jedis.clusterNodes();
Calling clusterNodes on the jedis instance runs the cluster nodes command on the Redis server. Sample output:
2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 119.254.166.136:7031 myself,master - 0 0 1 connected 0-5460
6f7119b06bb3316119f0bed3f793c2ce87983566 119.254.166.136:7036 slave 6fd9a873b29f5e9a61756606ececa4a953a11db7 0 1479278388635 6 connected
1b42b9d25779ccd5555fb804d01ddfbdd20635bf 119.254.166.136:7032 master - 0 1479278389637 2 connected 5461-10922
e22f9b0d7cb3fdb932926a1b4d9c3140e70255eb 119.254.166.136:7034 slave 2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 0 1479278390639 4 connected
cd480d1b437ad323ae3e15e548db8faf43a5d766 119.254.166.136:7035 slave 1b42b9d25779ccd5555fb804d01ddfbdd20635bf 0 1479278385630 5 connected
6fd9a873b29f5e9a61756606ececa4a953a11db7 119.254.166.136:7033 master - 0 1479278386632 3 connected 10923-16383
6. The cluster nodes command returns the information for all cluster nodes as one string, which is then parsed line by line and wrapped in objects. The parse function:
public static final int SLOT_INFORMATIONS_START_INDEX = 8;
public static final int HOST_AND_PORT_INDEX = 1;

public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) {
    String[] nodeInfoPartArray = nodeInfo.split(" ");
    HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray, current);
    ClusterNodeInformation info = new ClusterNodeInformation(node);
    if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) {
        String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
        fillSlotInformation(slotInfoPartArray, info);
    }
    return info;
}
Key line: String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
It extracts the slot range tokens assigned to the node, for example 0-5460. The code:
private String[] extractSlotParts(String[] nodeInfoPartArray) {
    String[] slotInfoPartArray = new String[nodeInfoPartArray.length
            - SLOT_INFORMATIONS_START_INDEX];
    for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) {
        slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i];
    }
    return slotInfoPartArray;
}
The other key line in parse: fillSlotInformation(slotInfoPartArray, info);
fillSlotInformation calls fillSlotInformationFromSlotRange for each token. That function expands the token and records every slot index on the node through addAvailableSlot (slots that are being imported or migrated are recorded separately). The code:
private void fillSlotInformation(String[] slotInfoPartArray, ClusterNodeInformation info) {
    for (String slotRange : slotInfoPartArray) {
        fillSlotInformationFromSlotRange(slotRange, info);
    }
}

private void fillSlotInformationFromSlotRange(String slotRange, ClusterNodeInformation info) {
    if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) {
        // slot is in transition
        int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]);
        if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) {
            // import
            info.addSlotBeingImported(slot);
        } else {
            // migrate (->-)
            info.addSlotBeingMigrated(slot);
        }
    } else if (slotRange.contains("-")) {
        // slot range
        String[] slotRangePart = slotRange.split("-");
        for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer.valueOf(slotRangePart[1]); slot++) {
            info.addAvailableSlot(slot);
        }
    } else {
        // single slot
        info.addAvailableSlot(Integer.valueOf(slotRange));
    }
}
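To see what the parser extracts from one line of cluster nodes output, here is a small self-contained sketch that can be run against the sample output above. It is a simplification under stated assumptions, not the Jedis parser: NodeLineParser is a hypothetical class, it returns the address as a plain string, and it simply skips tokens that start with "[" (slots in transition) instead of recording them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parser for one line of "cluster nodes" output.
class NodeLineParser {
    static final int HOST_AND_PORT_INDEX = 1;
    static final int SLOT_INFORMATIONS_START_INDEX = 8;

    // The second space-separated field is the node address.
    static String hostAndPort(String line) {
        return line.split(" ")[HOST_AND_PORT_INDEX];
    }

    // Fields from index 8 onward are slot tokens: "0-5460" (range) or "7" (single slot).
    static List<Integer> availableSlots(String line) {
        String[] parts = line.split(" ");
        List<Integer> slots = new ArrayList<>();
        for (int i = SLOT_INFORMATIONS_START_INDEX; i < parts.length; i++) {
            String token = parts[i];
            if (token.startsWith("[")) {
                continue; // slot being imported or migrated: ignored in this sketch
            }
            if (token.contains("-")) {
                String[] range = token.split("-");
                int from = Integer.parseInt(range[0]);
                int to = Integer.parseInt(range[1]);
                for (int slot = from; slot <= to; slot++) {
                    slots.add(slot);
                }
            } else {
                slots.add(Integer.parseInt(token));
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        String master = "2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 119.254.166.136:7031 "
                + "myself,master - 0 0 1 connected 0-5460";
        System.out.println(hostAndPort(master));           // 119.254.166.136:7031
        System.out.println(availableSlots(master).size()); // 5461
    }
}
```

A replica line has only eight fields, so it yields an empty slot list, which is why only masters end up in the slot mapping.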
7. The core of initialization: store the current node in the nodes map, with the node's host:port as the key and a JedisPool instance as the value; then map each slot index to the current node's JedisPool and store the pairs in the slots map
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
        JedisPool targetPool = nodes.get(getNodeKey(targetNode));
        if (targetPool == null) {
            setNodeIfNotExist(targetNode);
            targetPool = nodes.get(getNodeKey(targetNode));
        }
        for (Integer slot : targetSlots) {
            slots.put(slot, targetPool);
        }
    } finally {
        w.unlock();
    }
}
Key line: slots.put(slot, targetPool); maps the slot index to the current node's JedisPool
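The resulting data structure is simply a map with one entry per slot. The sketch below rebuilds it for the three masters in the sample output; it is illustrative only: SlotTable is a hypothetical class and a "host:port" string stands in for the JedisPool value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical slot table; a node key string stands in for JedisPool.
class SlotTable {
    private final Map<Integer, String> slots = new HashMap<>();

    // Maps every slot in [from, to] to the given node.
    void assign(int from, int to, String nodeKey) {
        for (int slot = from; slot <= to; slot++) {
            slots.put(slot, nodeKey);
        }
    }

    String nodeFor(int slot) {
        return slots.get(slot);
    }

    int size() {
        return slots.size();
    }

    public static void main(String[] args) {
        SlotTable table = new SlotTable();
        table.assign(0, 5460, "119.254.166.136:7031");
        table.assign(5461, 10922, "119.254.166.136:7032");
        table.assign(10923, 16383, "119.254.166.136:7033");
        System.out.println(table.size());        // 16384
        System.out.println(table.nodeFor(5461)); // 119.254.166.136:7032
    }
}
```

Storing one entry per slot costs 16384 map entries but makes the lookup for a key's slot a single get.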
II. Reading and writing values with the JedisCluster client
1. Again, start with a unit test:
@Autowired
private JedisCluster jedisCluster;

// verify high availability
@Test
public void testJedisCluster1() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        try {
            jedisCluster.set("name" + i, "hello world=" + i);
            String val = jedisCluster.get("name" + i);
            System.out.println(val);
        } catch (JedisClusterMaxRedirectionsException m) {
            // the retry budget (maxRedirections) has been used up
            System.err.println("start Too many Cluster redirections?");
            //Thread.sleep(2000);
            i = i - 1; // repeat the same key on the next iteration
            System.err.println("continue Too many Cluster redirections?");
            continue;
        } catch (JedisClusterException jce) {
            // the cluster has not finished replacing the failed master, or the replacement failed;
            // a request for data owned by the failed master then gets "the cluster is down"
            System.out.println("start CLUSTERDOWN the cluster is down");
            //Thread.sleep(2000);
            i = i - 1; // repeat the same key on the next iteration
            System.out.println("continue CLUSTERDOWN the cluster is down");
            continue;
        }
        // Changing this sleep duration appears to affect which exception is thrown
        // (JedisClusterMaxRedirectionsException or JedisClusterException).
        // This observation has not been properly verified, so do not rely on it.
        Thread.sleep(1000);
    }
}
2. jedisCluster.set(): an anonymous inner class overrides execute to read or write data on the Redis server. Calling run computes the slot for the key and then obtains the JedisPool mapped to that slot to produce a jedis connection
@Override
public String set(final String key, final String value) {
    return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
        @Override
        public String execute(Jedis connection) {
            return connection.set(key, value);
        }
    }.run(key);
}
3. run() calls runWithRetries():
private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) {
    // redirections: remaining retry budget; it starts at maxRedirections and drops by one per retry
    if (redirections <= 0) {
        throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }
    Jedis connection = null;
    try {
        if (asking) {
            // TODO: Pipeline asking with the original command to make it
            // faster....
            connection = askConnection.get();
            connection.asking();
            // if asking success, reset asking flag
            asking = false;
        } else {
            // tryRandomNode is false on the first attempt
            if (tryRandomNode) {
                connection = connectionHandler.getConnection();
            } else {
                connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
            }
        }
        return execute(connection);
    } catch (JedisConnectionException jce) {
        if (tryRandomNode) {
            // maybe all connection is down
            throw jce;
        }
        releaseConnection(connection, true);
        connection = null;
        // retry with random connection
        // recursive call: retry
        return runWithRetries(key, redirections - 1, true, asking);
    } catch (JedisRedirectionException jre) {
        if (jre instanceof JedisAskDataException) {
            asking = true;
            askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
        } else if (jre instanceof JedisMovedDataException) {
            // it rebuilds cluster's slot cache
            // recommended by Redis cluster specification
            this.connectionHandler.renewSlotCache();
        } else {
            throw new JedisClusterException(jre);
        }
        releaseConnection(connection, false);
        connection = null;
        // recursive call: retry
        return runWithRetries(key, redirections - 1, false, asking);
    } finally {
        releaseConnection(connection, false);
    }
}
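The MOVED branch is the part that lets a client with a stale slot cache recover. The following is a rough simulation of that branch only, under several assumptions: RedirectSketch and MovedException are hypothetical names, two in-memory maps play the roles of the server-side slot ownership and the client-side cache, and IllegalStateException stands in for JedisClusterMaxRedirectionsException. The ASK and connection-failure branches are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the MOVED branch; no real Redis involved.
class RedirectSketch {
    // Stands in for JedisMovedDataException.
    static class MovedException extends RuntimeException {
        MovedException(String targetNode) {
            super("MOVED to " + targetNode);
        }
    }

    final Map<Integer, String> serverOwner = new HashMap<>(); // the truth on the server side
    final Map<Integer, String> clientCache = new HashMap<>(); // the client's view, may be stale
    int renewals = 0;

    // A node only serves slots it owns; otherwise it answers with a redirection.
    String execute(String node, int slot) {
        String owner = serverOwner.get(slot);
        if (!owner.equals(node)) {
            throw new MovedException(owner);
        }
        return "OK from " + node;
    }

    // Stands in for renewSlotCache(): rebuild the client's mapping from the servers.
    void renewSlotCache() {
        clientCache.clear();
        clientCache.putAll(serverOwner);
        renewals++;
    }

    String runWithRetries(int slot, int redirections) {
        if (redirections <= 0) {
            throw new IllegalStateException("Too many Cluster redirections?");
        }
        try {
            return execute(clientCache.get(slot), slot);
        } catch (MovedException e) {
            renewSlotCache();
            return runWithRetries(slot, redirections - 1);
        }
    }

    public static void main(String[] args) {
        RedirectSketch sketch = new RedirectSketch();
        sketch.serverOwner.put(100, "node-B"); // slot 100 now lives on node-B
        sketch.clientCache.put(100, "node-A"); // the client still believes node-A owns it
        System.out.println(sketch.runWithRetries(100, 6)); // OK from node-B
        System.out.println(sketch.renewals);               // 1
    }
}
```

One redirection is enough here because the cache is rebuilt from a consistent view; each retry still consumes one unit of the budget, so a budget of 1 would fail in this scenario.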
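Key point: the redirections parameter
The remaining retry budget. It starts at maxRedirections, which the Spring configuration above sets to 6 (the same as the number of master and replica nodes in this cluster, although the two values are independent), and it drops by one on every retry. If a master goes down and the cluster has not yet detected the failure, client requests to it cannot connect and are retried; once the budget is used up, JedisClusterMaxRedirectionsException is thrown.
Key point: the tryRandomNode parameter
Whether the retry goes to a randomly chosen node. It is false on the first attempt. If the node that owns the slot cannot be reached, the retry is made with this parameter set to true; if that attempt also fails to connect, the JedisConnectionException is rethrown.
Key point: connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
1) JedisClusterCRC16 applies the CRC16 algorithm to the key and reduces the result to an integer in the range 0 to 16383, that is, one of the 16384 slots of a Redis cluster
2) That integer is used to look up the matching JedisPool in the slots map built during initialization, from which a Jedis connection is obtained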
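The slot computation itself is small enough to write out. The sketch below follows the key hashing rule of the Redis Cluster specification (CRC16 in its XMODEM variant, reduced to 14 bits, with hash tag support); it is not a copy of JedisClusterCRC16, and the class name KeySlot is hypothetical. The bitwise CRC loop is chosen for readability rather than speed.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical re-implementation of the key-to-slot rule, for illustration.
class KeySlot {
    // CRC16/XMODEM: polynomial 0x1021, initial value 0, no reflection.
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x8000) != 0) {
                    crc = (crc << 1) ^ 0x1021;
                } else {
                    crc <<= 1;
                }
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    static int slotFor(String key) {
        // Hash tag: if the key contains "{...}" with a non-empty body, hash only the body.
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) & (16384 - 1);
    }

    public static void main(String[] args) {
        System.out.println(slotFor("123456789")); // 12739 (CRC16 check value 0x31C3)
        System.out.println(slotFor("{user1000}.following") == slotFor("{user1000}.followers")); // true
    }
}
```

Because 16384 is a power of two, masking with 16384 - 1 gives the same result as taking the CRC modulo 16384.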
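=====Update 2017-06-22=====
Key point: the two catch blocks
The two catch blocks implement the client's retry mechanism for the case where a master goes down. This is what lets JedisCluster stay available through a failover.
However, if a master goes down and the client retries before the cluster has detected the failure, or before the failed master has been replaced, or if the replacement fails, the call still ends in JedisClusterMaxRedirectionsException or JedisClusterException. The client program has to handle this case itself by delaying and retrying more times; otherwise some requests fail. The unit test above shows one way to do this.
=====Update 2017-06-22=====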
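Summary:
Initializing the cluster environment at program startup:
1) Read the node entries from the configuration file. Whatever their role and however many there are, only one reachable node is needed: the client tries them in turn and uses the first one that answers to get a Redis connection
2) Call clusterNodes() on that connection, which runs the cluster nodes command on the Redis server, to obtain the master/replica layout
3) Parse the layout and put every node into the nodes map, with the node's ip:port as the key and the node's jedisPool as the value
4) Parse the slot ranges assigned to each master and store them in the slots map, with the slot index as the key and the jedisPool from step 3 as the value
Together these steps map every slot index to a jedisPool. The jedisPool carries the master's node information, so slots and nodes correspond in the same way as on the Redis servers
Reading and writing values in the cluster:
1) Run the CRC16 algorithm on the key to get the key's slot
2) Use that slot to look up the jedisPool instance in the slots map
3) Get a jedis instance from the jedisPool and perform the read or write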
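Original conclusion (superseded by the 2017-06-22 update below): jediscluster cannot make the client program highly available. With the initialization and read/write process described above, if a master goes down while the client is running and the Redis side fails over to a replica, the program will still inevitably report errors;
the reason given was that the cluster nodes are initialized when the program starts, and only once during its lifetime, so when the master changes on the server side later, the slot-to-master mapping built by the client is not updated to match;
when reading or writing, the jedisPool found by slot index would still hold connections to the original, now dead ip:port, so the cluster setup would give the client distribution but not high availability.
Only restarting the service and re-initializing the cluster environment would get the program running normally again.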
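=====Update 2017-06-22=====
The conclusion above, that jediscluster cannot provide high availability, is wrong. After re-testing, the conclusions are as follows:
jediscluster can make the client program highly available. When a master goes down, the catch blocks in runWithRetries retry the command, and on a MOVED redirection renewSlotCache() rebuilds the slot cache, so the client reconnects without a restart;
However, failover is not instantaneous. Redis Cluster detects the failed master and promotes a replica by itself (a separate Sentinel deployment is not involved), and this takes time. The client program has to handle the window before the replacement completes (for example by retrying after a delay, or by catching the specific jedisCluster exceptions and retrying) in order to be fully highly available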
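One way to handle that window is a small retry helper around each cluster call. The sketch below is only an illustration under assumptions: RetryWithDelay and its parameters are hypothetical, and it catches RuntimeException in general; in a real client the catch would more likely be narrowed to the exceptions seen in the unit test (JedisClusterMaxRedirectionsException, JedisClusterException) plus JedisConnectionException.

```java
import java.util.function.Supplier;

// Hypothetical helper: retry an operation a fixed number of times with a pause in between.
class RetryWithDelay {
    static <T> T call(Supplier<T> op, int maxAttempts, long delayMillis) throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again after a pause
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated operation: fails twice (failover still in progress), then succeeds.
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("CLUSTERDOWN The cluster is down");
            }
            return "OK";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " calls"); // OK after 3 calls
    }
}
```

With a JedisCluster instance the call would look like RetryWithDelay.call(() -> jedisCluster.get("name"), 5, 1000), where the attempt count and delay are values to tune against the cluster's failover time.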