Semaphore
可以很轻松完成信号量控制,Semaphore
可以控制某个资源可被同时访问的个数,通过 acquire()
获取一个许可,
如果没有就等待,而 release()
释放一个许可。
Semaphore
的结构如下:
从上面可以看出,Semaphore和ReentrantLock
一样,都是包含公平锁(FairySync
)和非公平锁(NonfairSync
),两个锁都是继承Sync
,而Sync
也是继承自AQS
。其构造函数如下:
- /**
- * 创建具有给定的许可数和非公平的公平设置的 Semaphore。
- */
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- /**
- * 创建具有给定的许可数和给定的公平设置的 Semaphore。
- */
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
在ReentrantLock
中已经阐述过,公平锁和非公平锁获取锁机制的差别:对于公平锁而言,如果当前线程不在CLH
队列的头部,则需要排队等候,而非公平锁则不同,它无论当前线程处于CLH
队列的何处都会直接获取锁。所以公平信号量和非公平信号量的区别也一样。
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
对于公平信号量和非公平信号量,他们机制的差异就体现在traAcquireShared()
方法中:
公平锁
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- //判断该线程是否位于CLH队列的列头,如果是的话返回 -1,调用doAcquireSharedInterruptibly()
- if (hasQueuedPredecessors())
- return -1;
- //获取当前的信号量许可
- int available = getState();
- //设置“获得acquires个信号量许可之后,剩余的信号量许可数”
- int remaining = available - acquires;
- //如果剩余信号量 > 0 ,则设置“可获取的信号量”为remaining
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
tryAcquireShared
是尝试获取 信号量,remaining
表示下次可获取的信号量。
对于hasQueuedPredecessors
、compareAndSetState
在ReentrantLock
中已经阐述了,hasQueuedPredecessors
用于判断该线程是否位于CLH
队列列头,compareAndSetState
用于设置state
的,它是进行原子操作的。代码如下:
- public final boolean hasQueuedPredecessors() {
- Node t = tail; // Read fields in reverse initialization order
- Node h = head;
- Node s;
- return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
- }
- protected final boolean compareAndSetState(int expect, int update) {
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
doAcquireSharedInterruptibly
源代码如下:
- private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
- /** 创建CLH队列的node节点,Node.SHARED表示该节点为共享锁 */
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- //获取该节点的前继节点
- final Node p = node.predecessor();
- //当p为头节点时,基于公平锁机制,线程尝试获取锁
- if (p == head) {
- //尝试获取锁
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- //判断当前线程是否需要阻塞,如果阻塞的话,则一直处于阻塞状态知道获取共享锁为止
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
doAcquireSharedInterruptibly
主要是做两个工作;1、尝试获取共享锁,2、阻塞线程直到线程获取共享锁。
addWaiter(Node.SHARED)
:创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“
(Node.SHARED);并将该节点添加到CLH
队列末尾。
shouldParkAfterFailedAcquire
:如果在尝试获取锁失败之后,线程应该等待,返回true
;否则返回false
。
parkAndCheckInterrupt
:当前线程会进入等待状态,直到获取到共享锁才继续运行。
非公平锁
CLH
队列的列头,如下:
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
tryAcquireShared
直接调用
AQS
的
nonfairTryAcquireShared()
。通过上面的代码我可看到非公平锁并没有通过
if (hasQueuedPredecessors())
这样的条件来判断该节点是否为
CLH
队列的头节点,而是直接判断信号量。
信号量Semaphore
的释放和获取不同,它没有分公平锁和非公平锁。如下:
- public void release() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- //尝试释放共享锁
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
release()
释放线索所占有的共享锁,它首先通过tryReleaseShared
尝试释放共享锁,如果成功直接返回,如果失败则调用doReleaseShared
来释放共享锁。
tryReleaseShared
:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- //设置可获取的信号许可数为next
- if (compareAndSetState(current, next))
- return true;
- }
- }
doReleaseShared
:
- private void doReleaseShared() {
- for (;;) {
- //node 头节点
- Node h = head;
- //h != null,且h != 尾节点
- if (h != null && h != tail) {
- //获取h节点对应线程的状态
- int ws = h.waitStatus;
- //若h节点状态为SIGNAL,表示h节点的下一个节点需要被唤醒
- if (ws == Node.SIGNAL) {
- //设置h节点状态
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue;
- //唤醒h节点对应的下一个节点
- unparkSuccessor(h);
- }
- //若h节点对应的状态== 0 ,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态
- else if (ws == 0 && compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue;
- }
- //h == head时,则退出循环,若h节点发生改变时则循环继续
- if (h == head)
- break;
- }
- }
假如现在有10个人去售票厅买票,但是窗口只有2个,那么同时能够买票的只能有2个人,当2个人中任意一个人买好票离开之后,
等待的8个人中又会有一个人可以占用窗口买票。
- public class SemaphoreTest {
- public static void main(String[] args) {
- /** 定义窗口个数 **/
- Semaphore semaphore = new Semaphore(2);
- ExecutorService executor = Executors.newCachedThreadPool();
- /** 模拟10个用户 **/
- for (int i = 1; i <= 10; i++) {
- final int offset = i;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- /** 获取信号量许可 **/
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】进入窗口,准备买票");
- TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
- System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】买票完成,即将离开");
- TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
- System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】离开售票窗口-->还有"+ semaphore.getQueueLength() +"个人在等待排队");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- executor.shutdown();
- }
- }
- pool-1-thread-1用户【1】进入窗口,准备买票
- pool-1-thread-2用户【2】进入窗口,准备买票
- pool-1-thread-2用户【2】买票完成,即将离开
- pool-1-thread-1用户【1】买票完成,即将离开
- pool-1-thread-1用户【1】离开售票窗口-->还有8个人在等待排队
- pool-1-thread-3用户【3】进入窗口,准备买票
- pool-1-thread-2用户【2】离开售票窗口-->还有7个人在等待排队
- pool-1-thread-4用户【4】进入窗口,准备买票
- pool-1-thread-3用户【3】买票完成,即将离开
- pool-1-thread-3用户【3】离开售票窗口-->还有6个人在等待排队
- pool-1-thread-5用户【5】进入窗口,准备买票
- pool-1-thread-4用户【4】买票完成,即将离开
- pool-1-thread-5用户【5】买票完成,即将离开
- pool-1-thread-4用户【4】离开售票窗口-->还有5个人在等待排队
- pool-1-thread-6用户【6】进入窗口,准备买票
- pool-1-thread-6用户【6】买票完成,即将离开
- pool-1-thread-5用户【5】离开售票窗口-->还有4个人在等待排队
- pool-1-thread-7用户【7】进入窗口,准备买票
- pool-1-thread-6用户【6】离开售票窗口-->还有3个人在等待排队
- pool-1-thread-8用户【8】进入窗口,准备买票
- pool-1-thread-8用户【8】买票完成,即将离开
- pool-1-thread-8用户【8】离开售票窗口-->还有2个人在等待排队
- pool-1-thread-9用户【9】进入窗口,准备买票
- pool-1-thread-9用户【9】买票完成,即将离开
- pool-1-thread-7用户【7】买票完成,即将离开
- pool-1-thread-9用户【9】离开售票窗口-->还有1个人在等待排队
- pool-1-thread-10用户【10】进入窗口,准备买票
- pool-1-thread-7用户【7】离开售票窗口-->还有0个人在等待排队
- pool-1-thread-10用户【10】买票完成,即将离开
- pool-1-thread-10用户【10】离开售票窗口-->还有0个人在等待排队
我们最终可以看到在其中两个客户购票未完全结束,第三个客户(线程)是不会被允许进入购票的。
- 在上述的示例中,如果
fair
传的是true
则各个线程公平竞争,即按照等待时间的长短决定谁先获取许可。
以10 个线程竞争 2 个许可为例,执行结果如下,首选是线程 1、2 获取了许可,随后线程 3、4获取了许可,最后是线程5、6获取许
可,顺序基本上与创建线程并启动的先后顺序一致,也与各个线程等待的时间基本相符(如上述的打印结果)。
- 在上述的示例中,如果
fair
传的是false
则各个线程非公平竞争,随机选取一个线程获取许可。
以 10个线程竞争 2 个许可为例,执行结果如下,首先是线程 1、2 获取了许可,随后线程 6、7 获取了许可,
最后是线程 10、3 获取许可,与线程创建启动时间无关,也与线程等待时间无关。
- pool-1-thread-2用户【2】进入窗口,准备买票
- pool-1-thread-1用户【1】进入窗口,准备买票
- pool-1-thread-2用户【2】买票完成,即将离开
- pool-1-thread-1用户【1】买票完成,即将离开
- pool-1-thread-2用户【2】离开售票窗口-->还有8个人在等待排队
- pool-1-thread-6用户【6】进入窗口,准备买票
- pool-1-thread-1用户【1】离开售票窗口-->还有7个人在等待排队
- pool-1-thread-7用户【7】进入窗口,准备买票
- pool-1-thread-6用户【6】买票完成,即将离开
- pool-1-thread-7用户【7】买票完成,即将离开
- pool-1-thread-6用户【6】离开售票窗口-->还有6个人在等待排队
- pool-1-thread-9用户【9】进入窗口,准备买票
- pool-1-thread-9用户【9】买票完成,即将离开
- pool-1-thread-7用户【7】离开售票窗口-->还有5个人在等待排队
- pool-1-thread-4用户【4】进入窗口,准备买票
- pool-1-thread-9用户【9】离开售票窗口-->还有4个人在等待排队
- pool-1-thread-5用户【5】进入窗口,准备买票
- pool-1-thread-4用户【4】买票完成,即将离开
- pool-1-thread-4用户【4】离开售票窗口-->还有3个人在等待排队
- pool-1-thread-8用户【8】进入窗口,准备买票
- pool-1-thread-5用户【5】买票完成,即将离开
- pool-1-thread-8用户【8】买票完成,即将离开
- pool-1-thread-8用户【8】离开售票窗口-->还有2个人在等待排队
- pool-1-thread-3用户【3】进入窗口,准备买票
- pool-1-thread-5用户【5】离开售票窗口-->还有1个人在等待排队
- pool-1-thread-10用户【10】进入窗口,准备买票
- pool-1-thread-3用户【3】买票完成,即将离开
- pool-1-thread-10用户【10】买票完成,即将离开
- pool-1-thread-10用户【10】离开售票窗口-->还有0个人在等待排队
- pool-1-thread-3用户【3】离开售票窗口-->还有0个人在等待排队