RxJava中的线程调度源码解析

时间:2021-03-06 04:07:22

首先,异步请求或者读取数据是平时开发中很常见的一个功能,在数据获取成功之后需要显示到主线程的UI上的时候平时我们都是通过Handler进行转换。
通常的写法:

// 伪代码实现类似
new Thread(new Runnable() {
void run() {
// 通过Handler实现转换
handler.sendMessage()...
}
});

Handler handler = new Handler() {
void handleMessage() {...}
}

相比RxJava的链式不断开的写法:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// 得到对应的数据..
}
});

在RxJava中就是通过subscribeOn observeOn这两个方法实现线程到主线程之间的切换,这是一段最基础的线程切换写法 所以本文的目的就是剖析这两个方法中到底做了什么…

各种Observable生成分析

一开始我们依然先进入Observable.create在之前的源码分析分析过一次他是直接返回了一个ObservableCreate这里最好先记清楚,因为后面会返回各种各样的Observable对象:

// 这是一开始调用的方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

// 这是返回的类型
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}

暂时没有什么阻碍自然就进入到subscribeOn方法:

// 调用的方法
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

// subscribeOn返回的类型
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}

AbstractObservableWithUpstream<T, U> extends Observable<U>所以就是说subscribeOn这个方法其实又生成了一个Observable,好吧,看他能生成多少个继续分析下去;进入observeOn

// 调用observeOn方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

// 返回类型
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}

很好又返回了一个ObservableObserveOn;没错这也是一个新的Observable,但是我们的observerOn方法是在subscribeOn之后调用的也就是说observeOn最后new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)传入的this对象也就是ObservableObserveOn中的source变量就是ObservableSubscribeOn对象;这里一层一层的Observable可别混淆了。

Schedulers简单分析

public abstract class Scheduler {

public abstract Worker createWorker();

public abstract static class Worker implements Disposable {
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
}

首先需要看一下Scheduler这是一个抽象类,我列取了里面两个需要被重写的抽象方法和抽象类,Schedulers其实里面都是常量说白了就是再封装一层可更直接调用里面声明好的线程池。

public final class Schedulers {
@NonNull
static final Scheduler SINGLE;

@NonNull
static final Scheduler COMPUTATION;

@NonNull
static final Scheduler IO;

@NonNull
static final Scheduler TRAMPOLINE;

@NonNull
static final Scheduler NEW_THREAD;

static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}

static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}

static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}

static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
}

我们看一眼我们例子中用的Schedulers.IO,其实就是类IoScheduler:

    public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}

这里主要就是封装了一个线程池CachedWorkerPool类,当然具体线程池怎么封装展开说明感觉篇幅会很长,先略过;基本上大家都看的懂。

最后的subscribe分析

因为呢;subscribe这个方法写在observeOn的后面自然一开始被执行的Observable就是ObservableObserveOn,最后会执行如下方法:

protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

进入scheduler.createWorker由于这里的schedulerAndroidSchedulers

public Worker createWorker() {
return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
private final Handler handler;
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {
return Disposables.disposed();
}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}

return scheduled;
}
}

这回明白为什么刚才要先讲Scheduler常见的两个方法,因为这两个方法是我们最关心的。
进入source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))这个方法中传入的observer是我们在Activity中传入的数据接收observer,这时候执行source.subscribe就会执行ObservableSubscribeOn.subscribeActual方法:

public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

我们主要看scheduler.scheduleDirect(new SubscribeTask(parent)),要清楚的是SubscribeTask这是一个Runnable

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

很自然最后还是执行Worker.schedule

public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

这里主要是进入最后一行的代码threadWorker.scheduleActual:

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

一看到executor.submit我们就清楚了这里就是放入线程池去执行异步操作了,所以这时候当然是要看Runnable也就是:

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

好空的一句话:source.subscribe(parent)还记得我们之前说了这里的source也就是最早声明的ObservableCreate对象,也就是会执行我们在Activity中写的ObservableOnSubscribe相关的方法:

// 我们就可以在subscribe中做一些异步的代码
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}

到了这步我们就很清楚接下来是干嘛的了,自然整个流程我们就明白了所以总结一下:

RxJava中的线程调度源码解析

从左到右然后又从右到左形成了一个闭环的样子。
The end…