RxJava的订阅过程
1.例子代码
Observable.create(new Observable.OnSubscribe<String>() {
// # 最终在call方法中完成订阅 #
@Override
public void call(Subscriber<? super String> subscriber) {
// 方式一
subscriber.onNext("lichaojun");
subscriber.onNext("lichaojun1");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
// 事件队列完结
}
@Override
public void onError(Throwable e) {
// 事件队列异常。在处理事件异常时,onError会触发,同时队列自动终止,不允许再有事件发出
}
@Override
public void onNext(String s) {
// 普通的事件,将要处理的事件添加到事件队列中
}
});
2.Observable.create()
// 创建一个Observable对象
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
// 其中RxJavaHooks会返回OnSubscribe对象
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
// 将OnSubscribe对象赋值给Observable对象的成员变量
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
- Observable.subscribe()订阅
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
// 此处调用onStart方法,是在事件还没有发送前被调用,可以做一些准备工作
subscriber.onStart();
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// 此处的SafeSubscriber对传入自定义的subscriber做了进一步的
// 封装。比如在onCompleted和onError方法调用时不会再调用onNext
// 方法,并且保证onCompleted和onError方法只有一个被执行
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// 此处的call方法就是前面自己定义的OnSubscribe对象的call方法
// 在上面的例子中,会将call方法中调用subscriber方法的onNext,onCompleted方法
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}