Java多线程 -- JUC包源码分析15 -- SynchronousQueue与CachedThreadPool

时间:2022-09-21 10:01:01

在前面分析工具类Executors的时候,提到了CachedThreadPool:其线程数会无限增大,每来一个新请求,就会new一个Thread,其maxPoolSize = Integer.MAX。
其构造函数如下:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//ThreadPoolExecutor的execute函数:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

至所以会达到这个效果:就是因为SynchronousQueue本身没有容量,上面的wokerQueue.offer(command)函数,永远返回空,所以就会一直走到下面的addIfUnderMaximuxPoolSize里面。

下面就来详细分析SynchronousQueue

SynchronousQueue使用方式

SynchronousQueue最大的特点就是put/take是成对调用的:
先调put,线程会阻塞在那;直到另外一个线程调用了take,2个线程才同时解锁。反之亦然。
对于多个线程,比如3个,调用3次put,3个都会阻塞在那;直到等另外的线程,调用3次take,大家才同时解锁。反之亦然。

这里就会有1个问题:先调用了3次put,那调用take的时候,是首先唤醒哪一个put线程呢?第1个,还是最后一个呢?

这就涉及到2种不同的模式:公平模式(队列模式) 和 非公平模式(栈模式)。

队列模式:最先调用put的线程,最先被take唤醒

栈模式:最后调用put的线程,最先被take唤醒。

SynchronousQueue实现

public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {


private transient volatile Transferer transferer;

public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { //第一个参数为put进去的obejct
Thread.interrupted();
throw new InterruptedException();
}
}

public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); //第1个参数为空,返回值是上面put进去的object
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}

public E poll() {
return (E)transferer.transfer(null, true, 0);
}

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
。。。
}

从代码可以看出,无论是put/take/offer/poll,都是调用的transfer.transfer函数,只是传进去的参数不一样而已。

那这个Transfer是什么呢?看代码知道,Transfer是个接口,有TransferQueue/TransferStack 2种实现,这2个实现,都是SynchronousQueue的内部类,其结果如下:

static final class TransferStack extends Transferer {
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
...
}

volatile SNode head;

...
}

static final class TransferQueue extends Transferer {
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
...
}

transient volatile QNode head;
transient volatile QNode tail;
}

从上面代码可以看出,2者结构都是一个单向链表。对于栈,只需要维护head结点;对于队列,维护head + tail结点。

实现思路

不管是栈,还是队列,其基本实现思路类似:
put的时候,new一个item != null的结点;take的时候,new一个item = null的结点。从head开始遍历,遇到的结点和自己同类型的,说明没有匹配者,自己也加入链表;遇到和自己不同类型的,尝试匹配。
加上链表的时候,如果是栈模式,加在头部;如果是队列模式,加在尾部。如下图所示:

Java多线程 -- JUC包源码分析15 -- SynchronousQueue与CachedThreadPool

transfer代码

//TransferQueue.transfer
Object transfer(Object e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin

if (h == t || t.isData == isData) { //队列为空,或者head结点和自己同类型
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;

advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null)? x : e;

} else { //队列不为空,head和自己不同类型
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read

Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}

advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null)? x : e;
}
}
}

//TransferStack.transfer
Object transfer(Object e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null)? REQUEST : DATA;

for (;;) { //for循环里面3大分支
SNode h = head;
if (h == null || h.mode == mode) { //case1:栈为空,或者栈顶元素和自己同类型
if (timed && nanos <= 0) { // 关键点:offer函数就走的这个逻辑,offer(e, ture, 0) 一直会返回null
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return mode == REQUEST? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { //case 2: 和自己不同类型,并且没有其他线程fulfilling这个结点,进入
if (h.isCancelled())
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (mode == REQUEST)? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { //case 3: 和自己不同类型,但现在有其他线程正在fulfilling此结点
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}

这里面有一个关键点:

无论是栈的实现,还是队列的实现,链表本身是没有加锁的。因此在多线程访问下,就会有弱一致性问题,inconsistent read问题。

但这个不会出问题,因为匹配上,那是最好;匹配不上,出现inconsistent read,for循环回来再重新读,直到匹配上了,线程才会解锁。

也正因为如此,上面的代码中,有诸多的double check逻辑。