#RxJava2源码解析(2)

时间:2023-01-20 17:44:44

上一篇中,我们对最基本的使用方法进行了源码分析,可是这远远不能体现rxJava的强大,我们没有看到线程转换,没有看到强大的操作符,还记得我们想通过分析RxJava2源码所达到的目的么?有5个。
我们的目的:

  1. 知道源头(Observable)是如何将数据发送出去的。
  2. 知道终点(Observer)是如何接收到数据的。
  3. 何时将源头和终点关联起来的
  4. 知道线程调度是怎么实现的
  5. 知道操作符是怎么实现的

通过上一篇文章的介绍,已经解决了目的中的1、2、3,这篇文章中先分析最常使用的map操作符。

map的使用

老规矩,先附上使用方法,通过它来进行源码的分析。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1111");
                e.onComplete();
            }
        });
        observable.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG,value+"");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

这里先简单介绍一下map操作符的作用,它将我们从源头传入的类型转为了另一种类型,这个很简单,不多解释,直接看源码分析。

map源码分析

create方法上一篇文章已经分析过,直接看map

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //这里是判null操作,我们不用管
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //返回值中,RxJavaPlugins.onAssembly()是一个hook方法,暂时不管,需要关注的是ObservableMap。
        //记住,他是一个observable
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

注释里解释的很清楚了,这里需要注意两点,第一,作为参数传进的function

public interface Function<T, R> {
    R apply(@NonNull T t) throws Exception;
}

它只有一个方法apply,就是将T转换成R,这个方法对于整个map进行类型的转换起到至关重要的作用。
第二,记住返回值中的ObservableMap它是一个observable。记住这两点,接着往下看new ObservableMap()。

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //存储source,他也是一个Observable
        super(source);
        //存储function
        this.function = function;
    }

接着就是subscribe方法,

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //真正subscribe的地方,这里的Observable取决于之前调
            //用了什么方法,如果是调用了map方法,这里就是ObservableMap
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

通过上面的分析我们知道返回的observable对象是ObservableMap。现在很明了了,看ObservableMap。

    public void subscribeActual(Observer<? super U> t) {
    //源头与终点产生联系的地方,在这个过程中运用了大量的装饰者模式
    //ObservableMap与MapObserver分别是对Observable与Observer的装饰
       source.subscribe(new MapObserver<T, U>(t, function));
    }

在这里t是我们创建的那个observer,然后通过new MapObserver传入到MapObserver中。在mapObserver中重写了onnext(),

        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                //hook方法忽略,重点关注mapper.apply(t),在这里讲我们传入的t转为了v.
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

这里的actual就是传进来的由我们new出来的observer,在里面通过v=mapper.apply(t)进行了类型的转换,如果用我们的例子来说就是integer=mapper.apply(String)。这是我们observer的onNext的参数由最开始的t转换成了v,即actual.onNext(v),至此整个map的流程结束。

总结

其实map操作符很简单,最核心的方法就是function中的apply方法进行的类型转换。其实无论是map操作符,还是线程转换,因为都是对当前的observable进行包装装饰,我们就得把握到,我们是在哪个Observable中执行的subscribeActual方法的,这点很重要。

线程转换之subscribeOn的使用

使用subscribeOn我们就只增加一行代码,指定一个线程,在下面的例子中将线程转换到了io线程中,看到这里我们应当有一个疑问,这个io线程影响的范围到底是多大,接下来的分析中你们会找到答案。

 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1111");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());

Schedulers.io()相信大家都能理解,就是一个io线程。
重点看subscribeOn

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

通过前面的分析相信已经很清楚这两行代码了,一个是判null的,一个是hook,在我们需要new ObservableSubscribeOn(this, scheduler))这行代码中一定也是一个observable的装饰类,里面保存了我们的observable和scheduler信息。

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

果然,就是这样。通过之前对map以及第一篇文章的分析,我们知道subscribe方法后,最终会执行到我们这个ObservableSubscribeOn的subscribeActual方法,接着分析

    public void subscribeActual(final Observer<? super T> s) {
        //主要是将我们创建的Observer保存起来,其实也就是由SubscribeOnObserver来装饰我们的observer进行操作
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //SubscribeOnObserver是一个Disposable,这里传入的是Disposable
        s.onSubscribe(parent);
        //SubscribeTask继承自Runnable,在new SubscribeTask(parent)将SubscribeOnObserver传入,在在
        //run方法中进行注册的操作,即source.subscribe(parent)的操作,此时我们的注册以后的操作就会在subsrcribeon指定线程执行
        //在这里需要特别注意,我们虽然先创建的observer.create方法并进行一系列重写,可是由源码分析,那些
        // 方法是在我们调用subscribe后才开始执行的,也就是说订阅流程是由下游向上游传递的
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

代码中的注释粗略说完了整个过程,也许有点难以理解,我们还是一行行看下去分析,第一行代码很简单,就像注释所说那样,没什么好分析的,

        SubscribeOnObserver(Observer<? super T> actual) {
            //我们传入的observer保存起来
            this.actual = actual;
            //用于保存上游的Disposable,以便在自身dispose时,连同上游一起dispose
            this.s = new AtomicReference<Disposable>();
        }

第二行代码很简单,重点是第三行,首先看new SubscribeTask(parent),


    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

记住这里的run方法,这里的source是发射源,parent是我们创建的observer的装饰类,可以视为终点。从这以后重写的subscribe方法才开始执行,都是在subscribeOn指定的线程中

       @Override
       public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1111");
                e.onComplete();
        }

接着以后的操作就会在我们传入的scheduler中执行。我们继续看scheduler.scheduleDirect(new SubscribeTask(parent))

   public Disposable scheduleDirect(@NonNull Runnable run) {
        //由方法可知,立刻执行线程
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

依据名字和参数我们也能看出来,就是立即执行这个任务。

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

看这个DisposeTask task = new DisposeTask(decoratedRun, w);

static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

    }
}

在它的run方法中执行了我们刚刚的在SubscribeTask的run方法。接下来就不用在深入代码细节继续分析了。
最后,有一个在我们刚接触rxjava最容易忽略的问题是,整个订阅的流程是从下游到上游的,虽然e.onXXX方法写在了前面,可是当source.subscribe(parent)没有调动前这些方法都没有执行,所以当subscribe那一刻,才会有数据的发送。所以我们的e.onXXX方法是执行在subscribeOn指定的线程当中的。至此,整个subscribeOn的流程结束。

总结

  1. 首先切换了线程,再执行Observale.subscribe(observer)方法
  2. 我们知道整个事件调用的过程是从下游到上游,根据上一篇文章知道,上游发送数据仅仅是调用下游观察者对应的onXXX()方法而已,所以此时操作是在切换后的线程中进行。
  3. 最后是一个问题,为什么多次调用subscribeOn总会以第一次为准,或者说是以离数据源最近的那次为准,这个将在最后与observeOn的比较中来进行解释。

线程转换之observerOn的使用

使用observerOn只需要实现一行代码即可

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d(TAG,"currentThread="+Thread.currentThread().getName());
            e.onNext("11");
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
      //添加这个方法
      .observeOn(AndroidSchedulers.mainThread());

这个用法也很简单,没什么好讲的,接下来老规矩,随着方法进入源码分析

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
}

这里的Scheduler就是我们传入的线程

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

经过前几次的分析,这几行代码的套路我们已经很熟悉了,重点关注hook里的new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))
在new方法中也是保存了我们需要用到的一些参数,observable以及scheduler等。通过前几个操作符的分析,相信我们已经很清楚注册的流程最终会走到ObservableObserveOn的subscribeActual方法中

protected void subscribeActual(Observer<? super T> observer) {
    //false
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //ObservableSource,在这里由ObserveOnObserver来订阅上游数
        //据源,这样当数据从上游push下来,会由ObserveOnObserver对
        //应的onXXX()处理。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

注释已经解释的很清楚了,接下来看ObserveOnObserver对应的方法。首先看它里面比较重要的几个成员变量

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

    private static final long serialVersionUID = 6576896619930983584L;
    //真实的observer
    final Observer<? super T> actual;
    //我们传入线程对应的worker
    final Scheduler.Worker worker;
    final boolean delayError;
    final int bufferSize;
    //存放上游发送数据的队列
    SimpleQueue<T> queue;

接着看onXXX方法,我们以onNext为例

    public void onNext(T t) {
        if (done) {
            return;
        }
        //默认是同步的
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

因为默认是同步的,所以会执行queue.offer(t)这行代码将上游发送的数据存入队列中。schedule()会执行worker的schedule()方法。注意,这时候我们仍然在默认线程中,还未到observerOn方法设置的线程中,且我们真实observer的onNext方法还未调用,我们这个ObserveOnObserver它也是一个Runnable,所以我们可以看看他的run方法

    public void run() {
        //outputFused默认是false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }

所以接下来看drainNormal方法

    void drainNormal() {
        int missed = 1;
        //存储消息的队列
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (;;) {
                boolean d = done;
                T v;

                try {
                    //从队列中取出上游传来的数据
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {
                    return;
                }

                if (empty) {
                    break;
                }
                //在这里执行了我们new出来的真实observer的onnext方法
                a.onNext(v);
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

重要方法我已经进行了注释。接下来的问题是这个run方法是在什么时候执行,还记得我们onnext方法的最后一句么,会执行会执行worker的schedule(),一般来说就会在这个方法中执行,在我们写的案例中,传入的是AndroidSchedulers.mainThread()对应的Scheduler是HandlerScheduler,我们看他的createWorker方法,

public Worker createWorker() {
    return new HandlerWorker(handler);
}

创建的是HandlerWorker,所以看它的schedule方法,会有这样的一行ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
而这个ScheduledRunnable的run方法

    public void run() {
        try {
            delegate.run();
        } catch (Throwable t) {
            IllegalStateException ie =
                new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
            RxJavaPlugins.onError(ie);
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    }

执行了我们ObserveOnObserver中的run,至此整个observerOn的流程结束。

subscribeOn与ObserveOn的比较

我们知道subscribeOn只有第一次调用起效果,ObserveOn每一次调用都起效果。那是因为每一次调用subscribeOn其实就是在包装一次observer与observable,无论包装多少次,都会以最里面一层也就是第一次调用subscribeOn那一层为主,所以只有第一次起效果。而ObserveOn是在subscribe后包装了observer,在observer的onXXX方法中进行的线程转换,所以每一次调用都有作用。下面两幅图其实就能很直观体现出来。#RxJava2源码解析(2)
#RxJava2源码解析(2)
相信这两幅图已经表示的比较清晰了。所以在使用上subscribeOn只使用一次,因为只有第一次才生效,observeOn可以调用多次,且每次都生效。