实战Java高并发程序设计之Semaphoer

时间:2022-12-03 17:57:09

信号量为多线程协作提供了更强大的控制方法.广义上说,信号量是对锁的扩展.无论是内部锁synchronized还是重入锁ReentranLock,一次都只允许一个线程访问同一个资源,而信号量却可以指定多个线程,同时访问某一个资源.

 概念上,一个信号量维持着大量的许可. 每个acquire请求都会阻塞直到一个permit可用为止,并且然后拥有这个许可. 每个release请求都可能会释放一个permit.
 但是,没有实际的permit对象是被用的,Semaphore仅仅是保留可用数量的计数,并相应地执行
 Semaphores常用来去限制能访问某些资源的线程的数量.

 每个线程在获得一个item之前都必须从semaphore中获得一个permit,确保一个item是可用的.
 当这个线程完成了,它返回到线程池并且permit也返回给semaphore,然后允许另外一个线程去获取item.
  注意:
  当调用acquire请求时候不会有同步锁,因为这将阻值某个item返回到池中.
  semaphore封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。

 semaphore初始值为1,使用这个特性,当它至多有一个permit的时候,可以用作互斥锁.
 这通常被称为二进制信号量,因为它只有两个状态:0个或者1个permit是可用的.

以这种方式使用的时候,二进制的semaphore拥有这个属性(与很多Lock类实现不同),它的锁可以被所有者以外的线程释放掉.(因为semaphores没有所有权概念).

 这在某些专门的上下文中是有用的,例如死锁恢复


此类的构造函数可选择接受公平参数,当设置为false时,此类不会保证线程获取许可的顺序。
 尤其是插队是被允许的.也就是说,调用{@link #acquire}的线程可以在一直等待的线程之前被分配一个许可证 - 逻辑上,新线程将自己置于等待线程队列的头部
 当公平设置为真时,信号量保证调用线程调用任何{@link #acquire()}方法被选中以按照调用这些方法的顺序获得许可证
 注意,FIFO排序必须适用于这些方法中的特定内部执行点。
 因此,一个线程可以在另一个线程之前调用#acquire(),但是在另一个线程之后到达排序点,并且类似地从方法返回。
 同时也注意:未定义的tryAcquire()方法不符合公平性设置,但是将获得任意一个可用的permit


通常,用于控制资源访问的semaphores应该初始化为公平的,以确保线程没有被访问资源(饥饿的)
 当使用信号量进行其他类型的同步控制时,非公平的性能优势高于公平性的


此类还提供了一次acquire和release多个许可证的便利方法
 当这些方法没有设置将公平性设置为true时候,当心增加不定期延时的风险.(上面讲到的插队的问题).


下面看下源码的实现:AQS可以去看看之前的部分

Semaphore同步实现部分

    /**
     * 信号量的同步实现。 使用AQS状态来表示permit。 分为公平和非正式版本。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }


构造器:

    /**
     * 非公平的
     * @param 允许可用的初始许可证。该值可能为负数,在这种情况下,必须在任何获得授予之前释放
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 第二个参数可以指定是否公平
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

常用方法:

    /**
     * 获取一个准入的许可.如果无法获取,线程会等待,直到有线程释放一个许可或者当前线程被中断
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 与上面方法类似,但是不响应中断
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 尝试获取一个许可,如果成功返回true,失败返回false.
     * 不会进行等待,立即返回
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 添加时间限定
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 线程访问结束以后,释放一个许可,返回给信号量
     * 以便于其他的线程可以进行资源访问
     */
    public void release() {
        sync.releaseShared(1);
    }

批量的获取与释放与上面类似...

其他方法:

    /**
     * 返回此信号量中当前可用的许可数。
     * 该方法通常用于调试和测试.
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 获取并返回所有可立即获得的许可证。
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 缩小可用许可证的数量. 
     * 该方法在使用信号量来跟踪变得不可用的资源的子类中是有用的
     * 该方法与{@code acquire}不同之处在于它不阻止等待许可证变为可用。
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 是否公平
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 查询是否有线程等待获取.
     * 注意:
     * 因为取消可能会在任何时候发生,所以{@code true}返回并不保证任何其他线程都将获得.
     * 该方法主要用于监视系统状态
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 返回等待获取的线程数的估计值
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }


最后附上书中的一个demo,更方便体会:

public class SemaphoreDemo implements Runnable{
	//给信号量初始化5个permit.也就是说可以有同时五个线程进入try代码块
    final Semaphore semp = new Semaphore(5);
    @Override
    public void run() {
        try {
            semp.acquire();
            //模拟耗时操作
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId()+":done!");
            semp.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
    	//开启20个线程测试.最后输出为5个一组
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreDemo demo=new SemaphoreDemo();
        for(int i=0;i<20;i++){
            exec.submit(demo);
        }
    }
}


源码来自JDK1.8,注释来自自己翻译,不足自出希望指正