RxJava1.2源码解析(一)——订阅过程

时间:2021-04-07 14:44:48

RxJava的订阅过程
1.例子代码

Observable.create(new Observable.OnSubscribe<String>() {
          // # 最终在call方法中完成订阅 #
          @Override
          public void call(Subscriber<? super String> subscriber) {
              // 方式一
              subscriber.onNext("lichaojun");
              subscriber.onNext("lichaojun1");
              subscriber.onCompleted();
          }
      }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                // 事件队列完结
            }

            @Override
            public void onError(Throwable e) {
               // 事件队列异常。在处理事件异常时,onError会触发,同时队列自动终止,不允许再有事件发出
            }

            @Override
            public void onNext(String s) {
                // 普通的事件,将要处理的事件添加到事件队列中
            }
        });

2.Observable.create()

    // 创建一个Observable对象
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    // 其中RxJavaHooks会返回OnSubscribe对象
    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }
    // 将OnSubscribe对象赋值给Observable对象的成员变量
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
  1. Observable.subscribe()订阅
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }

        // 此处调用onStart方法,是在事件还没有发送前被调用,可以做一些准备工作
        subscriber.onStart();

        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // 此处的SafeSubscriber对传入自定义的subscriber做了进一步的
            // 封装。比如在onCompleted和onError方法调用时不会再调用onNext
            // 方法,并且保证onCompleted和onError方法只有一个被执行
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        try {
            // 此处的call方法就是前面自己定义的OnSubscribe对象的call方法
            // 在上面的例子中,会将call方法中调用subscriber方法的onNext,onCompleted方法
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }