RxJava: the subscribe flow (how subscription works)

Date: 2022-03-17 14:44:40
    myObservable.subscribe(mySubscriber);

Read literally, the call has the observed object (the Observable) subscribing the observer (the Subscriber).

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }


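After validating its arguments, subscribe first calls the subscriber's onStart, a method provided by Subscriber.

Then, if the subscriber is not already a SafeSubscriber, it is wrapped in one: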

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }
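
The two steps just described (call onStart on the original subscriber, then wrap it unless it is already wrapped) can be sketched in plain Java without the library. Everything below is a simplified stand-in invented for illustration: `WrapDemo`, `MiniSubscriber`, `MiniSafeSubscriber` and `startAndWrap` are hypothetical names, not RxJava classes, and the sketch leaves out subscriptions and error handling.

```java
import java.util.ArrayList;
import java.util.List;

final class WrapDemo {
    // Hypothetical stand-in for rx.Subscriber: only the callbacks this sketch needs.
    abstract static class MiniSubscriber<T> {
        void onStart() { }                 // no-op unless a subclass overrides it
        abstract void onNext(T value);
        abstract void onCompleted();
    }

    // Hypothetical stand-in for SafeSubscriber: keeps the wrapped subscriber in `actual`.
    static final class MiniSafeSubscriber<T> extends MiniSubscriber<T> {
        final MiniSubscriber<T> actual;

        MiniSafeSubscriber(MiniSubscriber<T> actual) {
            this.actual = actual;
        }

        @Override void onNext(T value) { actual.onNext(value); }
        @Override void onCompleted() { actual.onCompleted(); }
    }

    // Same order as in subscribe: onStart first, then wrap only if not already wrapped.
    static <T> MiniSubscriber<T> startAndWrap(MiniSubscriber<T> subscriber) {
        subscriber.onStart();
        if (!(subscriber instanceof MiniSafeSubscriber)) {
            subscriber = new MiniSafeSubscriber<T>(subscriber);
        }
        return subscriber;
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        MiniSubscriber<String> mine = new MiniSubscriber<String>() {
            @Override void onStart() { events.add("onStart"); }
            @Override void onNext(String s) { events.add("onNext:" + s); }
            @Override void onCompleted() { events.add("onCompleted"); }
        };
        MiniSubscriber<String> wrapped = startAndWrap(mine);
        // Passing an already wrapped subscriber returns the same instance.
        MiniSubscriber<String> wrappedAgain = startAndWrap(wrapped);
        events.add("wrapped=" + (wrapped instanceof MiniSafeSubscriber));
        events.add("sameInstance=" + (wrapped == wrappedAgain));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that onStart runs on the subscriber we wrote, before the wrapper exists; the wrapper in this sketch does not forward onStart at all.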

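In subscribe, again setting the hooks aside, the call that is ultimately made is observable.onSubscribe.call(subscriber). onSubscribe is the OnSubscribe of the Observable we created earlier, so this invokes its call function: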

    public void call(Subscriber<? super String> sub) {
        sub.onNext("Hello, world!");
        sub.onCompleted();
    }

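This calls the observer's onNext and onCompleted. Because of the wrapping step, sub is the SafeSubscriber here, so its onNext runs first: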
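
To make the chain from subscribe to onSubscribe.call to the subscriber's callbacks concrete, here is a minimal plain-Java sketch. It is not the library code: `SubscribeFlowDemo`, `MiniObservable`, `MiniOnSubscribe` and `MiniSubscriber` are hypothetical stand-ins, the hooks and the SafeSubscriber wrapping are left out, and the error path is reduced to a single onError call.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeFlowDemo {
    // Hypothetical stand-in for rx.Subscriber.
    abstract static class MiniSubscriber<T> {
        void onStart() { }
        abstract void onNext(T value);
        abstract void onCompleted();
        abstract void onError(Throwable e);
    }

    // Hypothetical stand-in for Observable.OnSubscribe: the work to run on subscription.
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    // Hypothetical stand-in for rx.Observable: it only stores its OnSubscribe.
    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<T>(onSubscribe);
        }

        // Validate, call onStart, then hand the subscriber to onSubscribe.call.
        void subscribe(MiniSubscriber<T> subscriber) {
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber can not be null");
            }
            if (onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null.");
            }
            subscriber.onStart();
            try {
                onSubscribe.call(subscriber);
            } catch (Throwable e) {
                // Simplified: the real method also rethrows fatal errors and
                // checks isUnsubscribed() before calling onError.
                subscriber.onError(e);
            }
        }
    }

    // Subscribes a recording subscriber to the given source and returns what it saw.
    static List<String> run(MiniOnSubscribe<String> source) {
        final List<String> events = new ArrayList<>();
        MiniObservable<String> myObservable = MiniObservable.create(source);
        myObservable.subscribe(new MiniSubscriber<String>() {
            @Override void onStart() { events.add("onStart"); }
            @Override void onNext(String s) { events.add("onNext:" + s); }
            @Override void onCompleted() { events.add("onCompleted"); }
            @Override void onError(Throwable e) { events.add("onError:" + e.getMessage()); }
        });
        return events;
    }

    // The "Hello, world!" source from the walkthrough.
    static List<String> runHelloWorld() {
        return run(sub -> {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        });
    }

    // A source whose call function throws, to exercise the catch branch.
    static List<String> runFailing() {
        return run(sub -> { throw new IllegalStateException("boom"); });
    }

    public static void main(String[] args) {
        System.out.println(runHelloWorld());
        System.out.println(runFailing());
    }
}
```

The second run shows why subscribe wraps the call in a try block: an exception thrown inside the call function reaches the subscriber as onError instead of escaping to the caller.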

    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle *
            Exceptions.throwOrReport(e, this);
        }
    }

This is SafeSubscriber's onNext. As long as done is false it calls actual's onNext, that is, the onNext of the subscriber we wrote:

    public void onNext(String s) {
        System.out.println(s);
    }

onCompleted works on the same principle: SafeSubscriber's onCompleted is called, and it calls actual's onCompleted.
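
The role of the done flag is easier to see with a source that misbehaves. The sketch below is a plain-Java approximation, not the library class: `DoneGuardDemo`, `MiniObserver` and `MiniSafeSubscriber` are hypothetical names, and it assumes that onCompleted sets the same done flag before delegating, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

final class DoneGuardDemo {
    // Hypothetical stand-in for the observer callbacks used in this walkthrough.
    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Hypothetical stand-in for SafeSubscriber, reduced to the `done` flag and delegation.
    static final class MiniSafeSubscriber<T> implements MiniObserver<T> {
        private final MiniObserver<T> actual;
        private boolean done;

        MiniSafeSubscriber(MiniObserver<T> actual) {
            this.actual = actual;
        }

        @Override public void onNext(T value) {
            if (!done) {
                actual.onNext(value);   // forwarded only while the stream is still live
            }
        }

        @Override public void onCompleted() {
            if (!done) {
                done = true;            // assumption: the terminal event flips the flag
                actual.onCompleted();
            }
        }
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        MiniObserver<String> mine = new MiniObserver<String>() {
            @Override public void onNext(String s) { events.add("onNext:" + s); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        MiniObserver<String> safe = new MiniSafeSubscriber<>(mine);
        safe.onNext("Hello, world!");
        safe.onCompleted();
        safe.onNext("too late");   // dropped: the stream has already completed
        safe.onCompleted();        // dropped: completion is delivered only once
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Under that assumption the wrapped subscriber sees one onNext and one onCompleted, however many calls the source makes afterwards.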