信号量Semaphore初探

时间:2022-02-13 15:13:49

         信号量Semaphorejava.util.concurrent包下一个常用的同步工具类,它维护了一个许可集,可以理解成资源数,可以通过acquire操作来获取一个资源,并通过release来释放一个资源,但需要注意的是,release来释放资源前不一定要先通过acquire来获取一个资源,如果不断的release而不进行acquire将导致资源数虚增,所以一定得我们自己来保证使用的正确性。

        我们经常用信号量来管理可重复使用的资源,比如数据库连接、线程等,因为这些资源都有着可预估的上限,所以我们在初始化Semaphore时设定的许可数和我们需要管理的资源数一致,获取一个资源时就通过acquire来获取一个许可,如果没有可用资源,则acquire将阻塞,释放一个资源时,通过release来释放一个许可。用Semaphore来限制操作的并发访问程度也类似。

       但有一点需要注意,Semaphore没有直接提供更新许可总数的方法,虽然你可以单独通过acquirerelease来特意的减少或增加许可的总量,但这样做会让人感到奇怪。那么为什么Semaphore没有单独同重设信号量数量的方法呢?直接把AQSsetState方法暴露出来不就行的吗?因为setState操作如果发生在在某些使用该Semaphore的线程还没有走完整个信号量的获取和释放的流程时,将会直接导致state值的不准确。现在说可能让人不太理解,不用担心,看完本文,你就懂了。有人会想到当需要修改许可总数时,我再重新new一个Semaphore出来不就行了?比如像下面:

private volatileSemaphore jdbcConnection= new Semaphore(10);

public voidresetJdbcConnection(intjdbcConnectionAmount) {

    jdbcConnection= new Semaphore(jdbcConnectionAmount);

}

这里通过提供resetJdbcConnection方法来让外部可以修改jdbcConnection的许可数目,注意,这里的jdbcConnection必须是volatile这样暴力修改是有风险的,因为你在修改时jdbcConnection很可能正在被使用,比如它进行了jdbcConnection.acquire()操作后,你把jdbcConnection给更换成另一个新的Semaphore,所以在你使用jdbcConnection.release()来释放一个许可时,是在新的Semaphore进行release操作,所以新的Semaphore的许可数量被莫名的+1了。有一种解决办法是用局部变量记录下操作acquire时的Semaphore,并在进行release时使用该局部变量来进行release,这样保证了acquirerelease是在同一个Semaphore上操作,这种方法简单有效,适合绝大多数场景。之所以说Semaphore没有直接提供更新许可总数的方法,是因为Semaphore确有一个永久减少许可总数的方法,即:

protected voidreducePermits(int reduction) {

    if(reduction < 0)throw new IllegalArgumentException();

    sync.reducePermits(reduction);

}

很显然,它需要子类来继承使用,别妄想将reduction传入负数来使许可数增加,因为sync.reducePermits(reduction);中对reduction的值做了限制:

final voidreducePermits(int reductions) {

    for(;;) {

        int current = getState();

        int next = current - reductions;

        if (next > current) // underflow

           throw new Error("Permit count underflow");

        if (compareAndSetState(current, next))

            return;

    }

}

有人会好奇,为啥Semaphore提供了减少许可数的入口,但未提供增加许可数的入口?这个我暂时还未找到原因。

 

    现在我们来分析Semaphore的内部实现,像绝大多数并发工具类一样,Semaphore也依赖于AQSAbstractQueuedSynchronizer),关于AQS的简单分析,可以参考我的另一篇博文:http://manzhizhen.iteye.com/blog/2305890。你会很自然的想到,初始化时许可的总数就是保存在AQSstate属性中的:

Sync(intpermits) {

    setState(permits);

}

 

final intgetPermits() {

    returngetState();

}

SyncSemaphore中实现的AQS的内部类,我们现在来看下Sync中关于非公平获取信号量和释放信号量的两个方法的实现:

final intnonfairTryAcquireShared(intacquires) {

    for(;;) {

        int available = getState();

        int remaining = available - acquires;

        if (remaining < 0||

            compareAndSetState(available, remaining))

            return remaining;

    }

}

 

protected final booleantryReleaseShared(intreleases) {

    for(;;) {

        int current = getState();

        int next = current + releases;

        if (next < current) // overflow

           throw new Error("Maximum permit count exceeded");

        if (compareAndSetState(current, next))

            return true;

    }

}

nonfairTryAcquireShared方法中可以看出,当需要获取acquires数量的许可时,先看可用的许可够不够,如果不够(remaining < 0),则直接返回还差的许可数的负数值,如果够,则从可用许可数中减去acquires,并返回剩余可用的许可数。由于nonfairTryAcquireShared是非阻塞的,所以它直接在Semaphore中的tryAcquire非阻塞方法中使用:

public booleantryAcquire() {

    returnsync.nonfairTryAcquireShared(1) >=0;

}

public booleantryAcquire(int permits) {

    if(permits < 0) throw new IllegalArgumentException();

    returnsync.nonfairTryAcquireShared(permits) >=0;

}

 

可见,对于tryAcquire的非阻塞方法,Semaphore的公平模式和非公平模式下的实现都是一样的,它在能获取到足够许可时不需要进入队列而是直接拿到走人,不能获取足够许可时就直接返回,是非公平的!注意,对于tryAcquire的阻塞方法还是有公平和非公平之分的。tryReleaseShared是用来释放许可数,我们可以看内部实现也很简单,就是不断的进行CAS操作指导成功将releases数目的许可数加回到当前可用许可数中。前面已经说过,Seamphore无法去校验你获取许可和释放许可是否一一对应,因为获取和释放都是直接在AQSstate上操作的,所以操作一段时间后,连AQS自己都忘记最初的state值是啥了,所以当然无法在中途来校验获取和释放是否正确,即使知道state的初始值,也很难在交错的获取和释放许可的操作过程中做一致性检查。

        看过SemaphoreAPI文档都知道Semaphore中由公平(fair)和非公平(nonfair)两种模式,这两种模式在Semaphore创建时确定,中途不能修改,默认是非公平的。Semaphore的构造函数如下:

publicSemaphore(intpermits, boolean fair) {

    sync= fair ? new FairSync(permits) : new NonfairSync(permits);

}

从名字就可以看出,FairSync代表公平的Sync实现,而NonfairSync代表非公平的Sync实现,从名字也看出内部类FairSyncNonfairSync都继承自上面提到的抽象类Sync那么公平和非公平的含义到底是什么呢?看过我讲述AQS博文的朋友应该知道,我说过,只要线程由于获取资源数失败而进入队列中后,就一定得等前面的节点获取完锁才有机会尝试获取锁,也就是说在AQS队列中的线程绝对是公平的,因为队列本来就是先进先出,即先到先得。但如果你想获取的资源数现在就有,那么即使现在队列中有线程排队在等,你也可以不用进入队列而直接拿到你想要的资源,这就是非公平!所以,你很自然的想到,公平的做法就是不管此时有没有可用的许可,只要队列中有线程在等,你就得给我乖乖去排队,没错,公平就是这样的!我们先看非公平的NonfairSync实现:

static final classNonfairSync extendsSync {

    private static final longserialVersionUID = -2694183684443567898L;

 

    NonfairSync(intpermits) {

        super(permits);

    }

 

    protected inttryAcquireShared(intacquires) {

        return nonfairTryAcquireShared(acquires);

    }

}

tryAcquireShared方法调用的nonfairTryAcquireShared是在父类Sync中实现的,前面已经给出了代码实现,这里就不在多说了。我们再看看公平的FairSync实现:

static final classFairSync extends Sync {

    private static final longserialVersionUID =2014338818796000944L;

 

    FairSync(intpermits) {

        super(permits);

    }

 

    protected inttryAcquireShared(intacquires) {

        for (;;) {

            if (hasQueuedPredecessors())

               return -1;

            int available = getState();

            int remaining = available - acquires;

            if (remaining < 0||

                compareAndSetState(available, remaining))

               return remaining;

        }

    }

}

FairSync的获取许可数方法tryAcquireSharednonfairTryAcquireShared比起来唯一的特别之处就是hasQueuedPredecessors方法的调用,而hasQueuedPredecessorsAQS中实现的:

public final booleanhasQueuedPredecessors() {

    // The correctness of this depends on head being initialized

    // before tail and on head.next being accurate if the current

    // thread is first in queue.

   Node t = tail;// Read fields in reverse initialization order

   Node h = head;

    Node s;

    returnh != t &&

        ((s = h.next) ==null || s.thread!= Thread.currentThread());

}

从方法名和实现可以看出,hasQueuedPredecessors主要用来判断AQS队列中是否还有等待的线程节点,如果有,并且不是当前的线程节点,则返回true,所以,对于tryAcquireShared来说,此时都不进行资源获取的尝试就直接返回-1表明资源获取失败了。

 

现在许可数、公平和非公平都介绍完了,我们来看看获取许可数的相关方法。acquire将会在获取许可之前一直阻塞,或者被中断,我们看其内部实现:

public voidacquire() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

}

 

public voidacquire(int permits) throws InterruptedException {

    if(permits < 0) throw new IllegalArgumentException();

    sync.acquireSharedInterruptibly(permits);

}

 可以看出,它直接调用了AQS中的acquireSharedInterruptibly的实现,acquireSharedInterruptibly的实现在我的AQS博文中已经分析过,这里不再阐述。我们再来看看获取信号的限时阻塞方法tryAcquire的实现:

public booleantryAcquire(long timeout, TimeUnit unit)

    throwsInterruptedException {

    returnsync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

public booleantryAcquire(intpermits, long timeout, TimeUnit unit)

    throwsInterruptedException {

    if(permits < 0) throw new IllegalArgumentException();

    returnsync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

}

可见,Semaphore也没对其有特别的实现,而是直接调用AQS中的tryAcquireSharedNanos方法,tryAcquireSharedNanos在我的AQS博文中已经做过简单的介绍,这里不再阐述。

 

       可以看出,只要能弄懂AQS,你就可以明白Semaphore的核心实现!