First, take a look at the class diagram structure.
1. Next, look at create
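
    public static <T> ReplaySubject<T> create() {
        return create(16);
    }

The default capacity is 16. Because the buffer created below is unbounded, this is not a limit on how many values are retained; it is the number of values each array node of the buffer holds.

    public static <T> ReplaySubject<T> create(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity > 0 required but it was " + capacity);
        }
        ReplayBuffer<T> buffer = new ReplayUnboundedBuffer<T>(capacity);
        ReplayState<T> state = new ReplayState<T>(buffer);
        return new ReplaySubject<T>(state);
    }

This creates a ReplayUnboundedBuffer (unbounded in size).

    public ReplayUnboundedBuffer(int capacity) {
        this.capacity = capacity;
        this.tail = this.head = new Object[capacity + 1];
    }

With capacity 16, head and tail both point to the same first array node. The array has capacity + 1 slots; the extra last slot is used to link to the next node once this one is full (see a[i] = b in next).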
Then a ReplayState is created with the buffer as its argument.

    public ReplayState(ReplayBuffer<T> buffer) {
        this.buffer = buffer;
        lazySet(EMPTY);
    }

Finally, a ReplaySubject is created with the state as its argument.
2. Now look at onNext

    public void onNext(T t) {
        state.onNext(t);
    }

ReplaySubject.onNext delegates to ReplayState.onNext:

    @Override
    public void onNext(T t) {
        ReplayBuffer<T> b = buffer;
        b.next(t);
        for (ReplayProducer<T> rp : get()) {
            b.drain(rp);
        }
    }

The buffer here is the ReplayUnboundedBuffer created earlier.
It then calls the buffer's next function.
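
    public void next(T t) {
        if (done) {
            return;
        }
        int i = tailIndex;
        Object[] a = tail;
        if (i == a.length - 1) {
            Object[] b = new Object[a.length];
            b[0] = t;
            tailIndex = 1;
            a[i] = b;
            tail = b;
        } else {
            a[i] = t;
            tailIndex = i + 1;
        }
        size++;
    }

Here done is false and the if takes the else branch. tailIndex starts at 0, so the value is stored in a[0]; then tailIndex and size are each incremented by 1.
Back in onNext, get() returns the array of ReplayProducers. Nobody has subscribed yet, so the array is empty, the loop body never runs, and onNext returns.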
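The linked-array layout used by next can be sketched in isolation. The following is a simplified, single-threaded model, not the RxJava class: ChunkedBufferSketch, snapshot and nodeCount are hypothetical names introduced for illustration, and the volatile fields and done flag of the real buffer are left out. It is meant to show that capacity sets the node size while the number of stored values keeps growing.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, single-threaded model of a buffer built from linked array nodes (hypothetical class). */
final class ChunkedBufferSketch {
    private final int capacity;   // number of values per node
    private final Object[] head;  // first node, never replaced
    private Object[] tail;        // node currently being filled
    private int tailIndex;        // next free slot in tail
    private int size;             // total number of values stored
    private int nodes = 1;        // node count, tracked only for the demo

    ChunkedBufferSketch(int capacity) {
        this.capacity = capacity;
        // One extra slot at the end holds the reference to the next node.
        this.head = this.tail = new Object[capacity + 1];
    }

    void next(Object value) {
        if (tailIndex == capacity) {
            // Current node is full: allocate a new node and link it from the last slot.
            Object[] b = new Object[capacity + 1];
            b[0] = value;
            tail[capacity] = b;
            tail = b;
            tailIndex = 1;
            nodes++;
        } else {
            tail[tailIndex++] = value;
        }
        size++;
    }

    /** Walks the nodes from head and copies out every stored value in order. */
    List<Object> snapshot() {
        List<Object> out = new ArrayList<>();
        Object[] node = head;
        int idx = 0;
        for (int i = 0; i < size; i++) {
            if (idx == capacity) {
                node = (Object[]) node[idx]; // follow the link in the last slot
                idx = 0;
            }
            out.add(node[idx++]);
        }
        return out;
    }

    int size() { return size; }

    int nodeCount() { return nodes; }

    public static void main(String[] args) {
        ChunkedBufferSketch buf = new ChunkedBufferSketch(2);
        for (int i = 1; i <= 5; i++) {
            buf.next(i);
        }
        // prints "[1, 2, 3, 4, 5] in 3 nodes"
        System.out.println(buf.snapshot() + " in " + buf.nodeCount() + " nodes");
    }
}
```

With a node size of 2, five values spill over into three nodes and all five stay readable from head, which is the behavior a late subscriber relies on.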
3. The subscribe flow
Subscribing eventually reaches the call function of ReplayState.

    public void call(Subscriber<? super T> t) {
        ReplayProducer<T> rp = new ReplayProducer<T>(t, this);
        t.add(rp);
        t.setProducer(rp);
        if (add(rp)) {
            if (rp.isUnsubscribed()) {
                remove(rp);
                return;
            }
        }
        buffer.drain(rp);
    }

call first constructs a ReplayProducer:

    public ReplayProducer(Subscriber<? super T> actual, ReplayState<T> state) {
        this.actual = actual;
        this.requested = new AtomicLong();
        this.state = state;
    }

Back in call, setProducer is invoked,
which eventually calls the producer's request.

    public void request(long n) {
        if (n > 0L) {
            BackpressureUtils.getAndAddRequest(requested, n);
            state.buffer.drain(this);
        } else if (n < 0L) {
            throw new IllegalArgumentException("n >= required but it was " + n);
        }
    }

Since n > 0, the requested amount is added to requested and drain is called.
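The bookkeeping around requested can be modeled with a plain AtomicLong. This is a minimal sketch under assumptions: addCapped and produced are hypothetical helpers written for illustration, standing in for the roles that BackpressureUtils.getAndAddRequest and BackpressureUtils.produced play in the code above, and they are not copies of the library code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of demand accounting; not the RxJava BackpressureUtils source. */
final class RequestAccountingSketch {

    /** Adds n to the counter, capping at Long.MAX_VALUE, and returns the previous value. */
    static long addCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // The sum overflowed: treat the demand as unbounded.
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Subtracts the number of emitted values and returns the remaining demand. */
    static long produced(AtomicLong requested, long emitted) {
        return requested.addAndGet(-emitted);
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        addCapped(requested, 3);               // demand is now 3
        produced(requested, 2);                // two values emitted, demand is now 1
        addCapped(requested, Long.MAX_VALUE);  // overflow is capped
        System.out.println(requested.get() == Long.MAX_VALUE); // prints "true"
    }
}
```

Capping at Long.MAX_VALUE is what lets drain treat that value as "unbounded" and skip the subtraction, as the r != Long.MAX_VALUE check in drain does.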

    public void drain(ReplayProducer<T> rp) {
        if (rp.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        final Subscriber<? super T> a = rp.actual;
        final int n = capacity;
        for (;;) {
            long r = rp.requested.get();
            long e = 0L;
            Object[] node = (Object[])rp.node;
            if (node == null) {
                node = head;
            }
            int tailIndex = rp.tailIndex;
            int index = rp.index;
            while (e != r) {
                if (a.isUnsubscribed()) {
                    rp.node = null;
                    return;
                }
                boolean d = done;
                boolean empty = index == size;
                if (d && empty) {
                    rp.node = null;
                    Throwable ex = error;
                    if (ex != null) {
                        a.onError(ex);
                    } else {
                        a.onCompleted();
                    }
                    return;
                }
                if (empty) {
                    break;
                }
                if (tailIndex == n) {
                    node = (Object[])node[tailIndex];
                    tailIndex = 0;
                }
                @SuppressWarnings("unchecked")
                T v = (T)node[tailIndex];
                a.onNext(v);
                e++;
                tailIndex++;
                index++;
            }
            if (e == r) {
                if (a.isUnsubscribed()) {
                    rp.node = null;
                    return;
                }
                boolean d = done;
                boolean empty = index == size;
                if (d && empty) {
                    rp.node = null;
                    Throwable ex = error;
                    if (ex != null) {
                        a.onError(ex);
                    } else {
                        a.onCompleted();
                    }
                    return;
                }
            }
            if (e != 0L) {
                if (r != Long.MAX_VALUE) {
                    BackpressureUtils.produced(rp.requested, e);
                }
            }
            rp.index = index;
            rp.tailIndex = tailIndex;
            rp.node = node;
            missed = rp.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

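On this first call rp.node is null, so node is set to head:

    Object[] node = (Object[])rp.node;
    if (node == null) {
        node = head;
    }

Earlier we already stored a value at head (array index 0).
tailIndex is 0.
d and empty are both false: done is false, and index (0) is not equal to size (1).

    T v = (T)node[tailIndex];
    a.onNext(v);

This reads the value at tailIndex and calls a.onNext, which ultimately reaches the onNext of the callback we passed to subscribe.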
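Then, back in drain, the counters advance:

    e++;
    tailIndex++;
    index++;

Finally, the read position is saved in the producer:

    rp.index = index;
    rp.tailIndex = tailIndex;
    rp.node = node;

Control then returns to the call function of ReplayState.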
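The getAndIncrement check at the top of drain and the addAndGet(-missed) at the bottom form a serialization idiom: only the caller that moves the counter from 0 to 1 runs the loop, and any call arriving in the meantime just bumps the counter so the running loop goes around again. The sketch below shows the idiom on its own, assuming a simple queue in place of the replay buffer; DrainLoopSketch and its members are hypothetical names, and the demo exercises the reentrant case on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the "missed" counter idiom; not RxJava code. */
final class DrainLoopSketch {
    private final AtomicInteger wip = new AtomicInteger();       // work-in-progress counter
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final List<Integer> received = new ArrayList<>();            // only touched inside the drain loop
    int maxDepth;                                                // deepest nesting of emit() seen
    private int depth;

    void offer(int value) {
        queue.add(value);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            // A drain loop is already running; it will notice the increment and loop again.
            return;
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                emit(v);
            }
            // Subtract the calls already accounted for; a non-zero result means more arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    private void emit(int value) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        received.add(value);
        if (value == 1) {
            offer(2); // reentrant call made while the loop is emitting
        }
        depth--;
    }

    public static void main(String[] args) {
        DrainLoopSketch s = new DrainLoopSketch();
        s.offer(1);
        // prints "[1, 2] depth=1"
        System.out.println(s.received + " depth=" + s.maxDepth);
    }
}
```

The reentrant offer(2) does not recurse into emit; the outer loop picks the value up on its next pass, so values are delivered in order and never nested.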
Next, call invokes add:

    boolean add(ReplayProducer<T> rp) {
        for (;;) {
            ReplayProducer<T>[] a = get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            @SuppressWarnings("unchecked")
            ReplayProducer<T>[] b = new ReplayProducer[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = rp;
            if (compareAndSet(a, b)) {
                return true;
            }
        }
    }

This adds the new ReplayProducer to the producer array. call then invokes buffer.drain(rp) once more; by now index equals size, so there is nothing new to emit.
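The add loop is a copy-on-write update guarded by compare-and-set: it never mutates the current array, so a thread iterating over get() in onNext keeps a consistent snapshot. A minimal stand-alone sketch follows, assuming an AtomicReference field where ReplayState uses its own get and compareAndSet; SubscriberArraySketch, terminate and current are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical copy-on-write subscriber array; models the add loop, not the RxJava class. */
final class SubscriberArraySketch {
    static final Object[] EMPTY = new Object[0];
    // A distinct instance used as a marker; it is compared by identity, not by content.
    static final Object[] TERMINATED = new Object[0];

    private final AtomicReference<Object[]> ref = new AtomicReference<>(EMPTY);

    /** Appends s to a fresh copy of the array; returns false once terminated. */
    boolean add(Object s) {
        for (;;) {
            Object[] a = ref.get();
            if (a == TERMINATED) {
                return false;
            }
            Object[] b = new Object[a.length + 1];
            System.arraycopy(a, 0, b, 0, a.length);
            b[a.length] = s;
            if (ref.compareAndSet(a, b)) {
                return true;
            }
            // Another thread changed the array first: re-read and retry.
        }
    }

    /** Swaps in the terminal marker and returns the last array of subscribers. */
    Object[] terminate() {
        return ref.getAndSet(TERMINATED);
    }

    Object[] current() {
        return ref.get();
    }

    public static void main(String[] args) {
        SubscriberArraySketch subs = new SubscriberArraySketch();
        Object[] before = subs.current();
        subs.add("first");
        subs.add("second");
        System.out.println(before.length + " -> " + subs.current().length); // prints "0 -> 2"
        subs.terminate();
        System.out.println(subs.add("late")); // prints "false"
    }
}
```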
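4. Calling onNext again

    public void onNext(T t) {
        ReplayBuffer<T> b = buffer;
        b.next(t);
        for (ReplayProducer<T> rp : get()) {
            b.drain(rp);
        }
    }

This time a ReplayProducer has been added, so the loop calls drain for it.
In drain, the value at tailIndex 1 is read, which is the value this onNext call just stored, and it is passed to the onNext of the ReplayProducer's actual.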
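The whole sequence walked through above (emit, subscribe and replay, emit again) can be condensed into a toy model. This is a sketch under heavy simplifying assumptions: it is single-threaded, every subscriber has unbounded demand, there is no termination or unsubscription, and subscribing from inside a callback is not supported. MiniReplaySketch and Cursor are hypothetical names, not RxJava types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy single-threaded replay model; each subscriber keeps its own read index. */
final class MiniReplaySketch<T> {
    private final List<T> buffer = new ArrayList<>();          // every value seen so far
    private final List<Cursor<T>> cursors = new ArrayList<>(); // one cursor per subscriber

    private static final class Cursor<T> {
        final Consumer<? super T> actual;
        int index; // position of the next value to deliver

        Cursor(Consumer<? super T> actual) {
            this.actual = actual;
        }
    }

    void onNext(T value) {
        buffer.add(value);
        for (Cursor<T> c : cursors) {
            drain(c);
        }
    }

    void subscribe(Consumer<? super T> consumer) {
        Cursor<T> c = new Cursor<>(consumer);
        drain(c);        // replay everything buffered so far
        cursors.add(c);  // then register for future values
    }

    private void drain(Cursor<T> c) {
        while (c.index < buffer.size()) {
            c.actual.accept(buffer.get(c.index++));
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        MiniReplaySketch<String> subject = new MiniReplaySketch<>();
        subject.onNext("a");          // no subscribers yet, only buffered
        subject.subscribe(seen::add); // replays "a"
        subject.onNext("b");          // delivered to the registered cursor
        System.out.println(seen.equals(Arrays.asList("a", "b"))); // prints "true"
    }
}
```

The per-subscriber index plays the role that rp.index, rp.tailIndex and rp.node play in ReplayProducer: each subscriber resumes from where its last drain stopped.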