RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

时间:2022-03-17 14:45:10

Github 相关代码: Github地址

一直感觉 RxJava2 的取消订阅有点混乱, 这样也能取消, 那样也能取消, 没能系统起来的感觉就像掉进了盘丝洞, 迷乱…
下面说说这几种情况
RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )


几种取消的情况

  1. subscribe 时返回了 disposable:
    RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

  2. subscribe 不返回 disposable, 从 observer 的 onSubscribe 中获取:
    RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

  3. 之前从网上看的, 使用继承 DisposableObserver 的 observer, 这个 observer 可以直接 dispose
    RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

之前刚了解到到这几种方式的时候表情是这样的RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )
今天打起精神, 看了点源码, 搞懂了这到底是什么鬼.


源码分析

啰嗦啥啊, 这么简单的东西还需要贴源码?
大哥, 下面有总结….

第一种情况开始看, 我们进入到 .subscribe((s) -> {}) 中看, 发现它是返回了一个四参数的重载方法

    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ...
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }

可以看到, 这个方法里创建了一个 LambdaObserver, 这个 Observer 实现了Disposable 接口, 所以可以直接作为 Disposable 返回到最上级, 这就是为什么第一种情况中的 subscribe 能返回 disposable 的原因.

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, LambdaConsumerIntrospection {
            ...
}

第二种情况subscribe 其实就是上面方法里的 subscribe(LambdaObserver)

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

这个方法内执行了 subscribeActual(observer) 抽象方法, 交给了 Observale 的子类重写.

第三种情况 中出现的 DisposableObserverLambdaObserver 差不多, 甚至更简单

public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
    final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

    @Override
    public final void onSubscribe(@NonNull Disposable s) {
        if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
            onStart();
        }
    }

    protected void onStart() { }

    @Override
    public final boolean isDisposed() {
        return s.get() == DisposableHelper.DISPOSED;
    }

    @Override
    public final void dispose() {
        DisposableHelper.dispose(s);
    }
}

简单总结

稍微一看我们就明白了, 当传入 Observer 接口的四个方法时, subscribe 在内部构建了一个 LambdaObserver , 而这个 LambdaObserver 和第三种情况的 DisposableObserver 都实现了 Disposable 接口, 所以可以作为 Disposable 返回, 就是这么简单.

另外第三种情况里出现的 CompositeDisposable, 简单说就是一个 Disposable 集合( 由 RxJava 内部提供的OpenHashSet 维护, 线程安全 ), CompositeDisposable.dispose() 时会遍历内部的所有 Disposable 执行 dispose 操作.

    /**
     * Dispose the contents of the OpenHashSet by suppressing non-fatal
     * Throwables till the end.
     * @param set the OpenHashSet to dispose elements of
     */
    void dispose(OpenHashSet<Disposable> set) {
        ...
        for (Object o : array) {
            if (o instanceof Disposable) {
                try {
                    ((Disposable) o).dispose();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(ex);
                }
            }
        }
        ...
    }

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )


几种方式的适用情况

  1. 如果是零星使用的话, 第一种最方便, observer 的四个方法可以按需使用, 相同逻辑的方法有多种可供选择
    RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

  2. 如果使用的 Observer 有很多共同逻辑, 则可以写一个 BaseObserver 继承 DisposableObserver 或者 LambdaObserver, 直接使用 xxObserver.dispose()

    open class BaseObserver<T>() : DisposableObserver<T>()
  3. 如果有很多的 diposable 需要取消的话, 使用 CompositeDisposable 会更简单一些

更加深入

如果只是看如何 取消订阅, 上面的就已经够了, 不过你们不好奇 DisposableObserverLambdaObserver 的区别么? 网上流行的 RxLifescycle库 又是怎么做的呢?

我后面有时间再写
RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )