Java并发编程之Semaphore

时间:2021-05-16 20:52:42

Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,

如果没有就等待,而 release()释放一个许可。


Java并发编程之Semaphore

Semaphore的结构如下:

Java并发编程之Semaphore

从上面可以看出,Semaphore和ReentrantLock一样,都是包含公平锁(FairySync)和非公平锁(NonfairSync),两个锁都是继承Sync,而Sync也是继承自AQS。其构造函数如下:

 
  1. /**
  2. * 创建具有给定的许可数和非公平的公平设置的 Semaphore。
  3. */
  4. public Semaphore(int permits) {
  5. sync = new NonfairSync(permits);
  6. }
  7.  
  8. /**
  9. * 创建具有给定的许可数和给定的公平设置的 Semaphore。
  10. */
  11. public Semaphore(int permits, boolean fair) {
  12. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  13. }

Java并发编程之Semaphore


ReentrantLock中已经阐述过,公平锁和非公平锁获取锁机制的差别:对于公平锁而言,如果当前线程不在CLH队列的头部,则需要排队等候,而非公平锁则不同,它无论当前线程处于CLH队列的何处都会直接获取锁。所以公平信号量和非公平信号量的区别也一样。

 
  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4.  
  5. public final void acquireSharedInterruptibly(int arg)
  6. throws InterruptedException {
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. if (tryAcquireShared(arg) < 0)
  10. doAcquireSharedInterruptibly(arg);
  11. }

对于公平信号量和非公平信号量,他们机制的差异就体现在traAcquireShared()方法中:


公平锁


 
  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. //判断该线程是否位于CLH队列的列头,如果是的话返回 -1,调用doAcquireSharedInterruptibly()
  4. if (hasQueuedPredecessors())
  5. return -1;
  6. //获取当前的信号量许可
  7. int available = getState();
  8. //设置“获得acquires个信号量许可之后,剩余的信号量许可数”
  9. int remaining = available - acquires;
  10.  
  11. //如果剩余信号量 > 0 ,则设置“可获取的信号量”为remaining
  12. if (remaining < 0 || compareAndSetState(available, remaining))
  13. return remaining;
  14. }
  15. }

tryAcquireShared是尝试获取 信号量,remaining表示下次可获取的信号量。

对于hasQueuedPredecessorscompareAndSetStateReentrantLock中已经阐述了,hasQueuedPredecessors用于判断该线程是否位于CLH队列列头,compareAndSetState用于设置state的,它是进行原子操作的。代码如下:

 
  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail; // Read fields in reverse initialization order
  3. Node h = head;
  4. Node s;
  5. return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  6. }
  7.  
  8. protected final boolean compareAndSetState(int expect, int update) {
  9. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  10. }
doAcquireSharedInterruptibly 源代码如下:

 
  1. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  2. /** 创建CLH队列的node节点,Node.SHARED表示该节点为共享锁 */
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. //获取该节点的前继节点
  8. final Node p = node.predecessor();
  9. //当p为头节点时,基于公平锁机制,线程尝试获取锁
  10. if (p == head) {
  11. //尝试获取锁
  12. int r = tryAcquireShared(arg);
  13. if (r >= 0) {
  14. setHeadAndPropagate(node, r);
  15. p.next = null; // help GC
  16. failed = false;
  17. return;
  18. }
  19. }
  20. //判断当前线程是否需要阻塞,如果阻塞的话,则一直处于阻塞状态知道获取共享锁为止
  21. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  22. throw new InterruptedException();
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }

doAcquireSharedInterruptibly主要是做两个工作;1、尝试获取共享锁,2、阻塞线程直到线程获取共享锁。

addWaiter(Node.SHARED):创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。

shouldParkAfterFailedAcquire:如果在尝试获取锁失败之后,线程应该等待,返回true;否则返回false

parkAndCheckInterrupt:当前线程会进入等待状态,直到获取到共享锁才继续运行。


非公平锁


对于非公平锁就简单多了,她没有那些所谓的要判断是不是 CLH 队列的列头,如下:

 
  1. final int nonfairTryAcquireShared(int acquires) {
  2. for (;;) {
  3. int available = getState();
  4. int remaining = available - acquires;
  5. if (remaining < 0 || compareAndSetState(available, remaining))
  6. return remaining;
  7. }
  8. }
在非公平锁中, tryAcquireShared 直接调用 AQS nonfairTryAcquireShared() 。通过上面的代码我可看到非公平锁并没有通过 if (hasQueuedPredecessors()) 这样的条件来判断该节点是否为 CLH 队列的头节点,而是直接判断信号量。


Java并发编程之Semaphore


信号量Semaphore的释放和获取不同,它没有分公平锁和非公平锁。如下:

 
  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. //尝试释放共享锁
  6. if (tryReleaseShared(arg)) {
  7. doReleaseShared();
  8. return true;
  9. }
  10. return false;
  11. }

release()释放线索所占有的共享锁,它首先通过tryReleaseShared尝试释放共享锁,如果成功直接返回,如果失败则调用doReleaseShared来释放共享锁。

tryReleaseShared

 
  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. int current = getState();
  4. //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
  5. int next = current + releases;
  6. if (next < current) // overflow
  7. throw new Error("Maximum permit count exceeded");
  8. //设置可获取的信号许可数为next
  9. if (compareAndSetState(current, next))
  10. return true;
  11. }
  12. }
doReleaseShared

 
  1. private void doReleaseShared() {
  2. for (;;) {
  3. //node 头节点
  4. Node h = head;
  5. //h != null,且h != 尾节点
  6. if (h != null && h != tail) {
  7. //获取h节点对应线程的状态
  8. int ws = h.waitStatus;
  9. //若h节点状态为SIGNAL,表示h节点的下一个节点需要被唤醒
  10. if (ws == Node.SIGNAL) {
  11. //设置h节点状态
  12. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  13. continue;
  14. //唤醒h节点对应的下一个节点
  15. unparkSuccessor(h);
  16. }
  17. //若h节点对应的状态== 0 ,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态
  18. else if (ws == 0 && compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  19. continue;
  20. }
  21. //h == head时,则退出循环,若h节点发生改变时则循环继续
  22. if (h == head)
  23. break;
  24. }
  25. }


Java并发编程之Semaphore


假如现在有10个人去售票厅买票,但是窗口只有2个,那么同时能够买票的只能有2个人,当2个人中任意一个人买好票离开之后,

等待的8个人中又会有一个人可以占用窗口买票。

 
  1. public class SemaphoreTest {
  2.  
  3. public static void main(String[] args) {
  4. /** 定义窗口个数 **/
  5. Semaphore semaphore = new Semaphore(2);
  6.  
  7. ExecutorService executor = Executors.newCachedThreadPool();
  8. /** 模拟10个用户 **/
  9. for (int i = 1; i <= 10; i++) {
  10. final int offset = i;
  11. executor.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. try {
  15. /** 获取信号量许可 **/
  16. semaphore.acquire();
  17. System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】进入窗口,准备买票");
  18. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
  19. System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】买票完成,即将离开");
  20. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
  21. System.out.println(Thread.currentThread().getName() + "用户【" + offset+ "】离开售票窗口-->还有"+ semaphore.getQueueLength() +"个人在等待排队");
  22. semaphore.release();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27.  
  28. });
  29. }
  30. executor.shutdown();
  31. }
  32. }
 
  1. pool-1-thread-1用户【1】进入窗口,准备买票
  2. pool-1-thread-2用户【2】进入窗口,准备买票
  3. pool-1-thread-2用户【2】买票完成,即将离开
  4. pool-1-thread-1用户【1】买票完成,即将离开
  5. pool-1-thread-1用户【1】离开售票窗口-->还有8个人在等待排队
  6. pool-1-thread-3用户【3】进入窗口,准备买票
  7. pool-1-thread-2用户【2】离开售票窗口-->还有7个人在等待排队
  8. pool-1-thread-4用户【4】进入窗口,准备买票
  9. pool-1-thread-3用户【3】买票完成,即将离开
  10. pool-1-thread-3用户【3】离开售票窗口-->还有6个人在等待排队
  11. pool-1-thread-5用户【5】进入窗口,准备买票
  12. pool-1-thread-4用户【4】买票完成,即将离开
  13. pool-1-thread-5用户【5】买票完成,即将离开
  14. pool-1-thread-4用户【4】离开售票窗口-->还有5个人在等待排队
  15. pool-1-thread-6用户【6】进入窗口,准备买票
  16. pool-1-thread-6用户【6】买票完成,即将离开
  17. pool-1-thread-5用户【5】离开售票窗口-->还有4个人在等待排队
  18. pool-1-thread-7用户【7】进入窗口,准备买票
  19. pool-1-thread-6用户【6】离开售票窗口-->还有3个人在等待排队
  20. pool-1-thread-8用户【8】进入窗口,准备买票
  21. pool-1-thread-8用户【8】买票完成,即将离开
  22. pool-1-thread-8用户【8】离开售票窗口-->还有2个人在等待排队
  23. pool-1-thread-9用户【9】进入窗口,准备买票
  24. pool-1-thread-9用户【9】买票完成,即将离开
  25. pool-1-thread-7用户【7】买票完成,即将离开
  26. pool-1-thread-9用户【9】离开售票窗口-->还有1个人在等待排队
  27. pool-1-thread-10用户【10】进入窗口,准备买票
  28. pool-1-thread-7用户【7】离开售票窗口-->还有0个人在等待排队
  29. pool-1-thread-10用户【10】买票完成,即将离开
  30. pool-1-thread-10用户【10】离开售票窗口-->还有0个人在等待排队

我们最终可以看到在其中两个客户购票未完全结束,第三个客户(线程)是不会被允许进入购票的。


Java并发编程之Semaphore


  • 在上述的示例中,如果 fair传的是true

则各个线程公平竞争,即按照等待时间的长短决定谁先获取许可。

以10 个线程竞争 2 个许可为例,执行结果如下,首选是线程 1、2 获取了许可,随后线程 3、4获取了许可,最后是线程5、6获取许

可,顺序基本上与创建线程并启动的先后顺序一致,也与各个线程等待的时间基本相符(如上述的打印结果)。


  • 在上述的示例中,如果 fair传的是false

则各个线程非公平竞争,随机选取一个线程获取许可。

以 10个线程竞争 2 个许可为例,执行结果如下,首先是线程 1、2 获取了许可,随后线程 6、7 获取了许可,

最后是线程 10、3 获取许可,与线程创建启动时间无关,也与线程等待时间无关。

 
  1. pool-1-thread-2用户【2】进入窗口,准备买票
  2. pool-1-thread-1用户【1】进入窗口,准备买票
  3. pool-1-thread-2用户【2】买票完成,即将离开
  4. pool-1-thread-1用户【1】买票完成,即将离开
  5. pool-1-thread-2用户【2】离开售票窗口-->还有8个人在等待排队
  6. pool-1-thread-6用户【6】进入窗口,准备买票
  7. pool-1-thread-1用户【1】离开售票窗口-->还有7个人在等待排队
  8. pool-1-thread-7用户【7】进入窗口,准备买票
  9. pool-1-thread-6用户【6】买票完成,即将离开
  10. pool-1-thread-7用户【7】买票完成,即将离开
  11. pool-1-thread-6用户【6】离开售票窗口-->还有6个人在等待排队
  12. pool-1-thread-9用户【9】进入窗口,准备买票
  13. pool-1-thread-9用户【9】买票完成,即将离开
  14. pool-1-thread-7用户【7】离开售票窗口-->还有5个人在等待排队
  15. pool-1-thread-4用户【4】进入窗口,准备买票
  16. pool-1-thread-9用户【9】离开售票窗口-->还有4个人在等待排队
  17. pool-1-thread-5用户【5】进入窗口,准备买票
  18. pool-1-thread-4用户【4】买票完成,即将离开
  19. pool-1-thread-4用户【4】离开售票窗口-->还有3个人在等待排队
  20. pool-1-thread-8用户【8】进入窗口,准备买票
  21. pool-1-thread-5用户【5】买票完成,即将离开
  22. pool-1-thread-8用户【8】买票完成,即将离开
  23. pool-1-thread-8用户【8】离开售票窗口-->还有2个人在等待排队
  24. pool-1-thread-3用户【3】进入窗口,准备买票
  25. pool-1-thread-5用户【5】离开售票窗口-->还有1个人在等待排队
  26. pool-1-thread-10用户【10】进入窗口,准备买票
  27. pool-1-thread-3用户【3】买票完成,即将离开
  28. pool-1-thread-10用户【10】买票完成,即将离开
  29. pool-1-thread-10用户【10】离开售票窗口-->还有0个人在等待排队
  30. pool-1-thread-3用户【3】离开售票窗口-->还有0个人在等待排队