JDK 中基于链表的非阻塞*队列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 内部是如何使用 CAS 非阻塞算法来保证多线程下入队出队操作的线程安全?
ConcurrentLinkedQueue是线程安全的*非阻塞队列,其底层数据结构是使用单向链表实现,入队和出队操作是使用我们经常提到的CAS来保证线程安全的。
我们首先看一下ConcurrentLinkedQueue的类图结构先,好有一个内部逻辑有一个大概的印象,如下图所示:
可以清楚的看到ConcurrentLinkedQueue内部的队列是使用单向链表方式实现,类中两个volatile 类型的Node 节点分别用来存放队列的首位节点。
首先我们先来看一下ConcurrentLinkedQueue的构造函数,如下:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
通过无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
Node节点内部则维护一个volatile 修饰的变量item 用来存放节点的值,next用来存放链表的下一个节点,从而链接为一个单向*链表,这就是单向*的根本原因。如下图:
接下来看ConcurrentLinkedQueue 主要关注入队,出队,获取队列元素的方法的源码,如下所示:
1.首先看入队方法offer,offer 操作是在队列末尾添加一个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是*队列该方法一直会返回 true。另外由于使用 CAS 无阻塞算法,该方法不会阻塞调用线程,其源码如下:
public boolean offer(E e) {
//(1)e为null则抛出空指针异常
checkNotNull(e); //(2)构造Node节点
final Node<E> newNode = new Node<E>(e); //(3)从尾节点进行插入
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //(4)如果q==null说明p是尾节点,则执行插入
if (q == null) { //(5)使用CAS设置p节点的next节点
if (p.casNext(null, newNode)) {
//(6)cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
if (p != t)
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)//(7)
//多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
//重新找新的head,因为新的head后面的节点才是正常的节点。
p = (t != (t = tail)) ? t : head;
else
//(8) 寻找尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
类图结构时候谈到构造队列时候参构造函数创建了一个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下面通过图形结合来讲解下 offer 操作的代码实现。
1.首先看一下,当一个线程调用offer(item)时候情况:首先代码(1)对传参判断空检查,如果为null 则抛出空指针异常,然后代码(2)则使用item作为构造函数参数创建一个新的节点,
代码(3)从队列尾部节点开始循环,目的是从队列尾部添加元素。如下图:
上图是执行代码(4)时候队列的情况,这时候节点 p , t ,head ,tail 同时指向了item为null的哨兵节点,由于哨兵节点的next节点为null,所以这里q指向也是null。
代码(4)发现q==null 则执行代码(5),通过CAS原子操作判断p 节点的next节点是否为null,如果为null 则使用节点newNode替换p 的next节点,
然后执行代码(6),由于 p == t ,所以没有设置尾部节点,然后退出offer方法,这时候队列的状态图如下:
上面讲解的是一个线程调用offer方法的情况下,如果多个线程同时调用,就会存在多个线程同时执行到代码(5),假设线程A调用offer(item1),
线程B调用offer(item2),线程 A 和线程B同时到 p.casNext(null,newNode)。而CAS的比较并设置操作是原子性的,假设线程A先执行了比较设置操作,
则发现当前P的next节点确实是null ,则会原子性更新next节点为newNode,这时候线程B 也会判断p 的next节点是否为null,结果发现不是null,(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到代码(3),
然后执行到代码(4)的时候的队列分布图如下:
根据这个状态图可知线程B会执行代码(8),然后q 赋值给了p,这个时候状态图为:
然后线程B再次跳转到代码(3)执行,当执行到代码(4)时候队列状态图为:
由于这时候q == null ,所以线程B 会执行步骤(5),通过CAS操作判断 当前p的next 节点是否为null ,不是则再次循环后尝试,是则使用newNode替换,假设CAS成功了,那么执行步骤(6),
由于 p != t 所以设置tail节点为newNode ,然后退出offer方法。这时候队列的状态图为:
到现在为止,offer代码在执行路径现在就差步骤(7)还没有执行过,其实这个要在执行poll操作才会出现的,这里先看一下执行poll操作后可能会存在的一种情况,如下图所示:
下面分析下当队列处于这种状态调用offer添加元素代码执行到代码(4)的时候的队列状态图,如下:
由于q节点不为空并且p==q 所以执行代码(7),因为 t == tail所以p 被赋值为head ,然后进入循环,循环后执行到代码(4)的时候的队列状态图,如下:
由于 q ==null,所以执行代码(5),进行CAS操作,如果当前没有其他线程执行offer操作,则CAS操作会成功,p的next节点被设置为新增节点,然后执行代码(6),
由于p != t 所以设置新节点为队列尾节点,现在队列状态图,如下:
在这里的自引用的节点会被垃圾回收掉,可见offer操作里面关键步骤是代码(5)通过原子CAS操作来进行控制同时只有一个线程可以追加元素到队列末尾,进行cas竞争失败的线程,
则会通过循环一次次尝试进行cas操作,知道cas成功才会返回,也就是通过使用无限循环里面不断进行CAS尝试方式来替代阻塞算法挂起调用线程,相比阻塞算法,这是使用CPU资源换取阻塞带来的开销。
2.poll操作,poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null,我们首先看改方法的源码,如下:
public E poll() {
//(1) goto标记
restartFromHead: //(2)无限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) { //(3)保存当前节点值
E item = p.item; //(4)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(5)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//(6)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//(7)自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
else//(8)
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
poll操作是从队头获取元素,所以代码(2)内层循环是从head节点开始迭代,代码(3)获取当前队头的节点,当队列一开始为空的时候队列状态为:
由于head 节点指向的item 为null 的哨兵节点,所以会执行到代码(6),假设这个过程没有线程调用offer,则此时q等于null ,如下图:
所以执行updateHead方法,由于h 等于 p所以没有设置头节点,poll方法直接返回null。
假设执行到代码(6)的时候已经有其他线程调用了offer 方法成功添加了一个元素到队列,这时候q执行的是新增元素的节点,这时候队列状态图为:
所以代码(6)判断结果为false,然后会转向代码(7)执行,而此时p不等于q,所以转向代码(8)执行,执行结果是p指向了节点q,此时的队列状态如下:
然后程序转向代码(3)执行,p现在指向的元素值不为null,则执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null,
如果此时没有其他线程进行poll操作,CAS成功则执行代码(5),由于此时 p != h ,所以设置头节点为p,poll然后返回被从队列移除的节点值item。此时队列状态为:
这个状态就是前面提到offer操作的时候,offer代码的执行路径(7)执行的前提状态。
假如现在一个线程调用了poll操作,则在执行代码(4)的时候的队列状态为:
可以看到这时候执行代码(6)返回null。
现在poll的代码还有个分支(7)还没有被执行过,那么什么时候会执行呢?假设线程A执行poll操作的时候,当前的队列状态,如下:
那么执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null。
假设 CAS 设置成功则标示该节点从队列中移除了,此时队列状态为:
然后由于p != h,所以会执行updateHead 方法,假如线程A执行updateHead前,另外一个线程B开始poll操作,这时候线程B的p指向head节点,
但是还没有执行到代码(6),这时候队列状态为:
然后线程A执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
然后线程B继续执行代码(6)q=p.next由于该节点是自引用节点所以p==q,所以会执行代码(7)跳到外层循环restartFromHead,重新获取当前队列队头 head, 现在状态为:
总结:poll元素移除一个 元素的时候,只是简单的使用CAS操作把当前节点的item值设置为null,然后通过重新设置头节点让该元素从队列里面摘除,
被摘除的节点就成了孤立节点,这个节点会被在GC的时候会被回收掉。另外,执行分支中如果发现头节点被修改了要跳到外层循环重新获取新的头节点。
3.peek操作,peek 操作是获取队列头部一个元素(只不获取不移除),如果队列为空则返回 null,其源码如下:
public E peek() {
//(1)
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
//(2)
E item = p.item;
//(3)
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//(4)
else if (p == q)
continue restartFromHead;
else
//(5)
p = q;
}
}
}
代码结构与poll操作类似,不同于代码(3)的使用只是少了castItem 操作,其实这很正常,因为peek只是获取队列头元素值,并不清空其值,
根据前面我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次peek的时候,代码(3)中会发现item==null,
然后会执行 q = p.next, 这时候 q 节点指向的才是队列里面第一个真正的元素或者如果队列为 null 则 q 指向 null。
当队列为空的时候,队列状态图,如下:
这时候执行updateHead 由于 h 节点等于 p 节点所以不进行任何操作,然后 peek 操作会返回 null。
当队列中至少有一个元素的时候(假如只有一个),这时候队列状态为:
这时候执行代码(5)这时候 p 指向了 q 节点,然后执行代码(3)这时候队列状态为:
执行代码(3)发现 item 不为 null,则执行 updateHead 方法,由于 h!=p, 所以设置头结点,设置后队列状态为:
可以看到其实就是剔除了哨兵节点。
总结:peek操作代码与poll操作类似,只是前者只获取队列头元素,但是并不从队列里面删除,而后者获取后需要从队列里面删除,另外,在第一次调用peek操作的时候,
会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。
4.size方法,获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。源码如下:
public int size() {
int count = ;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大返回Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}
//获取第一个队列元素(哨兵元素不算),没有则为null
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//获取当前节点的next元素,如果是自引入节点则返回真正头节点
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
5.remove方法,如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回 true,否者返回 false。源码如下:
public boolean remove(Object o) { //查找元素为空,直接返回false
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item; //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) { //获取next元素
Node<E> next = succ(p); //如果有前驱节点,并且next不为空则链接前驱节点到next,
if (pred != null && next != null)
pred.casNext(p, next);
return true;
}
pred = p;
}
return false;
}
ConcurrentLinkedQueue 底层使用单向链表数据结构来保存队列元素,每个元素被包装为了一个 Node 节点,队列是靠头尾节点来维护的,创建队列时候头尾节点指向一个 item 为 null 的哨兵节点,
第一次 peek 或者 first 时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以获取 size 的时候有可能进行了 offer,poll 或者 remove 操作,导致获取的元素个数不精确,所以在并发情况下 size 函数不是很有用。
- JDK 中基于链表的非阻塞*队列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 内部是如何使用 CAS 非阻塞算法来保证多线程下入队出队操作的线程安全?
Java并发编程笔记之ConcurrentLinkedQueue源码探究的更多相关文章
-
Java并发编程笔记之LinkedBlockingQueue源码探究
JDK 中基于链表的阻塞队列 LinkedBlockingQueue 原理剖析,LinkedBlockingQueue 内部是如何使用两个独占锁 ReentrantLock 以及对应的条件变量保证多线 ...
-
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝 ...
-
Java并发编程笔记之ThreadLocalRandom源码分析
JDK 并发包中 ThreadLocalRandom 类原理剖析,经常使用的随机数生成器 Random 类的原理是什么?及其局限性是什么?ThreadLocalRandom 是如何利用 ThreadL ...
-
Java并发编程笔记之ThreadLocal源码分析
多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,多线程访问同一个共享变量特别容易出现并发问题,特别是多个线程需要对一个共享变量进行写入时候, ...
-
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景.通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过Fu ...
-
Java并发编程笔记之SimpleDateFormat源码分析
SimpleDateFormat 是 Java 提供的一个格式化和解析日期的工具类,日常开发中应该经常会用到,但是由于它是线程不安全的,多线程公用一个 SimpleDateFormat 实例对日期进行 ...
-
Java并发编程笔记之Timer源码分析
timer在JDK里面,是很早的一个API了.具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一 ...
-
Java并发编程笔记之CyclicBarrier源码分析
JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复 ...
-
Java并发编程笔记之PriorityBlockingQueue源码分析
JDK 中*优先级队列PriorityBlockingQueue 内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的? PriorityBlock ...
随机推荐
-
IIS7.5 由于 Web 服务器上的“ISAPI 和 CGI 限制”列表设置,无法提供您请求的页面
IIS7.5中将一网站应用程序池托管管道模式改为经典后,网站页面打不开,错误信息: 引用内容 HTTP 错误 404.2 - Not Found由于 Web 服务器上的“ISAPI 和 CGI 限制” ...
-
java 25 - 4 网络编程之 UDP协议传输的代码优化
UDP协议的输出端: /* UDP发送数据: A:创建Socket发送端对象 B:创建数据报包(把数据打包) C:调用Socket对象发送数据报包 D:释放资源(底层是IO流) */ public c ...
-
ZOJ 1067 Color Me Less
原题链接 题目大意:一道类似于简单图像压缩的题目.给定一个调色板,然后把24位真彩色按照就近原则聚类. 解法:每个像素的色彩都是RGB三个值,相当于三维空间的一个点.所以当一个新的像素进来时,分别和调 ...
-
去掉IntelliJ IDEA的拼写检查
Settings→Editor→Inspections→Spelling 去掉Spelling下的Typo复选框即可 来自为知笔记(Wiz)
-
NWERC 2012 Problem J Joint Venture
刚刚开始想的是用二分的方法做,没想到这个题目这么水,直接暴力就行: 代码: #include<cstdio> #include<algorithm> #define maxn ...
-
一个cocoapods问题的解决,希望能帮助到遇到相似情况的人
之前10.7的系统上执行过cocoapods没有问题.如今系统版本号升级到了10.9,尝试使用cocoapods遇到问题,报告了类似以下的错误: Psych::SyntaxError - (/User ...
-
基于nodejs的流水线式的CRUD服务。依赖注入可以支持插件。
写代码好多年了,发现大家的思路都是写代码.写代码.写代码,还弄了个称号——码农. 我是挺无语的,我的思路是——不写代码.不写代码.不写代码! 无聊的代码为啥要重复写呢?甚至一写写好几年. 举个例子吧, ...
-
状态压缩动态规划 状压DP
总述 状态压缩动态规划,就是我们俗称的状压DP,是利用计算机二进制的性质来描述状态的一种DP方式 很多棋盘问题都运用到了状压,同时,状压也很经常和BFS及DP连用,例题里会给出介绍 有了状态,DP就比 ...
-
Apache Atlas
atlas英 [ˈætləs] 阿特拉斯. 美 [ˈætləs] n.地图集;〈比喻〉身负重担的人 == Apache Atlas Version: 1.1.0 Last Published: 201 ...
-
8,EasyNetQ-多态发布和订阅
您可以订阅一个接口,然后发布该接口的实现. 我们来看一个例子. 我有一个接口IAnimal和两个实现猫和狗: public interface IAnimal { string Name { get; ...