RxJava (Miscellaneous): How doOnSubscribe Works

Date: 2022-02-10 17:47:52

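doOnSubscribe is typically used to run initialization work when a subscription starts. Let's look at how it is implemented; the code discussed here is from RxJava 1.x.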

Demo

    Observable.just(1).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Runs when the subscription is set up, before any item is emitted
            System.out.println("<<<<<<subscribe thread id = " + Thread.currentThread().getId());
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            // Runs for each emitted item
            System.out.println("<<<<<<subscribe thread id = " + Thread.currentThread().getId());
        }
    });
The doOnSubscribe callback runs before the subscriber's call method, so it is a convenient place for setup work.

Here is how it is implemented:

    public final Observable<T> doOnSubscribe(final Action0 subscribe) {
        return lift(new OperatorDoOnSubscribe<T>(subscribe));
    }
It creates a new OperatorDoOnSubscribe and passes it to lift:

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
lift creates an OnSubscribeLift. When we subscribe, its call method is invoked:

    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
The operator here is the OperatorDoOnSubscribe created earlier, so this invokes its call method:

    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        subscribe.call();
        // Pass through since this operator is for notification only, there is
        // no change to the stream whatsoever.
        return Subscribers.wrap(child);
    }

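Here, subscribe is the callback we passed to doOnSubscribe. When several doOnSubscribe calls are chained, subscription starts at the last one in the chain: its OnSubscribeLift runs its own operator's call (and therefore its doOnSubscribe callback) first, and only then calls parent.call(st), which triggers the OnSubscribeLift of the preceding doOnSubscribe. As a result, the doOnSubscribe that appears later in the chain is invoked first.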
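
The ordering can be reproduced without RxJava by modeling only the part of the lift chain that matters here. The following is a minimal sketch, not the library's code: MiniObservable, OnSubscribe, and DoOnSubscribeOrderSketch are hypothetical names made up for illustration, and error handling, onStart, and Subscribers.wrap are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of the RxJava 1.x lift chain; not the library's code.
final class DoOnSubscribeOrderSketch {

    // Stand-in for Observable.OnSubscribe: invoked once per subscription.
    interface OnSubscribe<T> {
        void call(Consumer<T> subscriber);
    }

    static final class MiniObservable<T> {
        private final OnSubscribe<T> onSubscribe;

        MiniObservable(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Emits a single value to whoever subscribes.
        static <T> MiniObservable<T> just(T value) {
            return new MiniObservable<T>(subscriber -> subscriber.accept(value));
        }

        // Same order as OnSubscribeLift.call: run this operator's callback first,
        // then subscribe to the parent (upstream) source.
        MiniObservable<T> doOnSubscribe(Runnable callback) {
            final OnSubscribe<T> parent = this.onSubscribe;
            return new MiniObservable<T>(subscriber -> {
                callback.run();          // plays the role of operator.call(o)
                parent.call(subscriber); // plays the role of parent.call(st)
            });
        }

        void subscribe(Consumer<T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    // Chains two doOnSubscribe calls and records the order in which things run.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable.just(1)
                .doOnSubscribe(() -> log.add("first doOnSubscribe"))
                .doOnSubscribe(() -> log.add("second doOnSubscribe"))
                .subscribe(value -> log.add("onNext " + value));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model, running main prints "second doOnSubscribe", then "first doOnSubscribe", then "onNext 1": each wrapper runs its own callback before handing the subscription to its parent, so the callback registered last sits outermost and fires first.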