并发编程之BlockingQueue(二)

时间:2022-09-01 18:37:01

前言:

咱们扒光系列的第一篇文章已经对concurrent包中的三层结构的第一层和第二层做了解析,并且对第三层中的lock做了详细的代码分析,本篇博文将针对BlockingQueue这个后续会在执行器里使用的基本数据结构做源码分析,为后续的Executor源码分析做准备。

我们先来看Doug Lea的定义,BlockingQueue是一个队列,该队列支持两种特殊操作即队列为空的时候获取元素的线程需要等待,队列为满的时候加入元素的线程需要等待。这是典型的生产者消费者模式的应用。

顶层接口设定了3种方法:

BlockingQueue三种方法

由于put()/take()方法是在并发中会发生阻塞,因此我们着重研究这两种方法。

BlockingQueue知名家族成员:

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,我们以这三个类为基础对put()、take()方法进行源码解析。

1:ArrayBlockingQueue:
我们先看该类的几个重要成员变量

final Object[] items; //基于数组

int takeIndex;        //下一个获取元素的位置

int putIndex;        //下一个放置元素的位置

int count;            //队列中的元素个数

final ReentrantLock lock;    //锁,用于put和take的时候线程独占

private final Condition notEmpty;    //Condition等待队列,在该队列排队的线程都在等待队列中增加元素

private final Condition notFull;        //Condition等待队列,在该队列中排队的线程都在等待队列元素消耗

生产者消费者模式主要体现在notEmpty和notFull两个队列,前者是消费者队列,后者是生产者队列,就像餐厅的服务员(消费者)和厨师(生产者)在窗口,前者等待窗口里有菜,后者等待窗口有放菜的位置。

生产者方法put:

public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;          //获取锁,生产者消费者用的同一把锁,消费者消费的时候你不能生产

    lock.lockInterruptibly();                               //关锁,当前生产者独占

    try {

        while (count == items.length)                //当队列已满的时候让当前生产者在生产者队列等待槽位

            notFull.await();

        enqueue(e);                                           //当队列未满的时候往队列里添加元素,此时是线程独占哦

    } finally {

        lock.unlock();

    }

}

private void enqueue(E x) {

    final Object[] items = this.items;        

    items[putIndex] = x;                                //在可以放置元素的数组槽位放置x元素

    if (++putIndex == items.length)               //对放置元素位置加一操作

        putIndex = 0;                                       //如果没地方放置元素了就把放置元素位置设置成0

    count++;                                                  //元素个数+1

    notEmpty.signal();                                    //通知消费者队列现在有东西可以消费了

}

上面方法可以总结成,厨房里有多个厨师,当一个厨师把菜做好之后就会独占窗口把菜放到窗口里,此时其他厨师需要排队等待这个厨师放完,而这个时候如果窗口已经摆满菜了,那么这个厨师就会在窗口的生产者队列中等待服务员把菜端走窗口有空余位置。而如果在厨师占领窗口这个过程中有一队程序员在消费者队列里等待的话就厨师把菜放到窗口里后就会通知这个消费者队列取菜。

消费者方法take:

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;      //这里的锁和生产者的锁是同一把锁,所以在生产者生产的时候你不能去消费

    lock.lockInterruptibly();

    try {

        while (count == 0)                            //如果队列为空,就等待队列变成非空

            notEmpty.await();                        //等待队列变为非空

        return dequeue();                            //如果队列有元素,那么就调用取元素的方法

    } finally {

        lock.unlock();

    }

}

private E dequeue() {

    final Object[] items = this.items;

    E x = (E) items[takeIndex];                           //获取元素与放入元素过程恰好相反

    items[takeIndex] = null;

    if (++takeIndex == items.length)

        takeIndex = 0;

    count--;

    if (itrs != null)

        itrs.elementDequeued();

    notFull.signal();                              //通知生产者队列,这里没有元素了,你们需要生产了

    return x;

}

OK!代码分析到这我们看ArrayBlockingQueue底层用的就是我们上一篇分析的Condition和Lock,道理非常简单,但是大家有没有发现阻塞队列不能同时生产和消费?这也是BlockingQueue没有广泛应用于第三方框架的原因吧!

2:LinkedBlockingQueue:
该类的几个成员变量:

private final int capacity; //容量

AtomicInteger count = new AtomicInteger(); //队列元素个数

private transient Node last; //队尾指针

private final ReentrantLock takeLock = new ReentrantLock(); //take和poll等方法调用的时候所占用的锁

private final Condition notEmpty = takeLock.newCondition(); //消费者等待队列,等待队列变为非空

private final ReentrantLock putLock = new ReentrantLock(); //put,offer等方法调用的时候所占用的锁

private final Condition notFull = putLock.newCondition(); //生产者等待队列

生产者put方法:

public void put(E e) throws InterruptedException {

    if (e == null)

        throw new NullPointerException(); 

    int c = -1;

    Node node = new Node(e);                            //以要放入的元素为基准新建一个节点

    final ReentrantLock putLock = this.putLock;   //获取生产者锁

    final AtomicInteger count = this.count;            

    putLock.lockInterruptibly();                                //当前生产者独占一下代码

    try {

        while (count.get() == capacity) {                    //一样的套路,如果当前队列满了就在生产者的Condition队列里等待被signal

            notFull.await();

        }

        enqueue(node);                                              //没满的话直接入队,这里不需要cas操作,因为只有当前线程调用该方法

        c = count.getAndIncrement();                        //队列元素个数+1

        if (c + 1 < capacity)                                        //如果队列放入元素之后还没满那么通知其他生产者线程这里可以生产

            notFull.signal();

    } finally {

        putLock.unlock();                                           

    }

    if (c == 0)

        signalNotEmpty();                                            //最后c==0证明队列里有元素,那么通知消费者过来消费

}

消费者take方法:

public E take() throws InterruptedException {

    E x;

    int c = -1;

    final AtomicInteger count = this.count;

    final ReentrantLock takeLock = this.takeLock;        

    takeLock.lockInterruptibly();                //独占消费者锁,即获取元素的时候只有一个线程

    try {

        while (count.get() == 0) {                    //如果队列为空,那么就在Condition的消费者等待队列里等待

            notEmpty.await();

        }

        x = dequeue();                                            //如果队列不为空,那么从容的去拿元素,不用怕别人跟你抢,因为就你一个人

        c = count.getAndDecrement();                

        if (c > 1)

            notEmpty.signal();                                   //如果队列中有两个以上元素那么通知消费者等待队列里的其他消费者过来拿

    } finally {

        takeLock.unlock();

    }

    if (c == capacity)                                            //如果c==capacity说明队列里有一个空位,这个时候通知生产者队列线程生产

        signalNotFull();

    return x;

}

LinkedBlockingQueue相比ArrayBlockingQueue的优势在于使用了两个锁,这样可以保证一个生产者和一个消费者同时工作,算是一种进步,但不能多个生产者和多个消费者同时工作。

3:PriorityBlockingQueue类:
该类的几个成员变量:

//默认容量为11

private static final int DEFAULT_INITIAL_CAPACITY = 11;

//最大容量

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//用于存放元素的数组

private transient Object[] queue;

//实时大小

private transient int size;

//入队顺序规范

private transient Comparator comparator;

private final ReentrantLock lock;

private final Condition notEmpty;

private transient volatile int allocationSpinLock;

//优先队列

private PriorityQueue q;

两个比较重要的构造方法:

public PriorityBlockingQueue(int initialCapacity,

                            Comparator comparator) {

    if (initialCapacity < 1)

        throw new IllegalArgumentException();

    this.lock = new ReentrantLock();

    this.notEmpty = lock.newCondition();

    this.comparator = comparator;

    this.queue = new Object[initialCapacity];

}

这个方法我们可以自己实现一个comparator作为参数传进来,比如我们以学生对象的学号属性作为排序依据,那么入队之后队列会以学号的大小进行排序。如果不指定就会以入队顺序为顺序。

public PriorityBlockingQueue(Collection c) {

    this.lock = new ReentrantLock();

    this.notEmpty = lock.newCondition();

    boolean heapify = true; // true if not known to be in heap order

    boolean screen = true;  // true if must screen for nulls

    if (c instanceof SortedSet) {

        SortedSet ss = (SortedSet) c;

        this.comparator = (Comparator) ss.comparator();

        heapify = false;

    }

    else if (c instanceof PriorityBlockingQueue) {

        PriorityBlockingQueue pq =

            (PriorityBlockingQueue) c;

        this.comparator = (Comparator) pq.comparator();

        screen = false;

        if (pq.getClass() == PriorityBlockingQueue.class) // exact match

            heapify = false;

    }

    Object[] a = c.toArray();

    int n = a.length;

    // If c.toArray incorrectly doesn't return Object[], copy it.

    if (a.getClass() != Object[].class)

        a = Arrays.copyOf(a, n, Object[].class);

    if (screen && (n == 1 || this.comparator != null)) {

        for (int i = 0; i < n; ++i)

            if (a[i] == null)

                throw new NullPointerException();

    }

    this.queue = a;

    this.size = n;

    if (heapify)

        heapify();

}

该构造方法会接收一个容器类并按照容器类定义的顺序排序,如果没有顺序那么就按照自然顺序排序。

该类的put/offer(一样)方法:

public boolean offer(E e) {

    if (e == null)

        throw new NullPointerException();

    final ReentrantLock lock = this.lock;

    lock.lock();

    int n, cap;

    Object[] array;

    //如果内部数组容纳不下元素了就扩容

    while ((n = size) >= (cap = (array = queue).length))

        tryGrow(array, cap);

    try {

        Comparator cmp = comparator;

        if (cmp == null)

            //没有给comparator赋值情况下

            siftUpComparable(n, e, array);

        else

            //给comparator赋值的情况下

            siftUpUsingComparator(n, e, array, cmp);

        size = n + 1;

        notEmpty.signal();

    } finally {

        lock.unlock();

    }

    return true;

}

//加入队列算法

private static void siftUpUsingComparator(int k, T x, Object[] array,

                                  Comparator cmp) {

    while (k > 0) {

        int parent = (k - 1) >>> 1;

        Object e = array[parent];

        if (cmp.compare(x, (T) e) >= 0)

            break;

        array[k] = e;

        k = parent;

    }

    array[k] = x;

}

这个加入算法比较特别,本质上使用的是数组,但实际上这个数组维护的是一个堆(二叉排序),新加入的元素会被移动到合适的位置上(根据comparator)。

我们看下扩容方法:

private void tryGrow(Object[] array, int oldCap) {

    lock.unlock(); // must release and then re-acquire main lock

    Object[] newArray = null;

    if (allocationSpinLock == 0 &&

        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

                                0, 1)) {

        try {

            int newCap = oldCap + ((oldCap < 64) ?

                                  (oldCap + 2) : 

                                  (oldCap >> 1));

            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow

                int minCap = oldCap + 1;

                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

                    throw new OutOfMemoryError();

                newCap = MAX_ARRAY_SIZE;

            }

            if (newCap > oldCap && queue == array)

                newArray = new Object[newCap];

        } finally {

            allocationSpinLock = 0;

        }

    }

    if (newArray == null) // back off if another thread is allocating

        Thread.yield();

    lock.lock();

    if (newArray != null && queue == array) {

        queue = newArray;

        System.arraycopy(array, 0, newArray, 0, oldCap);

    }

}

扩容过程是这样的,当数组长度小于64的时候2个2个往上加,如果数组长度大于64,那么2倍2倍的往上加。而如果其他线程在添加元素(自己没有抢到自旋锁),那么线程礼让让其他线程扩容。最后内部新建了一个数组并对数组进行了复制。

该类的take()方法:

方法外层加锁,里层调用了出队方法。

private E dequeue() {

    int n = size - 1;

    if (n < 0)

        return null;

    else {

        Object[] array = queue;

        E result = (E) array[0];

        E x = (E) array[n];

        array[n] = null;

        Comparator cmp = comparator;

        if (cmp == null)

            siftDownComparable(0, x, array, n);

        else

            siftDownUsingComparator(0, x, array, n, cmp);

        size = n;

        return result;

    }

}

去的元素是堆顶的元素,取完之后把数组最后一个元素暂存,并调用下沉元素方法保持堆的平衡态(二叉树的顺序)。

4:DelayedQueue类:
该类内部根据延时时间排序,队头是离到期时间最近的。以领导-随从模式维护线程,领导在take和put之前要通知相应线程,这样可以保证不必要的线程执行。

成员变量:

private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueueq = new PriorityQueue();

private Thread leader = null;

//队头的线程马上到期可取的时候或者新的线程要称为领导的时候会通知该等待队列

private final Condition available = lock.newCondition();

put方法:

public boolean offer(E e) {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        q.offer(e);

        if (q.peek() == e) {

            leader = null;

            available.signal();

        }

        return true;

    } finally {

        lock.unlock();

    }

}

这个方法没啥好说的,加锁–根据e的自然大小入队–通知随从线程堆顶元素可取–释放锁

take方法:

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        for (;;) {

            E first = q.peek();

            if (first == null)

                available.await();

            else {

                //堆顶元素等待时间

                long delay = first.getDelay(NANOSECONDS);

                //如果堆顶元素刑满释放,那么取出

                if (delay <= 0)

                    return q.poll();

                first = null; // don't retain ref while waiting

                if (leader != null)

                    available.await();

                else {

                    Thread thisThread = Thread.currentThread();

                    leader = thisThread;

                    try {

                        available.awaitNanos(delay);

                    } finally {

                        if (leader == thisThread)

                            leader = null;

                    }

                }

            }

        }

    } finally {

        if (leader == null && q.peek() != null)

            available.signal();

        lock.unlock();

    }

}

这个方法虽然很长,但是很简单,由于元素类型E是Delayed类型或者其子类型,所以可以计算它的刑满释放时间,如果到时间了就可以取到了,如果没到时间就等待到期时常的时间。这个过程中只有一个领导者在操作,所以领导者不为空的条件下其他随从都会等待,在领导等待的时候它也是领导哦。这样生产者和消费者共用同一个锁就成为现实。

5:SynchronousQueue类:

该类源码比较复杂,本着知之为知之的原则这里不做讨论,只知道内部没有缓冲区间,生产者和消费者手对手完成生产和消费,该类用于创建不限大小的线程池。