Java并发(十五):并发工具类——信号量Semaphore

时间:2021-03-29 02:41:26

先做总结:

1、Semaphore是什么?

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

2、Semaphore实现原理:

(1)semaphore实际是允许count个线程同时获取的共享锁。(semaphore = new Semaphore(count); 通过AQS实现,count即state)

(2)semaphore.acquire(); 获取semaphore的锁,

    state>0:还有数量,可以获取锁,state - 1;

    state<=0:线程数量已满,不能获取锁,需要将线程挂起并放入等待队列;

(3)semaphore.release(); 释放semaphore的锁,就是将 state+1 释放成功后,唤醒同步队列中的线程

一、应用举例

/**
* 为了简单起见我们假设停车场仅有5个停车位,一开始停车场没有车辆所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车。
* 然后又来三辆,这个时候由于只有两个停车位,所有只能停两辆,其余一辆必须在外面候着,直到停车场有空车位。
* 当然以后每来一辆都需要在外面候着。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆 要看选择的机制是公平还是非公平)。
*
* 从程序角度看,停车场就相当于信号量Semaphore,其中许可数为5,车辆就相对线程。
* 当来一辆车时,许可数就会减 1 ,当停车场没有车位了(许可书 ==0 ),其他来的车辆需要在外面等候着。
* 如果有一辆车开出停车场,许可数 + 1,然后放进来一辆车。
*/
class SemaphoreTest { static class Parking {
// 信号量
private Semaphore semaphore; Parking(int count) {
semaphore = new Semaphore(count);
} public void park() {
try {
// 获取信号量
semaphore.acquire();
long time = (long) (Math.random() * 10);
System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒...");
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + "开出停车场...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
} static class Car extends Thread {
Parking parking; Car(Parking parking) {
this.parking = parking;
} @Override
public void run() {
parking.park(); // 进入停车场
}
} public static void main(String[] args) {
Parking parking = new Parking(3); for (int i = 0; i < 5; i++) {
new Car(parking).start();
}
} /** 输出结果
* Thread-1进入停车场,停车2秒...
* Thread-2进入停车场,停车5秒...
* Thread-0进入停车场,停车3秒...
* Thread-1开出停车场...
* Thread-3进入停车场,停车0秒...
* Thread-3开出停车场...
* Thread-4进入停车场,停车0秒...
* Thread-4开出停车场...
* Thread-0开出停车场...
* Thread-2开出停车场...
*/
}

二、类结构

public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
}

三、原理解析

    // new Semaphore(count),将AQS的state设置成许可数量count
semaphore = new Semaphore(count); public Semaphore(int permits) {
sync = new NonfairSync(permits);
} Sync(int permits) {
setState(permits);
}
    /**
* 获取semaphore的锁,
* 获取到锁就可以继续操作,没有获取到锁就进入同步队列挂起
*/
semaphore.acquire(); public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
} /**
* state>0:还有数量,可以获取锁
* state<=0:线程数量已满,不能获取锁,需要将线程挂起
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
    /**
* 释放semaphore的锁,就是将state+1
* 释放成功后,唤醒同步队列中的下一个线程
*/
semaphore.release(); public void release() {
sync.releaseShared(1);
} public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
} protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

并发工具类(三)控制并发线程数的Semaphore

【死磕Java并发】—–J.U.C之并发工具类:Semaphore