6.1 RxJava1.x线程调度原理分析

时间:2022-08-16 15:40:10

欢迎大家加入QQ群一起讨论: 489873144(android格调小窝)
我的github地址:https://github.com/jeasonlzy

1. 回调线程小例子

在讲线程调度前,首先看这么一段代码,点击一个按钮,开启一个线程,在线程内部执行一个回调,那么打印的结果是多少?
6.1 RxJava1.x线程调度原理分析

结果如下,反应出一个问题,只要是在子线程中调用的方法,无论回调还是方法本身,都在子线程中执行。

System.out: main:main
System.out: onNext:Thread-69327
System.out: testCallback:Thread-69327

2. Scheduler调度器

RxJava提供了5种调度器:

  • Schedulers. io();
    这个调度器时用于I/O操作。是可以根据需要增长或缩减来自适应的线程池。由于它专用于I/O操作,所以并不是RxJava的默认方法,需要开发者自己正确的使用。重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。
  • Schedulers.computation();
    这个是计算工作默认的调度器,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers. io());默认线程数等于处理器的数量。以下方法默认使用该调度器:buffer(),debounce(),delay(),interval(),timer(),replay(),sample(),skip(),skipLast(),take(),takeLast(),takeLastBuffer(),throttleFirst(),throttleWithTimeout(),window()。
  • Schedulers.immediate();
    这个调度器允许你立即在当前线程执行你指定的工作。以下方法默认使用该调度:timeout(),timelnterval(),timestamp()。
  • Schedulers.newThread();
    为指定任务启动一个新的线程。
  • Schedulers.trampoline();
    当其它排队的任务完成后,在当前线程排队开始执行。以下方法默认使用该调度:repeat(),retry()。
  • Schedulers.from();
    使用指定的Executor作为调度器。

如果我们调度器想创建一个异步任务可以使用如下代码,一共就三步:
1. 获取一个线程调度器
2. 创建一个执行线程的worker对象
3. 使用worker去执行一个任务,并返回一个Subscription对象
4. 如果不想执行了,可以用上一步的Subscription可以取消订阅

Scheduler io = Schedulers.io();
Scheduler.Worker worker = io.createWorker();
Subscription subscription = worker.schedule(new Action0() {
@Override
public void call() {
SystemClock.sleep(3000);
showToast("异步执行");
}
});
subscription.unsubscribe();

那么Scheduler内部的原理是如何实现的呢?我们来分析源码,为了简单起见,我们先看newThread这个方法的原理。

先看方法内部实现:
6.1 RxJava1.x线程调度原理分析

简单的通过单利获取了一个静态成员变量newThreadScheduler,那么这个变量的初始化在哪呢?
我们在构造函数中找到了
6.1 RxJava1.x线程调度原理分析

发现最后创建了一个Scheduler,如何创建的呢?跟进去
6.1 RxJava1.x线程调度原理分析
6.1 RxJava1.x线程调度原理分析

简单的传进来一个工厂,创建了一个Scheduler,这个工厂是什么呢?
6.1 RxJava1.x线程调度原理分析

代码也很简单,其实目的就一个,想实现一个带有前缀名的Thread,并且命名能都自增长,就这么一个简单目的,所以,核心还是 NewThreadScheduler 这个类,我们继续看代码
6.1 RxJava1.x线程调度原理分析

发现依然没有几行代码,也是简单的创建一个Worker,再跟,终于发现了核心代码,根据我们传递的工厂,创建了一个核心线程池数量为1的线程池,并且赋值为executor
6.1 RxJava1.x线程调度原理分析

我们知道获取Worker之后就是调用schedule方法,那么这个方法是如何实现的呢?核心代码也就一行,把我们传进去的Action0对象包装成一个ScheduledAction对象,其实也就一个Runnable对象,然后提交到线程池执行,就结束了。
6.1 RxJava1.x线程调度原理分析

我们一路看下来并没有很困难的地方,其实他就做了一件事,让我们传递进去的对象在指定的环境下执行,这个环境可以是线程池,可以是队列,也可以是什么都不用直接执行等,不同的策略也就衍生了不同的方法。

其余关于Schedulers. io(),Schedulers.computation(),Schedulers.immediate(),Schedulers.trampoline()原理都一样,唯一的区别就是生产线程池的策略不一样,或者不用线程池,使用队列实现。

我们常用的还有个 RxAndroid 中的AndroidSchedulers.mainThread(),其实原理是一样的,把我们传递的Action对象包装成Runnable之后,通过Handler发送到主线程。代码也很简单,我们将在下一篇详细分析RxAndroid的原理。

3. RxJava线程调度

对于RxJava来讲,有以下两个方法来做线程调度:
- observerOn():指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,也就是回调发生的线程
- subscribeOn():更进一步,它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

看起来不太理解意思,我们先看一张图,这里感谢抛物线大大,下图参考他讲解的例子重新制作给 Android 开发者的 RxJava 详解,这张图的每个方法的颜色表示不同的线程,右边是代码,左边是执行顺序,对于这个例子我要知道以下几点注意事项:

  1. Observable创建完成后,如果没有调用subcribe方法,是不会有事件发出的,也就是create里面的方法是不会执行的
  2. 当开始订阅后事件首先是从最底层一层一层的向上通知的,当事件通知到最顶上的一级后才真正的发出
  3. 发送的事件会依次向下传递,直到到达最后的观察者,也就是我们传递的那个observer/subcriber对象

6.1 RxJava1.x线程调度原理分析

上图解释如下:
1. ①和②受第一个 subscribeOn() 影响,运行在红色线程;
2. ③和④处受第一个 observeOn() 的影响,运行在绿色线程;
3. ⑤处受第二个 onserveOn() 影响,运行在紫色线程;
4. 而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn()截断,因此对整个流程并没有任何影响。这里也就是说当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

这两个方法很神奇,一行代码就切换线程了,是如何做到的呢,我们来分析下。

3.1 subscribeOn

我们先看发起通知的部分,也就是subscribeOn的分析。

进入源码,看到这个方法的实现如下:
6.1 RxJava1.x线程调度原理分析

create只是简单的创建的一个Observable并返回
6.1 RxJava1.x线程调度原理分析

那么重点我们是看OperatorSubscribeOn里面的实现如下,可以看出来就是让事件在传递进的调度器中来发射,到这里就解释了为什么subscribeOn影响的是发射数据之前的线程调度操作。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
// 获取传递进来的调度器
final Worker inner = scheduler.createWorker();
// 在这个调度器中执行一个任务
inner.schedule(new Action0() {
@Override
public void call() {
// 这里用一个新的观察者去包装了一下以前的观察者
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
}
// 订阅新生成的观察者
source.unsafeSubscribe(s);
});
}
}
}

3.2 onserveOn

有了前面的分析再看这个,肯定也是一个道理是不,我们进入onserveOn的实现:
6.1 RxJava1.x线程调度原理分析
6.1 RxJava1.x线程调度原理分析
6.1 RxJava1.x线程调度原理分析
我们看到层层调用,最后采用了lift变换,说到底就是个特殊的自定义操作符是吧,有了我们第三部分讲解的自定义操作符的原理,看这个一点不困难,毫无疑问核心逻辑在OperatorObserveOn这个类中。

还需要注意一下的是这里面有个RxRingBuffer.SIZE,我们看源码,如果是安卓平台,大小是16,这个值是背压的缓存区,关于什么是背压,签名第四节也讲的很清楚了,不知道的回去再看看。
6.1 RxJava1.x线程调度原理分析

那么我们应该可以估摸着,这个onserveOn就是一个实现了背压的,缓存大小区为16的自定义操作符。

那么我我们继续看OperatorObserveOn中的实现,细节在注释里面

public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {

private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
// 我们发现调取器如果是 immediate 或者 trampoline,直接就返回了
if (scheduler instanceof ImmediateScheduler) {
// 避免开销,直接执行
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// 避免开销,直接执行
return child;
} else {
// 这里创建了一个新的内部类,实现了操作符的逻辑
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
}

我们继续看真正的操作符逻辑,进入ObserveOnSubscriber,以下代码是精简省略了背压相关逻辑后,只有线程调度相关的代码,总结起来就是:
1. 在观察者的 onNext,onError,onCompleted三个方法中分别都去调度线程,并且在onNext中,把数据入队
2. 执行call方法,该方法是一个无线循环去队列中取出数据
3. 根据取出数据的结果来判断该执行传入的目标观察者(也就是child)该执行三个方法中的哪一个,需要注意的是,这个时候执行的call方法已经是被线程调度过了,也就是child的三个方法的也会在调度完成后的线程中执行

这样,通过这么三步就完成了observerOn的线程调度,需要注意的是,我们的代码没有分析背压策略,加入背压分析的话会看起来比较凌乱,如果有兴趣可以自己看源码,observerOn的背压就是观察者先通知生产者发送并缓存最多16条数据,全部执行完成后,再通知生产者继续发送最多16条数据,避免数据来不及处理。

private static final class ObserveOnSubscriber<T> implements Observer<T>, Action0 {

private AtomicLong requested = new AtomicLong();
private AtomicLong counter = new AtomicLong();
private SpscAtomicArrayQueue<T> queue = new SpscAtomicArrayQueue<>(16);
private volatile boolean finished;
private Exception error;

private Scheduler.Worker worker;
private Observer<T> child;

private ObserveOnSubscriber(Scheduler scheduler, Observer<T> observer) {
worker = scheduler.createWorker();
child = observer;
requested.set(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
queue.offer(t);
schedule();
}

@Override
public void onError(Exception e) {
error = e;
finished = true;
schedule();
}

@Override
public void onCompleted() {
finished = true;
schedule();
}

@Override
public void call() {
long currentEmission = 0;
for (; ; ) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
T t = queue.poll();
if (finished) {
if (error != null) {
queue.clear();
child.onError(error);
return;
} else if (t == null) {
child.onCompleted();
return;
}
}
if (t == null) break;
child.onNext(t);
currentEmission++;
}
}
}

private void schedule() {
if (counter.getAndIncrement() == 0) {
worker.schedule(this);
}
}
}

好到这里,我们的线程调度分析就完成了,其实看起来很神奇的方法,在搞清楚原理后其实也并不是多复杂,总要的是思想,有了思想就能用简单的代码实现神奇的功能。

如果你觉得好,对你有过帮助,请给我一点打赏鼓励吧,一分也是爱呀!

6.1 RxJava1.x线程调度原理分析