RxJava (Subject): ReplaySubject code analysis

Date: 2022-02-19 17:47:30

First, let's look at the class diagram.

[Figure: ReplaySubject class diagram]

Next, let's look at create

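    public static <T> ReplaySubject<T> create() {
        return create(16);
    }
The default capacity is 16. This does not limit the subject to the latest 16 items: as the following steps show, the buffer is unbounded, and 16 is only the size of each array it allocates.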

    public static <T> ReplaySubject<T> create(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity > 0 required but it was " + capacity);
        }
        ReplayBuffer<T> buffer = new ReplayUnboundedBuffer<T>(capacity);
        ReplayState<T> state = new ReplayState<T>(buffer);
        return new ReplaySubject<T>(state);
    }
This creates a ReplayUnboundedBuffer (a buffer of unbounded size).

        public ReplayUnboundedBuffer(int capacity) {
            this.capacity = capacity;
            this.tail = this.head = new Object[capacity + 1];
        }
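capacity is 16. head and tail both point to the same first array, which has capacity + 1 slots; as next shows further down, the last slot holds the link to the following array.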

Then a ReplayState is created with the buffer as its argument

        public ReplayState(ReplayBuffer<T> buffer) {
            this.buffer = buffer;
            lazySet(EMPTY);
        }
Finally, a ReplaySubject is created with the state as its argument.

2. Now let's look at onNext

    public void onNext(T t) {
        state.onNext(t);
    }

        @Override
        public void onNext(T t) {
            ReplayBuffer<T> b = buffer;

            b.next(t);
            for (ReplayProducer<T> rp : get()) {
                b.drain(rp);
            }
        }

The buffer here is the ReplayUnboundedBuffer created earlier.

The buffer's next function is then called

        public void next(T t) {
            if (done) {
                return;
            }
            int i = tailIndex;
            Object[] a = tail;
            if (i == a.length - 1) {
                Object[] b = new Object[a.length];
                b[0] = t;
                tailIndex = 1;
                a[i] = b;
                tail = b;
            } else {
                a[i] = t;
                tailIndex = i + 1;
            }
            size++;

        }

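Here done is false, and the if that follows takes the else branch: tailIndex starts at 0, so the value is stored in a[0], after which tailIndex and size are each incremented by 1.

Back in onNext, get() returns the array of ReplayProducers. Nobody has subscribed yet, so the array is empty and onNext returns without calling drain.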
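To look at the buffer layout in isolation, here is a minimal sketch of the same linked-array idea in plain Java. SegmentedBuffer, replayAll and segmentCount are hypothetical names made up for this illustration, not RxJava classes. Only next mirrors the code above; the done flag and all concurrency concerns are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the linked-array buffer; not the RxJava class. */
final class SegmentedBuffer {
    final int capacity;     // values per array segment
    final Object[] head;    // first segment, never replaced
    Object[] tail;          // segment currently being written
    int tailIndex;          // next free slot in tail
    int size;               // total number of values stored

    SegmentedBuffer(int capacity) {
        this.capacity = capacity;
        // One extra slot at the end is reserved for the link to the next segment.
        this.tail = this.head = new Object[capacity + 1];
    }

    void next(Object value) {
        Object[] a = tail;
        int i = tailIndex;
        if (i == a.length - 1) {
            // Only the link slot is left: allocate a new segment and chain it.
            Object[] b = new Object[a.length];
            b[0] = value;
            tailIndex = 1;
            a[i] = b;
            tail = b;
        } else {
            a[i] = value;
            tailIndex = i + 1;
        }
        size++;
    }

    /** Walks the segments from head, the way a late subscriber would. */
    List<Object> replayAll() {
        List<Object> out = new ArrayList<>();
        Object[] node = head;
        int slot = 0;
        for (int index = 0; index < size; index++) {
            if (slot == capacity) {
                node = (Object[]) node[slot];   // follow the link slot
                slot = 0;
            }
            out.add(node[slot++]);
        }
        return out;
    }

    /** Counts how many arrays are chained together. */
    int segmentCount() {
        int count = 1;
        for (Object[] node = head; node[capacity] != null; node = (Object[]) node[capacity]) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        SegmentedBuffer buffer = new SegmentedBuffer(16);
        for (int i = 0; i < 40; i++) {
            buffer.next(i);
        }
        // prints "40 values in 3 segments"
        System.out.println(buffer.replayAll().size() + " values in " + buffer.segmentCount() + " segments");
    }
}
```

With a capacity of 16, storing 40 values simply chains three arrays together, which is why the capacity argument acts as a sizing hint and not as a limit.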


3. The subscribe flow

subscribe eventually ends up in ReplayState's call function

        public void call(Subscriber<? super T> t) {
            ReplayProducer<T> rp = new ReplayProducer<T>(t, this);
            t.add(rp);
            t.setProducer(rp);

            if (add(rp)) {
                if (rp.isUnsubscribed()) {
                    remove(rp);
                    return;
                }
            }
            buffer.drain(rp);
        }


A ReplayProducer is created with the Subscriber as its argument
        public ReplayProducer(Subscriber<? super T> actual, ReplayState<T> state) {
            this.actual = actual;
            this.requested = new AtomicLong();
            this.state = state;
        }

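Back in call, setProducer is invoked

This eventually calls the Producer's request. If the subscriber has not requested a specific amount itself, the amount requested is Long.MAX_VALUE.

        public void request(long n) {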
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                state.buffer.drain(this);
            } else if (n < 0L) {
                throw new IllegalArgumentException("n >= required but it was " + n);
            }
        }
n > 0, so drain is called

        public void drain(ReplayProducer<T> rp) {
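BackpressureUtils itself is not shown in this article, so the following is only a sketch of what this kind of request accounting usually looks like, under my own assumptions: adding to the outstanding demand is capped at Long.MAX_VALUE, and delivered items are subtracted afterwards. RequestAccounting, addRequest and produced are hypothetical names, and the real helpers may perform extra checks.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of request accounting; not the RxJava BackpressureUtils class. */
final class RequestAccounting {

    /** Adds n (assumed positive) to the demand, capping at Long.MAX_VALUE; returns the previous value. */
    static long addRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;   // overflow: treat the demand as unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Subtracts the number of delivered items from the demand; returns the new value. */
    static long produced(AtomicLong requested, long emitted) {
        return requested.addAndGet(-emitted);
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        addRequest(requested, 5);
        produced(requested, 2);
        System.out.println(requested.get());                    // prints 3
        addRequest(requested, Long.MAX_VALUE);
        System.out.println(requested.get() == Long.MAX_VALUE);  // prints true
    }
}
```

This also suggests why drain skips the produced call when r is Long.MAX_VALUE: an unbounded demand does not need to be counted down.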
            if (rp.getAndIncrement() != 0) {
                return;
            }

            int missed = 1;

            final Subscriber<? super T> a = rp.actual;
            final int n = capacity;

            for (;;) {

                long r = rp.requested.get();
                long e = 0L;

                Object[] node = (Object[])rp.node;
                if (node == null) {
                    node = head;
                }
                int tailIndex = rp.tailIndex;
                int index = rp.index;

                while (e != r) {
                    if (a.isUnsubscribed()) {
                        rp.node = null;
                        return;
                    }

                    boolean d = done;
                    boolean empty = index == size;

                    if (d && empty) {
                        rp.node = null;
                        Throwable ex = error;
                        if (ex != null) {
                            a.onError(ex);
                        } else {
                            a.onCompleted();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    if (tailIndex == n) {
                        node = (Object[])node[tailIndex];
                        tailIndex = 0;
                    }

                    @SuppressWarnings("unchecked")
                    T v = (T)node[tailIndex];

                    a.onNext(v);

                    e++;
                    tailIndex++;
                    index++;
                }

                if (e == r) {
                    if (a.isUnsubscribed()) {
                        rp.node = null;
                        return;
                    }

                    boolean d = done;
                    boolean empty = index == size;

                    if (d && empty) {
                        rp.node = null;
                        Throwable ex = error;
                        if (ex != null) {
                            a.onError(ex);
                        } else {
                            a.onCompleted();
                        }
                        return;
                    }
                }

                if (e != 0L) {
                    if (r != Long.MAX_VALUE) {
                        BackpressureUtils.produced(rp.requested, e);
                    }
                }

                rp.index = index;
                rp.tailIndex = tailIndex;
                rp.node = node;

                missed = rp.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }

Here rp.node is null, so node is set to head
                Object[] node = (Object[])rp.node;
                if (node == null) {
                    node = head;
                }
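Earlier we stored a value in head, at array index 0.

tailIndex is 0.

Both d and empty are false: done has not been set, and index (0) is not equal to size (1).

                    T v = (T)node[tailIndex];

                    a.onNext(v);
The value at tailIndex is read and passed to a.onNext, which eventually reaches the onNext of the callback we passed to subscribe.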

Then, back in drain

                    e++;
                    tailIndex++;
                    index++;
Finally

                rp.index = index;
                rp.tailIndex = tailIndex;
                rp.node = node;
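The getAndIncrement at the top of drain and the addAndGet(-missed) at the bottom deserve a closer look: together they make sure only one caller runs the loop at a time, while any call that arrives in the meantime is recorded and handled by an extra pass. Below is a minimal single-threaded sketch of that pattern. DrainLoop, offer, pending, delivered and passes are hypothetical names for illustration, and the plain ArrayDeque would have to be replaced by a thread-safe structure in real concurrent code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the "missed counter" drain loop; not RxJava code. */
final class DrainLoop {
    final AtomicInteger wip = new AtomicInteger();    // counts pending drain calls
    final Queue<String> pending = new ArrayDeque<>(); // single-threaded demo only
    final List<String> delivered = new ArrayList<>();
    int passes;                                       // loop passes, for illustration

    void offer(String value) {
        pending.add(value);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;   // a drain is already running; it will notice the bumped counter
        }
        int missed = 1;
        for (;;) {
            passes++;
            String v;
            while ((v = pending.poll()) != null) {
                delivered.add(v);
                if (v.equals("a")) {
                    offer("b");   // reentrant call: returns at once, no recursion into the loop
                }
            }
            // Subtract the calls handled so far; a non-zero result means more arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        DrainLoop loop = new DrainLoop();
        loop.offer("a");
        System.out.println(loop.delivered + " in " + loop.passes + " passes");  // prints "[a, b] in 2 passes"
    }
}
```

The reentrant offer("b") does not start a second loop; it only bumps the counter, and the running loop makes one more pass because of it.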
Back in ReplayState's call function

add is called

        boolean add(ReplayProducer<T> rp) {
            for (;;) {
                ReplayProducer<T>[] a = get();
                if (a == TERMINATED) {
                    return false;
                }

                int n = a.length;

                @SuppressWarnings("unchecked")
                ReplayProducer<T>[] b = new ReplayProducer[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = rp;

                if (compareAndSet(a, b)) {
                    return true;
                }
            }
        }
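This adds the new ReplayProducer: the current array is copied into one that is one element longer, and the new array is swapped in with compareAndSet. call then invokes buffer.drain(rp) once more.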
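The copy-and-swap in add can be tried out on its own. The sketch below uses an AtomicReference holding a String array in place of the producer array; SubscriberArray and terminate are hypothetical names for this illustration, and the EMPTY and TERMINATED markers are modeled here as two distinct empty arrays, which is an assumption about how such markers are usually defined.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a copy-on-write subscriber array; not RxJava code. */
final class SubscriberArray {
    static final String[] EMPTY = new String[0];
    static final String[] TERMINATED = new String[0];   // distinct instance used as a marker

    final AtomicReference<String[]> ref = new AtomicReference<>(EMPTY);

    boolean add(String subscriber) {
        for (;;) {
            String[] a = ref.get();
            if (a == TERMINATED) {
                return false;   // no new subscribers after termination
            }
            int n = a.length;
            String[] b = new String[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = subscriber;
            if (ref.compareAndSet(a, b)) {
                return true;    // nobody changed the array in between
            }
            // Another thread won the race: reread the array and try again.
        }
    }

    void terminate() {
        ref.set(TERMINATED);
    }

    public static void main(String[] args) {
        SubscriberArray subscribers = new SubscriberArray();
        String[] snapshot = subscribers.ref.get();
        subscribers.add("first");
        // The old snapshot is untouched, so a loop already iterating it is unaffected.
        System.out.println(snapshot.length + " -> " + subscribers.ref.get().length);  // prints "0 -> 1"
        subscribers.terminate();
        System.out.println(subscribers.add("late"));  // prints false
    }
}
```

Because the array is never modified in place, the for loop in onNext can iterate over get() without locking, even while another thread is subscribing.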


4. Calling onNext again

        public void onNext(T t) {
            ReplayBuffer<T> b = buffer;

            b.next(t);
            for (ReplayProducer<T> rp : get()) {
                b.drain(rp);
            }
        }

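This time a ReplayProducer has been added, so the loop calls drain for it.

In drain, the value at tailIndex 1 is read, which is the value this onNext has just stored, and it is passed to the onNext of the ReplayProducer's actual.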