大家都知道加锁是用来在并发情况防止同一个资源被多方抢占的有效手段,加锁其实就是同步互斥(或称独占)也行,即:同一时间不论有多少并发请求,只有一个能处理,其余要么排队等待,要么放弃执行。关于锁的实现网上大把的例子,我这里只是梳理与总结一下,以便参考方便。
同步互斥按作用范围可分为:
-
线程间同步互斥
下面分别通过代码示例来演示常见的线程间同步互斥的实现方法:
-
synchronized
/**
* 线程间同步(synchronized同步互斥锁,开启多个线程,若某个线程获得锁,则其余线程全部阻塞等待排队,当释放锁后,则下一个继续获得锁,其余线程仍等待)
*/
private void testSynchronized() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Runnable runnable = () -> {
long threadId = Thread.currentThread().getId();
for (int i = 0; i <= 100; i++) {
synchronized (lockObj) {
if (count > 0) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d --count:%d %n", threadId, i, --count);
}
} //若未加锁,则可能会出现负数,即并发问题
// if (count>0){
// try {
// Thread.sleep(200L);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.printf("threadId:%s,number:%d --count:%d %n",threadId,i,--count);
// }
}
}; executorService.execute(runnable);
executorService.execute(runnable); System.out.printf("lasted count:%d", count);
} -
synchronized同步互斥锁+通知等待模式
/**
* 线程间同步(synchronized同步互斥锁+通知等待模式,开启多个线程,当获得锁后,则可通过Object.notify()方法发出通知,通知其它等待锁或wait情况下恢复继续执行,示例演示的是生产与消费互相等待)
*/
private void testWaitAndNotify() {
count = 0;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Runnable productRunnable = () -> {
long threadId = Thread.currentThread().getId();
for (int i = 0; i <= 50; i++) {
synchronized (lockObj) { //获取锁
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d --生产后 count:%d %n", threadId, i, ++count);
lockObj.notify();//发出通知
try {
System.out.printf("threadId:%s,number:%d,等待生产%n", threadId, i);
if (i == 50) break;
lockObj.wait();//等待通知,阻塞当前线程
System.out.printf("threadId:%s,number:%d,收到通知,准备生产%n", threadId, i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
count = -1;
System.out.printf("threadId:%s,已生产完了。%n", threadId);
}; Runnable consumeRunnable = () -> {
long threadId = Thread.currentThread().getId();
for (int i = 0; i <= 200; i++) {
synchronized (lockObj) { //获取锁
if (count > 0) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d --消费后 count:%d %n", threadId, i, --count);
lockObj.notify(); //发出通知
} else {
try {
System.out.printf("threadId:%s,number:%d,等待消费%n", threadId, i);
if (count == -1) break;
lockObj.wait();//等待通知,阻塞当前线程
System.out.printf("threadId:%s,number:%d,收到通知,准备消费%n", threadId, i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
System.out.printf("threadId:%s,已消费完了。%n", threadId); }; executorService.execute(consumeRunnable);
executorService.execute(productRunnable); } -
条件锁ReentrantLock、Condition
/**
* 线程间同步(条件锁ReentrantLock、Condition,开启多个线程,当lock()获取锁后,则可以通过Lock的条件实例方法signal发送信号,通知其它等待锁或await情况下恢复继续执行,示例演示的是生产与消费互相等待)
*/
private void testLock() {
final Lock lock = new ReentrantLock();
final Condition lockCond = lock.newCondition(); count = 0;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Runnable productRunnable = () -> {
long threadId = Thread.currentThread().getId();
lock.lock();//先获得锁
for (int i = 0; i <= 50; i++) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d --生产后 count:%d %n", threadId, i, ++count);
lockCond.signal();//放出信号
try {
System.out.printf("threadId:%s,number:%d,等待生产%n", threadId, i);
if (i == 50) break;
lockCond.await();//等待信号,阻塞当前线程
System.out.printf("threadId:%s,number:%d,收到通知,准备生产%n", threadId, i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.unlock();//释放锁
count = -1;
System.out.printf("threadId:%s,已生产完了。%n", threadId);
}; Runnable consumeRunnable = () -> {
long threadId = Thread.currentThread().getId();
lock.lock();//先获得锁
for (int i = 0; i <= 200; i++) {
if (count > 0) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d --消费后 count:%d %n", threadId, i, --count);
lockCond.signal();//放出信号
} else {
try {
System.out.printf("threadId:%s,number:%d,等待消费%n", threadId, i);
if (count == -1) break;
lockCond.await();//等待信号,阻塞当前线程
System.out.printf("threadId:%s,number:%d,收到通知,准备消费%n", threadId, i);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
lock.unlock();
System.out.printf("threadId:%s,已消费完了。%n", threadId); }; executorService.execute(consumeRunnable);
executorService.execute(productRunnable);
} -
Future
/**
* 线程间同步(Future,采用Executors.submit开启1个或多个线程返回Future,线程后台异步执行不阻塞主线程,当需要获得线程结果时,即:Future.get,则会等待获取结果,
* 当然可以使用CompletableFuture来实现完全的异步回调处理结果,无需任何阻塞)
*/
private void testFuture() {
//refer:https://www.cnblogs.com/xiaoxi/p/8303574.html
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> task = executorService.submit(() -> {
long total = 0;
System.out.println("子线程for loop start...");
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
} total += i;
}
System.out.printf("子线程for loop end,total=%d %n", total);
return total;
}); //主线程处理其它逻辑,此时是与子线程在并行执行
for (int n = 0; n <= 30; n++) {
System.out.printf("主线程for loop中,n=%d %n", n);
} try {
long result = task.get();//等待子线程结果,如果未执行完则会阻塞主线程直到子线程完成出结果
System.out.printf("主线程获取子线程计算的结果,total=%d %n", result);
} catch (Exception e) {
e.printStackTrace();
} //使用CompletableFuture可异步回调获取结果,不会阻塞主线程
CompletableFuture.supplyAsync(() -> {
long total = 0;
System.out.println("子线程for loop start...");
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
} total += i;
}
System.out.printf("threadId:%s,子线程for loop end,total=%d %n", Thread.currentThread().getId(), total);
return total;
}).thenAccept(result -> {
//当子线程执行完成后会回调该方法
System.out.printf("threadId:%s,回调获取子线程计算的结果,total=%d %n", Thread.currentThread().getId(), result);
}); //主线程处理其它逻辑,此时是与子线程在并行执行
long threadId = Thread.currentThread().getId();
for (int n = 0; n <= 30; n++) {
System.out.printf("threadId:%s,主线程for loop2中,n=%d %n", threadId, n);
} System.out.printf("threadId:%s,主线程已执行完成。%n", threadId); } -
CountDownLatch(CyclicBarrier类似,支持分批并发执行,分批次阶段等待,且支持重设计数器)
/**
* 线程间同步(CountDownLatch,同时运行多个线程,在CountDownLatch计数器count为0前主线程会阻塞等待)
*/
private void testCountDownLatch() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);
Runnable runnable = () -> {
long threadId = Thread.currentThread().getId();
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("threadId:%s,number:%d %n", threadId, i);
}
System.out.printf("threadId:%s,已处理完成。%n", threadId);
latch.countDown();//扣减计数器-1
}; //开3个线程并行处理
for (int i = 1; i <= 3; i++) {
executorService.execute(runnable);
} long mainThreadId = Thread.currentThread().getId();
try {
System.out.printf("threadId:%s,主线程等待中...%n", mainThreadId);
latch.await();//等待全部执行完成,即计数器为0,阻塞主线程
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.printf("threadId:%s,主线程确认所有子线程都处理完成,count:%d,开始执行主线程逻辑。%n", mainThreadId, latch.getCount()); System.out.printf("threadId:%s,主线程已执行完成!%n", mainThreadId); } -
Semaphore
/**
* 线程间同步(Semaphore,开启多个线程,使用acquire获取1个许可【可指定一次获取多个许可】,
* 若未能获取到则等待,若已获得许可则占用了1个可用许可总数且可进入继续执行,待执行完成后应释放许可)
*/
private void testSemaphore(){
Semaphore wcSemaphore = new Semaphore(5,true);
Runnable runnable =() -> {
long threadId = Thread.currentThread().getId();
System.out.printf("threadId:%s,等待进入WC,目前还有:%d空位,排队等候人数:%d %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength());
try {
wcSemaphore.acquire();
System.out.printf("threadId:%s,进入WC,目前还有:%d空位,排队等候人数:%d,关门 %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength());
Thread.sleep(1000L);
System.out.printf("threadId:%s,离开WC,目前还有:%d空位,排队等候人数:%d,开门 %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength());
wcSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}; ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int n=1;n<=10;n++){
executorService.execute(runnable);
} long mainThreadId = Thread.currentThread().getId();
System.out.printf("threadId:%s,清洁阿姨等待打扫WC,目前还有:%d空位,排队等候人数:%d %n",
mainThreadId,wcSemaphore.availablePermits(),wcSemaphore.getQueueLength());
//如果还有排队且剩余空位未全部处理则等待
while (wcSemaphore.hasQueuedThreads() && wcSemaphore.drainPermits()!=5 && wcSemaphore.availablePermits()!=5){
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
} try {
wcSemaphore.acquire(5);
System.out.printf("threadId:%s,清洁阿姨开始打扫WC,关上WC入口,即所有人均不可再使用,目前还有:%d空位,排队等候人数:%d %n",
mainThreadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength());
} catch (InterruptedException e) {
e.printStackTrace();
} }
-
-
进程间同步互斥
采用FileLock实现进程间同步互斥,如果是C#、C++则可以使用Mutex
/**
* 进程间同步(FileLock文件锁,同时开启多个进程实例,若已获得锁的实例在执行,则后面的进程实例均只能等待,当然可以使用tryLock非阻塞模式)
*/
private void testFileLock() {
File lockFile = new File(System.getProperty("user.dir") + File.separator + "app.lock");
if (!lockFile.exists()) {
try {
if (!lockFile.createNewFile()) {
System.out.printf("创建文件失败:" + lockFile.getAbsolutePath());
return;
}
} catch (IOException e) {
e.printStackTrace();
}
} try { FileChannel fileChannel = new FileOutputStream(lockFile).getChannel();
String jvmName = ManagementFactory.getRuntimeMXBean().getName(); System.out.printf("jvm ProcessName:%s, 准备获取锁 ... %n", jvmName); FileLock lock = fileChannel.lock();//获取文件锁 for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("jvm ProcessName:%s, number:%d %n", jvmName, i);
} lock.release();
fileChannel.close(); System.out.printf("jvm ProcessName:%s, 处理完成,释放锁 %n", jvmName); } catch (Exception e) {
e.printStackTrace();
} } -
机器间同步互斥(即分布式锁)
-
基于DB实现(利用DB的更新独占锁)
---网上也有很多,我这里把之前用C#实现的基于DB的分布式锁的例子(JAVA同理)拿出来【原理是:有一张T_Locking表,SType=LOCK名,SValue=LOCK唯一标识值,RecordTime=加锁记录时间,获取锁时直接根据SType=N'Lock名' and SValue=N''来更新,若被其它实例更新,则SValue不可能为空,则获取锁失败,然后进一步判断是否出现过期锁的情况,如果过期则仍可以尝试更新获取锁即可】
/// <summary>
/// 设置分布式锁
/// </summary>
/// <returns></returns>
private bool SetDistributedLockBasedDB()
{
bool hasLock = false;
try
{
var sqlDapperUtil = new SqlDapperUtil(Constants.CfgKey_KYLDConnectionName);
////此处利用DB的更新排它锁,确保并发时只有一个优先执行,而当某个执行成功后,其它后续执行会因为条件更新不满足而更新失败,实现了多并发时只有一个能更新成功,即获得锁。
hasLock = sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=@SValue,RecordTime=getdate() where SType=N'Lock名' and SValue=N'' ", new { SValue = Lock唯一标识值 }); if (!hasLock) //如果未获得锁,还需要考虑加锁后未被正常释放锁的情况,故如下再次尝试对加锁超过1小时以上的进行重新更新再次获得锁,避免无效锁一直处于锁定状态
{
hasLock = sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=@SValue,RecordTime=getdate() " +
"where SType = N'Lock名' and SValue <> N'' and RecordTime < DATEADD(hh, -1, getdate())",
new { SValue = Lock唯一标识值 });
}
}
catch (Exception ex)
{
logger.Error("SetDistributedLockBasedDB Error: " + ex.ToString());
} return hasLock;
} /// <summary>
/// 释放分布式锁
/// </summary>
private void ReleaseDistributedLockBasedDB()
{
try
{
var sqlDapperUtil = new SqlDapperUtil(Constants.CfgKey_KYLDConnectionName);
sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=N'',RecordTime=getdate() where SType=N'Lock名' and SValue=@SValue", new { SValue = Lock唯一标识值 });
}
catch (Exception ex)
{
logger.Error("ReleaseDistributedLockBasedDB Error: " + ex.ToString());
}
} -
基于Redis实现
实现方式1(原生实现):Redis分布式锁的正确实现方式(Java版)
实现方式2(Redlock):Redis分布式锁的官方算法RedLock以及Java版本实现库Redisson
-
基于Zookeeper实现
-
*有同时列出三种实现方案的文章,可参见: Java分布式锁三种实现方案