SynchronousQueue------TransferStack源码分析

时间:2021-01-22 17:37:48

s,e在线程栈里面,TransferStack在堆里面,方法只是线程的执行逻辑。线程过来调用transfer方法,线程在堆里面创建一个节点,加到Stack里面去,然后这个线程归属节点的waiter,阻塞(方法局部变量保留)。配对的线程过来,在堆里创建一个节点加入stack,

配对后移除2个节点,正在配对时候,有线程带着局部变量e入队或者来交易,什么都不做只是帮助匹配(同时只能一个节点在配对),帮助配对完成之后,该入队就入队该交易就交易。

SynchronousQueue------TransferStack源码分析

 

package com.itmayiedu;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.Spliterator;
import java.util.Spliterators;

public class SynchronousStack<E>   {
    abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    static final int maxUntimedSpins = maxTimedSpins * 16;
    static final long spinForTimeoutThreshold = 1000L;

    static final class TransferStack<E> extends Transferer<E> {
        static final int REQUEST    = 0;//消费者 
        static final int DATA       = 1;//生产者 
        static final int FULFILLING = 2;//表示正在进行交易的节点。
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }//FULFILLING返回true,是否是正在进行交易的生产者或者消费者。
        static final class SNode {
            volatile SNode next;         
            volatile SNode match;       // 相匹配的节点
            volatile Thread waiter;      // 等待的线程
            //item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读
            Object item;                // item 域
            int mode;//  REQUEST,DATA,FULFILLING
            SNode(Object item) {
                this.item = item;
            }
            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
            boolean tryMatch(SNode s) {//匹配成功,则unpark等待线程
                if (match == null &&//设置本结点的匹配为s节点
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }
            void tryCancel() {//取消这个节点,match从原来的null变为this
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
            boolean isCancelled() {
                return match == this;
            }
            private static final sun.misc.Unsafe UNSAFE;
            private static final long matchOffset;
            private static final long nextOffset;
            static {
                try {
                    UNSAFE = getUnsafe();//sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
        volatile SNode head;//栈的头结点 
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);//改变头节点需要cas只有一个线程成功
        }
        static SNode snode(SNode s, Object e, SNode next, int mode) {//入账,新进来节点下一个节点是head节点。
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        E transfer(E e, boolean timed, long nanos) {
            SNode s = null;
            int mode = (e == null) ? REQUEST : DATA;//消费者是0生产者是1
            for (;;) {
                SNode h = head;//刚开始头节点为null,第一个进来的节点就是头节点。
                //入队2步:构建新节点新节点.next=原来头节点原来头节点变为新节点
                if (h == null || h.mode == mode) {  // 栈为空或者当前节点模式与头节点模式一样,将节点压入栈内,等待匹配
                    if (timed && nanos <= 0) {      // 新进来的入队节点已经超时,还要看一下头节点是否取消了?只有一个头节点,看下头节点。
                        if (h != null && h.isCancelled())// 节点被取消了,向前推进
                            casHead(h, h.next);    
                        else
                            return null;// 头节点没有被取消,但是这个节点已经超时,什么都不做,直接返回
                    } else if (casHead(h, s = snode(s, e, h, mode))) {//线程栈里面构建节点s,头节点指向最新进来的节点s,s.next=原来头节点
                        SNode m = awaitFulfill(s, timed, nanos);//  等待 匹配,线程阻塞时候局部变量保留,唤醒时候再次使用不变。
                        if (m == s) {  // 返回match到的节点m == s节点自己, 表示该节点被取消了或者超时、中断了
                            clean(s);
                            return null;
                        }
                        // 先唤醒在移除节点,唤醒之后这里执行唤醒这里执行时候有可能还没有移除,这里就帮助移除唤醒的是第一个生产节点s=62,此时head是交易节点81, h.next == s
                        if ((h = head) != null && h.next == s) 
                            casHead(h, s.next);// 将s.next节点设置为head,相当于取消节点h、s帮助移除。
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                // 取节点,头节点不是正在取节点的节点,头节点没有配对//取节点:线程过来调用transfer方法,线程在堆里面创建一个节点改变模式为FULFILLING,加到Stack里面去,然后配对,移除2个节点,返回值。
                } else if (!isFulfilling(h.mode)) { 
                    if (h.isCancelled())            // 取节点看头节点有没有取消
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//消费者s进来也要入队成为头节点,将这个节点s模式变为FULFILLINGs是交易节点
                        for (;;) { // 比while效率高
                            SNode m = s.next;       //s是交易节点81,m是被匹配的节点62
                            if (m == null) {         // m == null,其他帮助配对的线程移除了
                                casHead(s, null);    // 将s弹出
                                s = null;           // 将s置空,下轮循环的时候还会新建,帮助GC
                                break;              // 退出该循环,继续主循环
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {//s节点和m节点匹配配对时候唤醒节点
                                casHead(s, mn);     // 匹配成功,将s 、 m弹出,完成交易之后将两个节点一起弹出,并且返回交易的数据。
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // 如果没有匹配成功m有可能取消了,那么就需要把m从栈中移除。s继续跟mn做交易  没有人指向m就会被回收。
                        }
                    }
//如果栈顶已经存在一个模式为FULFILLING的节点,说明栈顶的节点正在进行匹配,那么就帮助这个栈顶节点快速完成交易,然后继续交易。
                } else { 
                    SNode m = h.next;    // h=81,m=62,h是交易节点,m是被配对节点,
                    if (m == null) // m == null ,执行到这一行时候,配对已经被别的帮助线程或者交易节点线程自己执行完了,
                        casHead(h, null);            
                    else {
                        SNode mn = m.next;//跟取节点差不多mn=56
                        if (m.tryMatch(h))    //62和81配对       
                            casHead(h, mn);         //帮助移除。h和m
                        else                       
                            h.casNext(m, mn);//帮助匹配失败,h的下一个节点变为下一个的下一个节点。  没有人指向m就会被回收。
                    }
                }
            }
        }
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?//第一个节点要自旋
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match; 
                if (m != null)
                    return m;//返回这个节点match到的节点
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
        boolean shouldSpin(SNode s) {//如果当前节点在栈顶,或者正在请求交易,那么就应该自旋//因为很可能立刻就会有新的线程到来,那么就会立刻进行交易而不需要进行阻塞,然后被唤醒,这是需要过程的,所以这样的自旋等待是值得的。
            SNode h = head;
            boolean b = isFulfilling(h.mode);
            return (h == s || h == null || isFulfilling(h.mode));
        }
        void clean(SNode s) {//clean()方法就是将head节点到S节点之间所有已经取消的节点全部移出。
            s.item = null;   // forget item
            s.waiter = null; // forget thread
            SNode past = s.next;//s=39,past=38
            //这个方法首先找到接下来的第一个不为null并且没有被取消交易的节点past,然后设置新的head节点,
            if (past != null && past.isCancelled())
                past = past.next;
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);//p是从头节点开始第一个不移除的节点
            while (p != null && p != past) {//p到past之间的都要移除
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);//移除节点n
                else
                    p = n;//修改p指针
            }
        }
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        static {
            try {
                UNSAFE = getUnsafe();//sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    private transient volatile Transferer<E> transferer;
    public SynchronousStack() {
        this(false);
    }
    public SynchronousStack(boolean fair) {
        transferer = fair ? new TransferStack<E>() : new TransferStack<E>();
    }
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return transferer.transfer(null, true, 0);
    } 
    
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException tryReflectionInstead) {}
        try {
            return java.security.AccessController.doPrivileged
                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
                            for (java.lang.reflect.Field f : k.getDeclaredFields()) {
                                f.setAccessible(true);
                                Object x = f.get(null);
                                if (k.isInstance(x))
                                    return k.cast(x);
                            }
                            throw new NoSuchFieldError("the Unsafe");
                        }});
        } catch (java.security.PrivilegedActionException e) {
            throw new RuntimeException("Could not initialize intrinsics",
                    e.getCause());
        }
    }
}

SynchronousQueue------TransferStack源码分析