在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。
背景
最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)
分析
redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式
- 丢弃
- 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用
代码
直接上代码 其实直接redis的工具类就可以解决了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package com.test
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.List;
/**
* @desc redis队列实现方式
* @anthor
* @date
**/
public class RedisUcUitl {
private static final String LOCK_SUCCESS = "OK" ;
private static final String SET_IF_NOT_EXIST = "NX" ;
private static final String SET_WITH_EXPIRE_TIME = "PX" ;
private static final Long RELEASE_SUCCESS = 1L;
private RedisUcUitl() {
}
/**
* logger
**/
/**
* 存储redis队列顺序存储 在队列首部存入
*
* @param key 字节类型
* @param value 字节类型
*/
public static Long lpush(Jedis jedis, final byte [] key, final byte [] value) {
return jedis.lpush(key, value);
}
/**
* 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时
*
* @param srckey
* @param dstkey
* @param timeout 0 表示永不超时
* @return
*/
public static byte [] brpoplpush(Jedis jedis, final byte [] srckey, final byte [] dstkey, final int timeout) {
return jedis.brpoplpush(srckey, dstkey, timeout);
}
/**
* 返回制定的key,起始位置的redis数据
* @param redisKey
* @param start
* @param end -1 表示到最后
* @return
*/
public static List< byte []> lrange(Jedis jedis, final byte [] redisKey, final long start, final long end) {
return jedis.lrange(redisKey, start, end);
}
/**
* 删除key
* @param redisKey
*/
public static void delete(Jedis jedis, final byte [] redisKey) {
return jedis.del(redisKey);
}
/**
* 尝试加锁
* @param lockKey key名称
* @param requestId 身份标识
* @param expireTime 过期时间
* @return
*/
public static boolean tryGetDistributedLock(Jedis jedis, final String lockKey, final String requestId, final int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
return LOCK_SUCCESS.equals(result);
}
/**
* 释放锁
* @param lockKey key名称
* @param requestId 身份标识
* @return
*/
public static boolean releaseDistributedLock(Jedis jedis, final String lockKey, final String requestId) {
final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" ;
jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}
}
|
业务逻辑主要代码如下
1.先消耗队列中的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
while ( true ){
// 消费队列
try {
// 被放入redis队列的数据 序列化后的
byte [] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1 );
if (bytes == null || bytes.isEmpty()){
// 队列中没数据时退出
break ;
}
// 反序列化对象
Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
// 塞入唯一的值 防止被其他线程误解锁
String requestId = UUID.randomUUID().toString();
boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100 );
if (lockGetFlag){
// 成功获取锁 进行业务处理
//TODO
// 处理完毕释放锁
boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);
} else {
// 未能获得锁放入等待队列
RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
}
} catch (Exception e){
break ;
}
}
|
2.处理最新接到的数据
同样是走尝试获取锁,获取不到放入队列的流程
一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class ObjectSerialUtil {
private ObjectSerialUtil() {
// 工具类
}
/**
* 将Object对象序列化为byte[]
*
* @param obj 对象
* @return byte数组
* @throws Exception
*/
public static byte [] objectToBytes(Object obj) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
byte [] bytes = bos.toByteArray();
bos.close();
oos.close();
return bytes;
}
/**
* 将bytes数组还原为对象
*
* @param bytes
* @return
* @throws Exception
*/
public static Object bytesToObject( byte [] bytes) {
try {
ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bin);
return ois.readObject();
} catch (Exception e) {
throw new BaseException( "反序列化出错!" , e);
}
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://juejin.im/post/5d077d04f265da1b5f265661