四、方案2:基于 Redis 的分布式锁
/**
 * Factory / entry point for distributed locks; one instance is kept per schema.
 *
 * NOTE(review): this listing was damaged when it was extracted from a web page.
 * Method-call receivers (the "x." in front of "(...)"), "this." assignment targets
 * and string-concatenation "+" operators are missing on many lines, so the class
 * does not compile as shown. The comments below describe only what the remaining
 * tokens establish and mark every guess as such.
 */
public class RedisLockUtil {
// NOTE(review): the logger factory call was lost in extraction.
private static final Logger logger = ();
// Monitor guarding creation of per-schema instances in getInstance().
private static Object schemeLock = new Object();
// Declared as a schema -> RedisLockUtil map.
private static Map<String, RedisLockUtil> instances = new ConcurrentHashMap();
/**
 * Returns the RedisLockUtil for the given schema, creating it on first use.
 * The lookup is done once without a lock and, if it yields null, once more while
 * holding schemeLock before a new LockObserver and RedisLockUtil are constructed.
 *
 * @param schema identifier passed to the LockObserver and RedisLockUtil constructors
 * @return the instance for the schema
 */
public static RedisLockUtil getInstance(String schema) {
// NOTE(review): receiver lost; presumably a lookup in `instances` -- confirm.
RedisLockUtil u = (schema);
if (u == null) {
synchronized (schemeLock) {
// Second lookup under the lock (receiver lost, see above).
u = (schema);
if (u == null) {
LockObserver lo = new LockObserver(schema);
u = new RedisLockUtil(schema, lo);
// NOTE(review): receiver lost; presumably stores `u` in `instances` -- confirm.
(schema, u);
}
}
}
return u;
}
// Monitor guarding creation of per-key mutex objects in getMutex().
private Object mutexLock = new Object();
// Declared as a key -> mutex object map.
private Map<String, Object> mutexLockMap = new ConcurrentHashMap();
// Declared as a key -> RedisReentrantLock map.
private Map<String, RedisReentrantLock> cache = new ConcurrentHashMap<String, RedisReentrantLock>();
// Delay queue of locks; consumed by the thread started in clearUselessLocks().
private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>();
// Observer handed to every RedisReentrantLock created by getLock().
private AbstractLockObserver lo;
/**
 * Wraps the observer in a new Thread, then calls clearUselessLocks(schema).
 *
 * @param schema used in the (damaged) thread-name expressions and passed to clearUselessLocks
 * @param lo     observer; it is passed to `new Thread(lo)`
 */
public RedisLockUtil(String schema, AbstractLockObserver lo) {
Thread th = new Thread(lo);
// NOTE(review): the next three calls lost their receiver (presumably `th`) and the
// "+" between the literal and `schema`; they look like setDaemon/setName/start -- confirm.
(false);
("Lock Observer:"schema);
();
clearUselessLocks(schema);
// NOTE(review): assignment target lost; presumably `this.lo` -- confirm.
= lo;
}
/**
 * Creates a background thread whose Runnable loops, handling one RedisReentrantLock
 * per iteration and doing part of that work while holding the per-key mutex.
 *
 * NOTE(review): the loop condition, the source of `t`, the condition on `t`, and the
 * calls inside and after the synchronized block all lost their receivers. Presumably
 * the thread takes expired locks from `dq`, clears them and removes them from
 * `cache` -- confirm against the original article.
 *
 * @param schema used in the (damaged) thread-name expression
 */
public void clearUselessLocks(String schema) {
Thread th = new Thread(new Runnable() {
@Override
public void run() {
while (!()) {
try {
RedisReentrantLock t = ();
if (()) {
String key = ();
// Same per-key mutex that getLock() uses when it creates or replaces a lock.
synchronized (getMutex(key)) {
(key);
}
}
();
} catch (InterruptedException e) {
// Interruption is ignored; the loop re-evaluates its condition.
}
}
}
});
// NOTE(review): receiver lost on the next three lines (presumably `th`) -- confirm.
(true);
("Lock cleaner:"schema);
();
}
/**
 * Returns the mutex object for a key, creating a new Object while holding
 * mutexLock when the lookup yields null.
 *
 * @param key lock key
 * @return the mutex object for the key
 */
private Object getMutex(String key) {
// NOTE(review): receiver lost; presumably a lookup in `mutexLockMap` -- confirm.
Object mx = (key);
if (mx == null) {
synchronized (mutexLock) {
mx = (key);
if (mx == null) {
mx = new Object();
// NOTE(review): receiver lost; presumably stores into `mutexLockMap` -- confirm.
(key, mx);
}
}
}
return mx;
}
/**
 * Returns the RedisReentrantLock for a key, creating one under the per-key mutex
 * when the lookup yields null.
 *
 * @param key    lock key
 * @param addref when true, an extra check is made and the lock may be replaced by
 *               a freshly constructed one (see the note in the body)
 * @return the lock for the key
 */
private RedisReentrantLock getLock(String key, boolean addref) {
// NOTE(review): receiver lost; presumably a lookup in `cache` -- confirm.
RedisReentrantLock lock = (key);
if (lock == null) {
synchronized (getMutex(key)) {
lock = (key);
if (lock == null) {
lock = new RedisReentrantLock(key, lo);
(key, lock);
}
}
}
if (addref) {
// NOTE(review): both conditions below lost their call target. Their result decides
// whether a new RedisReentrantLock replaces the looked-up one; which method is
// called cannot be told from this listing.
if (!()) {
synchronized (getMutex(key)) {
lock = (key);
if (!()) {
lock = new RedisReentrantLock(key, lo);
(key, lock);
}
}
}
}
return lock;
}
/**
 * Calls unlock() on the lock of every key produced by the (damaged) iteration source.
 * NOTE(review): the iterated collection was lost; presumably the key set of `cache`.
 */
public void reset() {
for (String s : ()) {
getLock(s, false).unlock();
}
}
/**
 * Tries to acquire the lock for a key; delegates to lock(key, -1).
 * Per the original author's note: if the current thread already owns the lock,
 * the call returns immediately to signal that no further locking is needed, and
 * in that case unlock should not be called afterwards.
 *
 * @param key lock key
 * @return the result of lock(key, -1)
 */
public LockStat lock(String key) {
return lock(key, -1);
}
/**
 * Tries to acquire the lock for a key.
 *
 * NOTE(review): every call on `ll` and both non-null return expressions lost tokens.
 * The two bare `return ;` statements presumably returned LockStat.NONEED and
 * LockStat.SUCCESS respectively -- confirm.
 *
 * @param key     lock key
 * @param timeout passed to the (damaged) call in the second condition
 * @return a LockStat from one of the first two branches, or null from the else branch
 * @throws RuntimeException rethrown unchanged from the try block
 */
public LockStat lock(String key, int timeout) {
RedisReentrantLock ll = getLock(key, true);
();
try {
if ((false)) {
();
return ;
}
if ((timeout)) {
return ;
} else {
();
if (()) {
(ll);
}
return null;
}
} catch (LockNotExistsException e) {
();
// Retry from the start by calling this method again with the same arguments.
return lock(key, timeout);
} catch (RuntimeException e) {
();
throw e;
}
}
/**
 * Releases a lock; delegates to unlock(key, stat, false).
 *
 * @param key  lock key
 * @param stat value previously returned by lock()
 */
public void unlock(String key, LockStat stat) {
unlock(key, stat, false);
}
/**
 * Releases a lock. Returns immediately when stat is null.
 *
 * NOTE(review): the condition on `stat`, the call producing `candestroy` and the
 * two calls in the innermost branches lost their receivers. Presumably only a
 * SUCCESS stat leads to an unlock, after which the lock is queued on `dq` -- confirm.
 *
 * @param key       lock key
 * @param stat      value previously returned by lock(); null means nothing to do
 * @param keepalive when true, the innermost branch is skipped even if candestroy is true
 */
public void unlock(String key, LockStat stat, boolean keepalive) {
if (stat == null) return;
if ((stat)) {
RedisReentrantLock lock = getLock(key, false);
boolean candestroy = ();
if (candestroy && !keepalive) {
if (()) {
(lock);
}
}
}
}
/** Result values for lock(); how each is returned is not visible in this damaged listing. */
public static enum LockStat {
NONEED,
SUCCESS
}
}
/**
 * Local proxy for a distributed lock. It combines a ReentrantLock with a RedisLock
 * and keeps a reference count and a clear time; it implements Delayed.
 *
 * NOTE(review): this listing was damaged when it was extracted from a web page.
 * Method-call receivers, "this." assignment targets and "+" / "++" operators are
 * missing on many lines, so the class does not compile as shown. Comments describe
 * only what the remaining tokens establish and mark guesses as such.
 */
public class RedisReentrantLock implements Delayed {
// NOTE(review): the logger factory call was lost in extraction.
private static final Logger logger = ();
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
// Default used by lock(); NOTE(review): presumably seconds (3 minutes) -- confirm.
private long timeout = 3 * 60;
// Replaced with a fresh latch at the start of every lock(long) attempt.
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
// Reference count; read and written while holding refLock.
private int ref = 0;
private Object refLock = new Object();
// Set to true by clear() once the reference count is no longer positive.
private boolean destroyed = false;
// -1 means "not set" (see resetCleartime); setCleartime() refuses to overwrite a positive value.
private long cleartime = -1;
/**
 * @param key      lock key
 * @param observer observer passed on to the RedisLock created by initWriteLock()
 */
public RedisReentrantLock(String key, AbstractLockObserver observer) {
// NOTE(review): both assignment targets lost; presumably `this.key` / `this.observer`.
= key;
= observer;
initWriteLock();
}
/** @return the destroyed flag (read without holding refLock) */
public boolean isDestroyed() {
return destroyed;
}
/**
 * Creates the RedisLock with an anonymous LockListener.
 * NOTE(review): the bodies of lockAcquired() and lockError() each contain one call
 * whose target was lost; presumably both count down `lockcount` -- confirm.
 */
private synchronized void initWriteLock() {
redisLock = new RedisLock(key, new LockListener() {
@Override
public void lockAcquired() {
();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
();
}*/
();
}
}, observer);
}
/**
 * Adds a reference unless the lock is already destroyed.
 *
 * @return false when destroyed is set, true otherwise
 */
public boolean incRef() {
synchronized (refLock) {
if (destroyed) return false;
// NOTE(review): operator lost; as written this statement does nothing. Presumably `ref++`.
ref;
}
return true;
}
/** Decrements the reference count while holding refLock. */
public void descrRef() {
synchronized (refLock) {
ref--;
}
}
/**
 * Marks the lock destroyed and drops the RedisLock if no references remain.
 *
 * @return true if already destroyed or destroyed by this call; false while ref > 0
 */
public boolean clear() {
// Fast path: this read of `destroyed` is made without holding refLock.
if (destroyed) return true;
synchronized (refLock) {
if (ref > 0) {
return false;
}
destroyed = true;
// NOTE(review): call target lost; presumably a cleanup call on redisLock -- confirm.
();
redisLock = null;
return true;
}
}
/**
 * Tries to acquire the lock.
 *
 * NOTE(review): most calls in this method lost their receivers and one assignment
 * lost its right-hand side. Presumably the flow is: take the local ReentrantLock,
 * try the RedisLock, and if that fails wait on `lockcount` up to `timeout` before
 * giving up -- confirm against the original article.
 *
 * @param timeout values <= 0 are replaced (replacement value lost; presumably the field `timeout`)
 * @return true on success; false from the inner failure branch or on interruption
 * @throws LockNotExistsException when redisLock is null (clear() sets it to null)
 */
public boolean lock(long timeout) throws LockNotExistsException {
if (timeout <= 0) timeout = ;
//incRef();
();// under multi-thread contention, take the first-level lock first
if (redisLock == null) {
();
//descrRef();
throw new LockNotExistsException();
}
try {
lockcount = new CountDownLatch(1);
boolean res = (timeout);
if (!res) {
(timeout, );
//(timeout*1000);
if (!()) {
();
return false;
}
}
return true;
} catch (InterruptedException e) {
();
return false;
}
}
/**
 * Same as lock(long) with the field `timeout` as argument.
 *
 * @return the result of lock(timeout)
 * @throws LockNotExistsException propagated from lock(long)
 */
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
/**
 * Releases the lock if isOwner(true) holds, then decrements the reference count.
 *
 * @return false when isOwner(true) is false; otherwise the result of canDestroy()
 * @throws RuntimeException rethrown from the release calls in the second try block
 */
public boolean unlock() {
if (!isOwner(true)) {
try {
// The exception is thrown and caught right here; it reaches the (damaged) logging
// call below rather than the caller.
throw new RuntimeException("big ================================================ :"key);
} catch (Exception e) {
// NOTE(review): logging call target and "+" lost.
("err:"e, e);
}
return false;
}
try {
// NOTE(review): first call target lost; presumably releases the RedisLock.
();
();// under multi-thread contention, release the outermost lock
} catch (RuntimeException e) {
();// under multi-thread contention, release the outermost lock
throw e;
} finally {
descrRef();
}
return canDestroy();
}
/** @return true when the reference count is zero or negative */
public boolean canDestroy() {
synchronized (refLock) {
return ref <= 0;
}
}
// NOTE(review): the next four lines are stray HTML (an advertisement link), not Java.
// They look like web-page residue and should be deleted from the listing.
<a href="/">
nbso online
casino reviews
</a>
public String getKey() {
return key;
}
public void setKey(String key) {
// NOTE(review): assignment target lost; presumably `this.key`.
= key;
}
/**
 * Reports whether both (damaged) ownership checks succeed.
 *
 * NOTE(review): the calls producing `a` and `b` lost their receivers; presumably one
 * asks the RedisLock and the other asks the ReentrantLock -- confirm.
 *
 * @param check when true, a (damaged) logging call is made if either check fails
 * @return false when redisLock is null; otherwise a && b
 */
public boolean isOwner(boolean check) {
synchronized (refLock) {
if (redisLock == null) {
("reidsLock is null:key="key);
return false;
}
boolean a = ();
boolean b = ();
if (check) {
if (!a || !b) {
(key";a:"a";b:"b);
}
}
return a && b;
}
}
/**
 * Sets the clear time unless one is already set.
 *
 * @return false when cleartime > 0 already; true after setting it
 */
public boolean setCleartime() {
synchronized (this) {
if (cleartime > 0) return false;
// NOTE(review): target, call and operator lost; presumably "now + 10 * 1000" -- confirm.
= () 10 * 1000;
return true;
}
}
/** Assigns -1; NOTE(review): assignment target lost, presumably `cleartime`. */
public void resetCleartime() {
synchronized (this) {
= -1;
}
}
/**
 * Orders this lock against another Delayed. Returns 0 for any argument that is
 * not a RedisReentrantLock.
 * NOTE(review): both operands of the subtraction were lost; presumably the two
 * cleartime values -- confirm.
 */
@Override
public int compareTo(Delayed object) {
if (object instanceof RedisReentrantLock) {
RedisReentrantLock t = (RedisReentrantLock) object;
long l = - ;
if (l > 0) return 1; // return 1 if the other is smaller than this one, -1 if it is larger, otherwise 0
else if (l < 0) return -1;
else return 0;
}
return 0;
}
/**
 * @param unit requested unit
 * @return the value of `d`. NOTE(review): the conversion call and its unit argument
 *         were lost; presumably converts (cleartime - now) into `unit` -- confirm.
 */
@Override
public long getDelay(TimeUnit unit) {
long d = (cleartime - (), );
return d;
}
}
/**
 * Distributed lock implemented on top of Redis.
 * Basic principle, as described by the original author:
 * 1. Call setnx(key, timestamp + timeout); if the set succeeds the lock is acquired.
 * 2. If it does not succeed, read the key's value v1 (the holder's expiry timestamp)
 *    and compare it with the current time to see whether it has expired.
 * 3. If it has expired (the node holding the lock has died), call
 *    v2 = getset(key, timestamp + timeout + 1) and check whether v2 equals v1. If it
 *    does, locking succeeded; otherwise it failed and is retried after a while (200 ms).
 *
 * NOTE(review): this listing was damaged when it was extracted from a web page.
 * Method-call receivers and "this." assignment targets are missing on many lines,
 * so the class does not compile as shown. The "+" signs in the description above
 * were also missing in the original comment and have been restored by inference.
 */
public class RedisLock implements LockListener {
private String key;
// True while this object believes it holds the lock.
private boolean owner = false;
private AbstractLockObserver observer = null;
private LockListener lockListener = null;
// Set by trylock() after a failed attempt; cleared by clear(), lockAcquired() and lockError().
private boolean waiting = false;
private long expire;// lock timeout, in seconds
// Set by doExpire(); makes a later lockAcquired() give the lock straight back.
private boolean expired = false;
/**
 * @param key          lock key
 * @param lockListener listener parameter (assignment target lost, see below)
 * @param observer     observer parameter (assignment target lost, see below)
 */
public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
// NOTE(review): all three assignment targets lost; presumably the same-named fields.
= key;
= lockListener;
= observer;
}
/**
 * Makes one attempt to take the lock unless an earlier attempt is still waiting.
 *
 * NOTE(review): the call that yields `owner` and the following (key, this) call lost
 * their receivers; presumably `observer.tryLock(...)` and
 * `observer.addLockListener(...)` -- confirm.
 *
 * @param expire lock timeout (the field comment says seconds)
 * @return true if already owner or the attempt succeeded; false otherwise
 */
public boolean trylock(long expire) {
synchronized (this) {
if (owner) {
return true;
}
// NOTE(review): assignment targets lost; presumably `this.expire` and `this.expired`.
= expire;
= false;
if (!waiting) {
owner = (key, expire);
if (!owner) {
waiting = true;
(key, this);
}
}
return owner;
}
}
/** @return the owner flag (read without synchronization) */
public boolean isOwner() {
return owner;
}
/**
 * Clears the owner flag after a (damaged) call taking the key.
 * NOTE(review): receiver lost; presumably asks the observer to release the key.
 */
public void unlock() {
synchronized (this) {
(key);
owner = false;
}
}
/**
 * If waiting, makes a (damaged) call taking the key and clears the waiting flag.
 * NOTE(review): receiver lost; presumably removes this listener from the observer.
 */
public void clear() {
synchronized (this) {
if (waiting) {
(key);
waiting = false;
}
}
}
/**
 * Called when a wait for the lock has run out.
 *
 * @return true if the lock is owned; false otherwise. On the first non-owner call
 *         the expired flag is set and clear() is invoked.
 */
public boolean doExpire() {
synchronized (this) {
if (owner) return true;
if (expired) return false;
expired = true;
clear();
}
return false;
}
/**
 * Listener callback. If the wait was already marked expired, calls unlock() and
 * returns; otherwise records ownership and then makes a (damaged) call outside
 * the synchronized block.
 * NOTE(review): that last call's target was lost; presumably `lockListener.lockAcquired()`.
 */
@Override
public void lockAcquired() {
synchronized (this) {
if (expired) {
unlock();
return;
}
owner = true;
waiting = false;
}
();
}
/** NOTE(review): the returned expression was lost; presumably the field `expire`. */
@Override
public long getExpire() {
return ;
}
/**
 * Listener callback. Clears owner and waiting, then makes a (damaged) call.
 * NOTE(review): call target lost; presumably `lockListener.lockError()`.
 */
@Override
public void lockError() {
synchronized (this) {
owner = false;
waiting = false;
();
}
}
}
/**
 * Runnable observer that repeatedly works through a map of key -> LockListener and
 * calls tryLock for each key.
 *
 * NOTE(review): this listing was damaged when it was extracted from a web page.
 * Method-call receivers, qualified names and "+" operators are missing on many
 * lines (including every call on the Redis client), so the class does not compile
 * as shown. Comments describe only what the remaining tokens establish and mark
 * guesses as such.
 */
public class LockObserver extends AbstractLockObserver implements Runnable {
private CacheRedisClient client;
// Monitor held while lockMap is copied or changed.
private Object mutex = new Object();
private Map<String, LockListener> lockMap = new ConcurrentHashMap();
// Set by the exit handler to request shutdown; run() acts on it only when a pass has no keys.
private boolean stoped = false;
// Compared with the cost of one pass in run(); NOTE(review): presumably milliseconds.
private long interval = 500;
// Set by run() just before it returns; checked by addLockListener() and tryLock().
private boolean terminated = false;
private CountDownLatch doneSignal = new CountDownLatch(1);
/**
 * Creates the Redis client for the schema and registers an ExitHandler.
 * NOTE(review): the registration call and the call inside the handler's try block
 * lost their receivers; presumably the handler waits on `doneSignal` until run()
 * has finished -- confirm.
 *
 * @param schema passed to the CacheRedisClient constructor
 */
public LockObserver(String schema) {
client = new CacheRedisClient(schema);
(new ExitHandler() {
public void run() {
stoped = true;
try {
();
} catch (InterruptedException e) {
// Interruption is ignored.
}
}
});
}
/**
 * Registers a listener for a key, unless the observer has terminated.
 * NOTE(review): both calls lost their receivers; presumably `listener.lockError()`
 * when terminated and `lockMap.put(key, listener)` otherwise -- confirm.
 *
 * @param key      lock key
 * @param listener listener to register
 */
public void addLockListener(String key, LockListener listener) {
if (terminated) {
();
return;
}
synchronized (mutex) {
(key, listener);
}
}
/**
 * Makes a (damaged) call taking the key while holding mutex.
 * NOTE(review): receiver lost; presumably `lockMap.remove(key)`.
 *
 * @param key lock key
 */
public void removeLockListener(String key) {
synchronized (mutex) {
(key);
}
}
/**
 * Main loop; runs until `terminated` is set. Each pass copies the listener map
 * under mutex, tries every key, then pauses based on how long the pass took.
 * Shutdown happens only on a pass with no keys while `stoped` is set.
 *
 * NOTE(review): the timestamp calls, the map copy, the key-set source, the listener
 * callbacks and the pause calls all lost their receivers or qualified names.
 */
@Override
public void run() {
while (!terminated) {
long p1 = ();
Map<String, LockListener> clone = new HashMap();
synchronized (mutex) {
// Takes `lockMap` as argument; presumably copies it into `clone`.
(lockMap);
}
Set<String> keyset = ();
if (() > 0) {
(());
for (String key : keyset) {
LockListener ll = (key);
try {
// NOTE(review): second argument lost; presumably the listener's expire value.
if (tryLock(key, ())) {
();
removeLockListener(key);
}
} catch (Exception e) {
// Any failure for a key also drops that key's listener.
();
removeLockListener(key);
}
}
();
} else {
if (stoped) {
terminated = true;
();
return;
}
}
try {
long p2 = ();
long cost = p2 - p1;
// The pause argument is the remainder of the interval, or twice the interval when
// the pass overran it.
if (cost <= interval) {
(interval - cost);
} else {
(interval * 2);
}
} catch (InterruptedException e) {
// Interruption is ignored; the loop re-checks `terminated`.
}
}
}
/**
 * Tries to take the lock for a key. The timeout is in SECONDS; it is multiplied
 * by 1000 inside the method.
 *
 * NOTE(review): the Redis client / transaction calls, the deserialization call and
 * the "+" operators in `tt expire` and `ne 1` were lost. Presumably this follows
 * the setnx / get / getset steps described on RedisLock -- confirm.
 *
 * @param key            lock key
 * @param expireInSecond timeout in seconds
 * @return false when terminated; true when the first result equals 1 or when the
 *         expiry branch's comparison with the old value succeeds; false otherwise
 */
public boolean tryLock(final String key, final long expireInSecond) {
if (terminated) return false;
final long tt = ();
final long expire = expireInSecond * 1000;
final Long ne = tt expire;
List<Object> mm = (key, new MultiBlock() {
@Override
public void execute() {
(key, ne);
((key));
}
});
// First result of the multi-call; the value 1 is treated as "lock taken".
Long res = (Long) (0);
if (new Long(1).equals(res)) {
return true;
} else {
// Second result is decoded into `ex`, which is then compared with the current time `tt`.
byte[] bb = (byte[]) (1);
Long ex = (bb);
if (ex == null || tt > ex) {
Long old = (key, new Long(ne 1));
// The first operand already covers the middle one (old == null); the last operand
// lost its receiver, presumably `ex.equals(old)`.
if (old == null || (ex == null && old == null) || (ex != null && (old))) {
return true;
}
}
}
return false;
}
/**
 * Makes a (damaged) call taking the key.
 * NOTE(review): receiver lost; presumably deletes the key through `client`.
 *
 * @param key lock key
 */
public void unLock(String key) {
(key);
}
}
使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时机制,也避免了死锁问题。性能方面,笔者的自测结果如下:
500线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218
TPS 达到 35000,比方案1高了整整一个数量级。
五、总结
本文介绍了两种分布式锁的实现方案:方案1最大的优势在于避免结点挂掉后导致的死锁;方案2最大的优势在于性能超强。在实际生产过程中,结合自身情况来决定最适合的分布式锁方案,是架构师的必修课。