阻塞队列—PriorityBlockingQueue源码分析

时间:2021-10-16 09:50:56

阻塞队列—PriorityBlockingQueue源码分析

 前言

阻塞队列—PriorityBlockingQueue源码分析

PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、*、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据

队列创建

小根堆

PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); 

大根堆

PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 

 @Override 

 public int compare(Integer o1, Integer o2) { 

  return o2 - o1; 

 } 

}); 

应用场景

有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

我们来看一个具体例子,例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。

package com.niuh.queue.priority; 

 

import java.util.ArrayList; 

import java.util.List; 

import java.util.Queue; 

import java.util.Random; 

import java.util.concurrent.ExecutorService; 

import java.util.concurrent.Executors; 

import java.util.concurrent.PriorityBlockingQueue; 

import java.util.concurrent.TimeUnit; 

 

/** 

 * <p> 

 * PriorityBlockingQueue使用示例 

 * </p> 

 */ 

public class PriorityBlockingQueueDemo { 

 

    public static void main(String[] args) throws Exception { 

        Random random = new Random(47); 

        ExecutorService exec = Executors.newCachedThreadPool(); 

        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); 

        exec.execute(new PrioritizedTaskProducer(queue, exec)); // 这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的 

        exec.execute(new PrioritizedTaskConsumer(queue)); // 步骤是同时进行的,因而输出结果并不一定是有序的 

    } 

 

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { 

    private Random random = new Random(47); 

    private static int counter = 0; 

    private final int id = counter++; 

    private final int priority; 

 

    protected static List<PrioritizedTask> sequence = new ArrayList<>(); 

 

    public PrioritizedTask(int priority) { 

        this.priority = priority; 

        sequence.add(this); 

    } 

 

    @Override 

    public int compareTo(PrioritizedTask o) { 

        return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);  // 定义优先级计算方式 

    } 

 

    @Override 

    public void run() { 

        try { 

            TimeUnit.MILLISECONDS.sleep(random.nextInt(250)); 

        } catch (InterruptedException e) { 

        } 

        System.out.println(this); 

    } 

 

    @Override 

    public String toString() { 

        return String.format("[%1$-3d]", priority) + " Task " + id; 

    } 

 

    public String summary() { 

        return "(" + id + ": " + priority + ")"

    } 

 

    public static class EndSentinel extends PrioritizedTask { 

        private ExecutorService exec

 

        public EndSentinel(ExecutorService exec) { 

            super(-1); 

            this.exec = exec

        } 

 

        @Override 

        public void run() { 

            int count = 0; 

            for (PrioritizedTask pt : sequence) { 

                System.out.print(pt.summary()); 

                if (++count % 5 == 0) { 

                    System.out.println(); 

                } 

            } 

            System.out.println(); 

            System.out.println(this + " Calling shutdownNow()"); 

            exec.shutdownNow(); 

        } 

    } 

 

class PrioritizedTaskProducer implements Runnable { 

    private Random random = new Random(47); 

    private Queue<Runnable> queue; 

    private ExecutorService exec

 

    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) { 

        this.queue = queue; 

        this.exec = exec

    } 

 

    @Override 

    public void run() { 

        for (int i = 0; i < 20; i++) { 

            queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加随机优先级的任务 

            Thread.yield(); 

        } 

        try { 

            for (int i = 0; i < 10; i++) { 

                TimeUnit.MILLISECONDS.sleep(250); 

                queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加优先级为10的任务 

            } 

            for (int i = 0; i < 10; i++) { 

                queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加优先级为1-10的任务 

            } 

            queue.add(new PrioritizedTask.EndSentinel(exec)); 

        } catch (InterruptedException e) { 

        } 

        System.out.println("Finished PrioritizedTaskProducer"); 

    } 

 

class PrioritizedTaskConsumer implements Runnable { 

    private PriorityBlockingQueue<Runnable> queue; 

 

    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) { 

        this.queue = queue; 

    } 

 

    @Override 

    public void run() { 

        try { 

            while (!Thread.interrupted()) { 

                queue.take().run(); // 任务的消费者,从PriorityBlockingQueue中取出任务执行 

            } 

        } catch (InterruptedException e) { 

        } 

        System.out.println("Finished PrioritizedTaskConsumer"); 

    } 

工作原理

PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:

  • PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11。
  • 该队列可以说是真正的*队列,它在队列满的时候会进行扩容,而前面说的*阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
  • 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
  • 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
  • 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。

注意:

  1. 堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》
  2. 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》

源码分析

定义

PriorityBlockingQueue的类继承关系如下:

阻塞队列—PriorityBlockingQueue源码分析

其包含的方法定义如下:

阻塞队列—PriorityBlockingQueue源码分析

成员属性

从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。

/** 

* 默认数组长度 

*/ 

private static final int DEFAULT_INITIAL_CAPACITY = 11; 

 

/** 

 * 最大达容量,分配时超出可能会出现 OutOfMemoryError 异常 

 */ 

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 

 

/** 

 * 队列,存储我们的元素 

 */ 

private transient Object[] queue; 

 

/** 

 * 队列长度 

 */ 

private transient int size

 

/** 

 * 比较器,入队进行权重的比较 

 */ 

private transient Comparator<? super E> comparator; 

 

/** 

 * 显示锁 

 */ 

private final ReentrantLock lock; 

 

/** 

 * 空队列时进行线程阻塞的 Condition 对象 

 */ 

private final Condition notEmpty; 

构造函数

/** 

* 默认构造,使用长度为 11 的数组,比较器为空 

*/ 

public PriorityBlockingQueue() { 

    this(DEFAULT_INITIAL_CAPACITY, null); 

/** 

* 自定义数据长度构造,比较器为空 

*/ 

public PriorityBlockingQueue(int initialCapacity) { 

    this(initialCapacity, null); 

/** 

* 自定义数组长度,可以自定义比较器 

*/ 

public PriorityBlockingQueue(int initialCapacity, 

                             Comparator<? super E> comparator) { 

    if (initialCapacity < 1) 

        throw new IllegalArgumentException(); 

    this.lock = new ReentrantLock(); 

    this.notEmpty = lock.newCondition(); 

    this.comparator = comparator; 

    this.queue = new Object[initialCapacity]; 

/** 

* 构造函数,带有初始内容的队列 

*/ 

public PriorityBlockingQueue(Collection<? extends E> c) { 

    this.lock = new ReentrantLock(); 

    this.notEmpty = lock.newCondition(); 

    boolean heapify = true; // true if not known to be in heap order 

    boolean screen = true;  // true if must screen for nulls 

    if (c instanceof SortedSet<?>) { 

        SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 

        this.comparator = (Comparator<? super E>) ss.comparator(); 

        heapify = false

    } 

    else if (c instanceof PriorityBlockingQueue<?>) { 

        PriorityBlockingQueue<? extends E> pq = 

            (PriorityBlockingQueue<? extends E>) c; 

        this.comparator = (Comparator<? super E>) pq.comparator(); 

        screen = false

        if (pq.getClass() == PriorityBlockingQueue.class) // exact match 

            heapify = false

    } 

    Object[] a = c.toArray(); 

    int n = a.length; 

    // If c.toArray incorrectly doesn't return Object[], copy it. 

    if (a.getClass() != Object[].class) 

        a = Arrays.copyOf(a, n, Object[].class); 

    if (screen && (n == 1 || this.comparator != null)) { 

        for (int i = 0; i < n; ++i) 

            if (a[i] == null

                throw new NullPointerException(); 

    } 

    this.queue = a; 

    this.size = n; 

    if (heapify) 

        heapify(); 

入队方法

入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。

offer(E e)

public void put(E e) { 

    offer(e); // never need to block 

 

public boolean offer(E e) { 

    //判断是否为空 

    if (e == null

        throw new NullPointerException(); 

    //显示锁 

    final ReentrantLock lock = this.lock; 

    lock.lock(); 

    //定义临时对象 

    int n, cap; 

    Object[] array; 

    //判断数组是否满了 

    while ((n = size) >= (cap = (array = queue).length)) 

        //数组扩容 

        tryGrow(array, cap); 

    try { 

        //拿到比较器 

        Comparator<? super E> cmp = comparator; 

        //判断是否有自定义比较器 

        if (cmp == null

            //堆上浮 

            siftUpComparable(n, e, array); 

        else 

            //使用自定义比较器进行堆上浮 

            siftUpUsingComparator(n, e, array, cmp); 

        //队列长度 +1 

        size = n + 1; 

        //唤醒休眠的出队线程 

        notEmpty.signal(); 

    } finally { 

        //释放锁 

        lock.unlock(); 

    } 

    return true

siftUpComparable(int k, T x, Object[] array)

上浮调整比较器方法的实现

private static <T> void siftUpComparable(int k, T x, Object[] array) { 

        Comparable<? super T> key = (Comparable<? super T>) x; 

        while (k > 0) { 

         //无符号向左移,目的是找到放入位置的父节点 

            int parent = (k - 1) >>> 1; 

            //拿到父节点的值 

            Object e = array[parent]; 

            //比较是否大于该元素,不大于就没比较交换 

            if (key.compareTo((T) e) >= 0) 

                break; 

            //以下都是元素位置交换 

            array[k] = e; 

            k = parent; 

        } 

        array[k] = key

    } 

根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。

入队图解

例子:85 添加到二叉堆中(大顶堆)

package com.niuh.queue.priority; 

 

import java.util.Comparator; 

import java.util.concurrent.PriorityBlockingQueue; 

 

/** 

 * <p> 

 * PriorityBlockingQueue 简单演示 demo 

 * </p> 

 */ 

public class TestPriorityBlockingQueue { 

 

    public static void main(String[] args) throws InterruptedException { 

        // 大顶堆 

        PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 

            @Override 

            public int compare(Integer o1, Integer o2) { 

                return o2 - o1; 

            } 

        }); 

 

        concurrentLinkedQueue.offer(90); 

        concurrentLinkedQueue.offer(80); 

        concurrentLinkedQueue.offer(70); 

        concurrentLinkedQueue.offer(60); 

        concurrentLinkedQueue.offer(40); 

        concurrentLinkedQueue.offer(30); 

        concurrentLinkedQueue.offer(20); 

        concurrentLinkedQueue.offer(10); 

        concurrentLinkedQueue.offer(50); 

        concurrentLinkedQueue.offer(85); 

        //输出元素排列 

        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 

        //取出元素 

        Integer take = concurrentLinkedQueue.take(); 

        System.out.println(); 

        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 

    } 

 阻塞队列—PriorityBlockingQueue源码分析

操作的细节分为两步:

  • 第一步:首先把新元素插入到堆的尾部再说;(新的元素可能是特别大或者特别小,那么要做的一件事情就是重新维护一下堆的所有元素,把新元素挪到这个堆的相应的位置)
  • 第二步:依次向上调整整个堆的结构,就叫 HeapifyUp

  阻塞队列—PriorityBlockingQueue源码分析

85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。

阻塞队列—PriorityBlockingQueue源码分析

85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。

 阻塞队列—PriorityBlockingQueue源码分析

出队方法

入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);
  • E peek()

poll 和 peek 与上面类似,这里不做说明

take()

出队方法,该方法会阻塞

public E take() throws InterruptedException { 

 //显示锁 

    final ReentrantLock lock = this.lock; 

    //可中断锁 

    lock.lockInterruptibly(); 

    //结果接收对象 

    E result; 

    try { 

     //判断队列是否为空 

        while ( (result = dequeue()) == null

         //线程阻塞 

            notEmpty.await(); 

    } finally { 

        lock.unlock(); 

    } 

    return result; 

dequeue()

我们再来看看具体出队方法的实现,dequeue方法

private E dequeue() { 

//长度减少 1 

   int n = size - 1; 

   //判断队列中是否有元素 

   if (n < 0) 

       return null

   else { 

    //队列对象 

       Object[] array = queue; 

       //取出第一个元素 

       E result = (E) array[0]; 

       //拿出最后一个元素 

       E x = (E) array[n]; 

       //置空 

       array[n] = null

       Comparator<? super E> cmp = comparator; 

       if (cmp == null

        //下沉调整 

           siftDownComparable(0, x, array, n); 

       else 

           siftDownUsingComparator(0, x, array, n, cmp); 

       //成功则减少队列中的元素数量 

       size = n; 

       return result; 

   } 

总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。

siftDownComparable(int k, T x, Object[] array, int n)

再来看看下沉比较器方法的实现

private static <T> void siftDownComparable(int k, T x, Object[] array, 

                                               int n) { 

    //判断队列长度 

    if (n > 0) { 

        Comparable<? super T> key = (Comparable<? super T>)x; 

        //找到队列最后一个元素的父节点的索引。 

        int half = n >>> 1;           // loop while a non-leaf 

        while (k < half) { 

         //拿到 k 节点下的左子节点 

            int child = (k << 1) + 1; // assume left child is least 

            //取得子节点对应的值 

            Object c = array[child]; 

            //取得 k 右子节点的索引 

            int right = child + 1; 

            //比较右节点的索引是否小于队列长度和左右子节点的值进行比较 

            if (right < n && 

                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 

                c = array[child = right]; 

            //比较父节点值是否大于子节点 

            if (key.compareTo((T) c) <= 0) 

                break; 

            //下面都是元素替换 

            array[k] = c; 

            k = child; 

        } 

        array[k] = key

    } 

出队图解

将堆尾元素替换到顶部(即堆顶被替代删除掉)

依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown

例子:90 从二叉堆中删除(大顶堆)

阻塞队列—PriorityBlockingQueue源码分析

总结

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。