【Java并发编程】18、PriorityBlockingQueue源码分析

时间:2021-03-24 10:36:42

PriorityBlockingQueue是一个基于数组实现的线程安全的*队列,原理和内部结构跟PriorityQueue基本一样,只是多了个线程安全。javadoc里面提到一句,1:理论上是*的,所以添加元素可能导致outofmemoryerror;2.不容许添加null;3.添加的元素使用构造时候传入Comparator排序,要不然就使用元素的自然排序。

PriorityBlockingQueue是基于优先级,不是FIFO,这是个好东西,可以用来实现优先级的线程池,高优先级的先执行,低优先级的后执行。跟之前看过的几个队列一样,都是继承AbstractQueue实现BlockingQueue接口。

对于优先级的实现,是采用数组来实现堆的,大概样子画个图容易理解:

【Java并发编程】18、PriorityBlockingQueue源码分析

堆顶元素是最小的,对于各左右子堆也保证堆顶元素最小。应用的数据结构为:最大堆/最小堆,是一个完全二叉树

容易混淆的概念:

二叉排序树,又称二叉查找树,又称二叉搜索树
或者是一棵空树,或者是具有下列性质的二叉树
(1)若左子树不空,则左子树上所有结点的值均小于或等于它的根结点的值;
(2)若右子树不空,则右子树上所有结点的值均大于或等于它的根结点的值;
(3)左、右子树也分别为二叉排序树;
 
完全二叉树:叶节点只能出现在最下层和次下层,并且最下面一层的结点都集中在该层最左边的若干位置的二叉树
 
平衡二叉树:它是一 棵空树或它的左右两个子树的高度差的绝对值不超过1,并且左右两个子树都是一棵平衡二叉树,同时,平衡二叉树必定是二叉搜索树
 
红黑树:是一种自平衡二叉查找树

内部结构和构造:

//基于数组实现的,如果构造没有传入容量,就是用默认大小
private static final int DEFAULT_INITIAL_CAPACITY = 11; /**
* 数组最大容量
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /**
* 优先级队列数组,记住queue[n]的2个左右子元素在数组的位置为在queue[2*n+1]和queue[2*(n+1)]
*/
private transient Object[] queue; /**
* 队列元素个数
*/
private transient int size; /**
* 比较器,构造时可以选择传入,没有就null,到时候就使用元素的自然排序
*/
private transient Comparator<? super E> comparator; /**
* 重入锁控制多有操作
*/
private final ReentrantLock lock; /**
* 队列为空的时候条件队列
*/
private final Condition notEmpty; /**
* 自旋锁
*/
private transient volatile int allocationSpinLock; /**
* 序列化的时候使用PriorityQueue,这个PriorityBlockingQueue几乎一模一样
*/
private PriorityQueue q; /**
* 默认构造,使用默认容量,没有比较器
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
} public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
} /**
* 最终调用的构造
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

内部结构和构造没有什么特别的地方,基于数组实现优先级的堆,记住数组元素queue[n]的左节点queue[2*n+1]和右节点queue[2*(n+1)],每次出队的都是queue[0]。

看下常用方法:

add、put、offer都是最终调用offer()方法:

所有的添加元素最后都是调用offer方法,2步:扩容+存储,大体流程为:

1.加锁,检查元素数量是否大于等于数组长度,如果是,那就扩容,扩容没必要使用主锁,先释放锁,使用cas自旋锁,容量最少翻倍,释放自旋锁,可能存在竞争,检查下,是否扩容,如果扩容那就复制数组,再度加主锁;

2.看构造入参是否有comparator,有就使用,没有就自然排序,从数组待插入位置父节点开始比较大,如果大于父节点,那就直接待插入位置插入,否则就跟父节点交换,然后循环向上查找,数量加1,通知非空条件队列take,最后释放锁。

看下几个出队操作:

出队的大体流程:

1.加锁,获取queue[0],清掉堆的最后一个叶子节点,并将其作为比较节点。等价于把最后一个叶子节点移到了queue[0]位置。然后从顶向下比较,找到新的queue[0]应该在的位置

2.调用从顶向下调整的方法:待调整位置节点左右节点和之前的叶子节点比较,如果之前叶子节点最小,那就直接放入待调整位置,如果是叶子节点小,那就取小的那个放入待调整位置,并且将小的部分重新循环查找,循环次数根据2分查找,基本是元素数量的一半就到找到位置。

再看一个remove,因为remove方法,2中调整方式都用到了:

remove的时候有2个调整,先自顶向下调整,保证最小,然后再向上调整。

出处:http://blog.csdn.net/xiaoxufox/article/details/51860543