信号量为多线程协作提供了更强大的控制方法.广义上说,信号量是对锁的扩展.无论是内部锁synchronized还是重入锁ReentranLock,一次都只允许一个线程访问同一个资源,而信号量却可以指定多个线程,同时访问某一个资源.
概念上,一个信号量维持着大量的许可. 每个acquire请求都会阻塞直到一个permit可用为止,并且然后拥有这个许可. 每个release请求都可能会释放一个permit.
但是,没有实际的permit对象是被用的,Semaphore仅仅是保留可用数量的计数,并相应地执行
Semaphores常用来去限制能访问某些资源的线程的数量.
每个线程在获得一个item之前都必须从semaphore中获得一个permit,确保一个item是可用的.
当这个线程完成了,它返回到线程池并且permit也返回给semaphore,然后允许另外一个线程去获取item.
注意:
当调用acquire请求时候不会有同步锁,因为这将阻值某个item返回到池中.
semaphore封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。
semaphore初始值为1,使用这个特性,当它至多有一个permit的时候,可以用作互斥锁.
这通常被称为二进制信号量,因为它只有两个状态:0个或者1个permit是可用的.
以这种方式使用的时候,二进制的semaphore拥有这个属性(与很多Lock类实现不同),它的锁可以被所有者以外的线程释放掉.(因为semaphores没有所有权概念).
这在某些专门的上下文中是有用的,例如死锁恢复
此类的构造函数可选择接受公平参数,当设置为false时,此类不会保证线程获取许可的顺序。
尤其是插队是被允许的.也就是说,调用{@link #acquire}的线程可以在一直等待的线程之前被分配一个许可证 - 逻辑上,新线程将自己置于等待线程队列的头部
当公平设置为真时,信号量保证调用线程调用任何{@link #acquire()}方法被选中以按照调用这些方法的顺序获得许可证
注意,FIFO排序必须适用于这些方法中的特定内部执行点。
因此,一个线程可以在另一个线程之前调用#acquire(),但是在另一个线程之后到达排序点,并且类似地从方法返回。
同时也注意:未定义的tryAcquire()方法不符合公平性设置,但是将获得任意一个可用的permit
通常,用于控制资源访问的semaphores应该初始化为公平的,以确保线程没有被访问资源(饥饿的)
当使用信号量进行其他类型的同步控制时,非公平的性能优势高于公平性的
此类还提供了一次acquire和release多个许可证的便利方法
当这些方法没有设置将公平性设置为true时候,当心增加不定期延时的风险.(上面讲到的插队的问题).
下面看下源码的实现:AQS可以去看看之前的部分
Semaphore同步实现部分
/** * 信号量的同步实现。 使用AQS状态来表示permit。 分为公平和非正式版本。 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * NonFair version */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
/** * 非公平的 * @param 允许可用的初始许可证。该值可能为负数,在这种情况下,必须在任何获得授予之前释放 */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * 第二个参数可以指定是否公平 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
常用方法:
/** * 获取一个准入的许可.如果无法获取,线程会等待,直到有线程释放一个许可或者当前线程被中断 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 与上面方法类似,但是不响应中断 */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * 尝试获取一个许可,如果成功返回true,失败返回false. * 不会进行等待,立即返回 */ public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } /** * 添加时间限定 */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 线程访问结束以后,释放一个许可,返回给信号量 * 以便于其他的线程可以进行资源访问 */ public void release() { sync.releaseShared(1); }批量的获取与释放与上面类似...
其他方法:
/** * 返回此信号量中当前可用的许可数。 * 该方法通常用于调试和测试. */ public int availablePermits() { return sync.getPermits(); } /** * 获取并返回所有可立即获得的许可证。 */ public int drainPermits() { return sync.drainPermits(); } /** * 缩小可用许可证的数量. * 该方法在使用信号量来跟踪变得不可用的资源的子类中是有用的 * 该方法与{@code acquire}不同之处在于它不阻止等待许可证变为可用。 */ protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } /** * 是否公平 */ public boolean isFair() { return sync instanceof FairSync; } /** * 查询是否有线程等待获取. * 注意: * 因为取消可能会在任何时候发生,所以{@code true}返回并不保证任何其他线程都将获得. * 该方法主要用于监视系统状态 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 返回等待获取的线程数的估计值 */ public final int getQueueLength() { return sync.getQueueLength(); }
public class SemaphoreDemo implements Runnable{ //给信号量初始化5个permit.也就是说可以有同时五个线程进入try代码块 final Semaphore semp = new Semaphore(5); @Override public void run() { try { semp.acquire(); //模拟耗时操作 Thread.sleep(2000); System.out.println(Thread.currentThread().getId()+":done!"); semp.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { //开启20个线程测试.最后输出为5个一组 ExecutorService exec = Executors.newFixedThreadPool(20); final SemaphoreDemo demo=new SemaphoreDemo(); for(int i=0;i<20;i++){ exec.submit(demo); } } }
源码来自JDK1.8,注释来自自己翻译,不足自出希望指正