Java多线程 -- JUC包源码分析16 -- Exchanger源码分析

时间:2022-01-30 07:30:55

上一篇所讲的SynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take。
而今天所讲的Exchange,顾明思义,是双向的数据传输,2个线程在一个同步点,交换数据。

其使用方式,大致如下:

Exchange<String> exchange = new Exchange<String>(); //建1个多个线程共用的exchange对象

//把exchange对象,传给4个线程对象。每个线程在自己的run函数里面,调用exchange,把自己的数据当参数传进去,返回值是另外一个线程调用exchange塞进去的参数。
ThreadA a = new ThreadA(exchange);
run()
{
String other = exchange.exchange(self.data) //没有别的线程调用exchange的话,自己会阻塞在这。直到有别的线程调用exchange。
}


ThreadB b = new ThreadB(exchange);
run()
{
String other = exchange.exchange(self.data)
}


ThreadB c = new ThreadC(exchange);
run()
{
String other = exchange.exchange(self.data)
}

ThreadC d = new ThreadD(exchange);
run()
{
String other = exchange.exchange(self.data)
}

在上面的例子中,4个线程并发的调用exchange,会两两交互数据。可能是A/B, C/D,也可能A/C, B/D,也可能是A/D, B/C。

Exchange实现

从简单的角度来考虑,Exchange只需要一个互斥变量就够了。因为可以限制,任何时候,只能有1对发生交换,不是多对,同时发生交换。

但为了提高并发度,Exchange内部用了多个变量,在其内部,称之为Slot。

public class Exchanger<V> {

private static final class Slot extends AtomicReference<Object> {
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}

private volatile Slot[] arena = new Slot[CAPACITY];

。。。
}

关键技术点1:CacheLine填充

在上面的代码中,Slot其实就是一个AtomicReference,其里面的q0, q1,..qd那些变量,都是多余的,不用的。那为什么要添加这些多余的变量呢?

是为了让不同的Slot不要落在cpu的同一个CacheLine里面。因为cpu从内存读取数据的时候,不是一个字节一个字节的读,而是按块读取,这里的块也就是“CacheLine”,一般一个CacheLine大小是64Byte。

保证一个Slot的大小 >= 64Byte,这样更改一个Slot,就不会导致另外一个Slot的cpu cache失效,从而提高性能。

知道了Slot其实就是一个AtomicReference,下面讨论第2个技术点

关键技术点2:锁分离

同ConcurrentHashMap类型,Exchange没有只定义一个slot,而是定义了一个slot的数组。这样在多线程调用exchange的时候,可以各自在不同的slot里面进行匹配。

exchange的基本思路如下:
(1)根据每个线程的thread id, hash计算出自己所在的slot index;
(2)如果运气好,这个slot被人占着(slot里面有node),并且有人正在等待交换,那就和它进行交换;
(3)slot为空的(slot里面没有node),自己占着,等人交换。没人交换,向前挪个位置,把当前slot里面内容取消,index减半,再看有没有交换;
(4)挪到0这个位置,还没有人交互,那就阻塞,一直等着。别的线程,也会一直挪动,直到0这个位置。

所以0这个位置,是一个交易的“终结点”位置!别的位置上找不到人交易,最后都会到0这个位置。

下面是exchange的源代码:

    public V exchange(V x) throws InterruptedException {
if (!Thread.interrupted()) {
Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
if (v == NULL_ITEM)
return null;
if (v != CANCEL)
return (V)v;
Thread.interrupted(); // Clear interrupt status on IE throw
}
throw new InterruptedException();
}

private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item);
int index = hashIndex(); //根据thread id计算出自己要去的那个交易位置(slot)
int fails = 0;

for (;;) {
Object y;
Slot slot = arena[index];
if (slot == null)
createSlot(index); //slot = null,创建一个slot,然后会回到for循环,再次开始
else if ((y = slot.get()) != null && //slot里面有人等着(有Node),则尝试和其交换
slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
Node you = (Node)y;
if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的
LockSupport.unpark(you.waiter); //唤醒对方
return you.item; //自己把对方的东西拿走
} //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始
}
else if (y == null && //slot里面为空(没有Node),则自己把位置占住
slot.compareAndSet(null, me)) {
if (index == 0) //如果是0这个位置,自己阻塞,等待别人来交换
return timed? awaitNanos(me, slot, nanos): await(me, slot);
Object v = spinWait(me, slot); //不是0这个位置,自旋等待
if (v != CANCEL) //自旋等待的时候,运气好,有人来交换了,返回
return v;
me = new Node(item); //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环
int m = max.get();
if (m > (index >>>= 1))
max.compareAndSet(m, m - 1);
}
else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了 case2: slot没人,自己要占位置,又被人家抢了
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1; //3次匹配失败,把index扩大,再次开始for循环
else if (--index < 0)
index = m;
}
}
}

注意:上面唯一有点绕的地方,是Slot里面套了个Node。双方的交易地点,准确说,不是在Slot里面,是在Slot的Node里面。
当Slot的Node = null时候,代表Slot为空,新建一个Node占住;
当Slot的Node != null的时候,把Node拿出来,双方在Node里面交换,此时Slot已经释放出去了!

总结

Exchanger的代码虽然少,但还是蛮绕的,总结下来,有以下几个关键点:
(1) Cache Line填充技术
(2) 锁分离
(3) 俩人还没交换结束的时候,就把slot给释放出去了。双方拿着那个Node,在Node里面交互的。也就是上面的for循环里面的第2个分支。
这里面有一种情况就是:运气不好,2人正要在Node里面交换的时候,被另外一个线程抢了。那这个没抢到的,只能for循环,重新开始
(4)0是交换的终结点位置:没有人交换的时候,会不断挪位置,挪的时候,要把当前位置的slot清空掉,一个人不能同时占2个Slot。直到挪到0这个位置,一直等待。