Java Thread&Concurrency(13): 深入理解ConcurrentLinkedQueue及其实现原理

时间:2022-11-12 17:06:22

ConcurrentLinkedQueue是一个基于链表实现的非阻塞队列,特点是head和tail是可以滞后的,甚至tail落到head的后面,准确得说,是当事实的head距离当前head至少两个link时才会修改head,这种设计可以在某些时候提高效率。

我们接着来看源码实现,这里主要是offer和poll方法:

    public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

  • 根据数据构造节点newNode,取得尾节点tail。
  • 接着判断next,假如不为空则取它或者取得新的head节点或者新的tail,这里依据此时是否tail已经滞后head了,以及tail已经被修改。
  • 假如next为nul,则尝试CAS添加newNode到当前实际的尾节点p,这里的CAS操作就是可线性化点。
  • 如果上述成功,则在尾节点距离达到或者超过2个link时更新尾节点。
  • 直接返回true。
poll方法:
    public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
  • 取得头结点,假如item为null或者CAS失败,则需要向后更新p结点,这里取得新p的方式无非就是沿着next链。
  • 当CAS操作成功之后,同样是距离2个link时更新head结点,然后返回数据。
关于ConcurrentLinkedQueue就是以上这些,总体来说,对于使用者来说,它是个基于链表的、不会阻塞的队列。