Semaphore(信号量)源码分析

时间:2022-09-29 20:40:08

1. Semaphore

Semaphore和ReentrantReadWriteLock.ReadLock(读锁)都采用AbstractOwnableSynchronizer共享排队的方式实现。

关于AbstractQueuedSynchronizer中的独占锁和共享锁,请参考ReentrantLock(http://www.cnblogs.com/bjorney/p/8040085.html)和ReentrantReadWriteLock(http://www.cnblogs.com/bjorney/p/8064268.html)

问题:Semaphore在acquire时不检查传入的参数是否超过state最大值??? 例如,new Semaphore(5, ture || false) -> acquire(10),则调用acquire的线程将被阻塞
           1)在公平模式下,之后所有调用acquire的线程都将在SyncQueue中排队而永远阻塞???
           2)在非公平模式下,之后所有acquire失败而参与排队的线程都将永远阻塞???

public class Semaphore implements java.io.Serializable {
    private final Sync sync;

    public Semaphore(int permits) {
        sync = new NonfairSync(permits); // 公平竞争,sync的锁状态(锁计数)state = permits
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits); // 公平竞争 || 非公平竞争
    }

    public void acquire() throws InterruptedException; // acquire(1) public void acquire(int permits) throws InterruptedException { // permits必须>=0
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    public void acquireUninterruptibly(); // acquireUninterruptibly(1) public void acquireUninterruptibly(int permits) { // permits必须>=0
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    public boolean tryAcquire(); // tryAcquire(1) public boolean tryAcquire(int permits) { // permits必须>=0
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    public boolean tryAcquire(long timeout, TimeUnit unit); // tryAcquire(1, timeout, unit) public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { // permits必须>=0
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    public void release(); // release(1) public void release(int permits) { // permits必须>=0
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    ... ...
}

2. Semaphore.Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }

    final int nonfairTryAcquireShared(int acquires) { // 尝试非公平取锁
        for (;;) {
            // CAS(state)失败将回到此处
            int available = getState();                                                      /*记录state*/
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) //remaining >= 0时/*CAS设置state -= acquires*/
                return remaining; // remaining < 0:SyncQueue中排队
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // CAS(state)失败将回到此处
            int current = getState();               /*记录state*/
            int next = current + releases;
            if (next < current) throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next)) /*CAS设置state += releases*/
                return true;
        }
    }

    ... ...
}

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) { // 尝试非公平取锁
        return nonfairTryAcquireShared(acquires);
    }
}

static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) { // 尝试公平取锁
        for (;;) {
            // CAS(state)失败将回到此处
            if (hasQueuedPredecessors()) // SyncQueue不为空 && SyncQueue中下个待唤醒节点非当前线程所在节点
                return -1; // 
            int available = getState();                                                      /*记录state*/
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) //remaining >= 0时/*CAS设置state -= acquires*/
                return remaining;
        }
    }
}