A Brief Look at the RxJava2 Source Code (Part 3)

Date: 2022-12-28 10:00:41

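First, a debt to pay off: the question left open at the end of the previous article is still unanswered, namely how subscribeOn switches threads.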

A quick recap of how it is used:

    observable.subscribeOn(Schedulers.newThread());

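Let's go straight to its override of the abstract method subscribeActual (in ObservableSubscribeOn, the class that subscribeOn returns):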

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

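Spotting a familiar Runnable here is almost enough to bring tears to your eyes, isn't it? The upstream subscription, source.subscribe(parent), happens inside this Runnable. Because events are delivered through plain interface callbacks, the observable's emission code runs on whichever thread executes this Runnable. The thread on which the observer's callbacks run, in turn, depends on the scheduler passed to observeOn.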
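To make the idea concrete without pulling in RxJava, here is a minimal plain-Java sketch. MiniObserver, MiniSource and this subscribeOn helper are hypothetical stand-ins invented for illustration, not RxJava's API, and an ExecutorService is assumed in place of a Scheduler. It only shows that wrapping the upstream subscribe call in a Runnable moves the emission onto the thread that runs it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins for RxJava's types; not the real API.
final class SubscribeOnSketch {

    interface MiniObserver<T> {
        void onNext(T value);
    }

    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    // Same idea as subscribeActual: the upstream subscribe call is wrapped
    // in a Runnable and handed to another thread.
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // Returns {name of the emitting thread, name of the receiving thread}.
    static String[] run() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-thread"));
        AtomicReference<String> emitThread = new AtomicReference<>();
        AtomicReference<String> receiveThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The "observable": records which thread its emission code runs on.
        MiniSource<Integer> source = observer -> {
            emitThread.set(Thread.currentThread().getName());
            observer.onNext(1);
        };

        // The "observer": with no observeOn step, it is called on the same thread.
        subscribeOn(source, executor).subscribe(value -> {
            receiveThread.set(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new String[] {emitThread.get(), receiveThread.get()};
    }
}
```

Calling SubscribeOnSketch.run() from any thread yields "worker-thread" for both entries: with no observeOn-like step in between, the observer is simply called back on the thread that ran the subscription.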

Next, follow the scheduleDirect method called here.

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }
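The pattern in scheduleDirect (create a worker for one task, run the task on it, dispose of the worker in a finally block) can be sketched in plain Java as follows. MiniWorker and its awaitDone helper are hypothetical names made up for this illustration, and a single-thread executor is assumed as the worker's backing thread; RxJava's real Worker is more involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ScheduleDirectSketch {

    // Hypothetical worker: owns one thread and can be disposed exactly once.
    static final class MiniWorker {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final AtomicBoolean disposed = new AtomicBoolean();

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            if (disposed.compareAndSet(false, true)) {
                executor.shutdown(); // lets the current task finish, then stops the thread
            }
        }

        boolean isDisposed() {
            return disposed.get();
        }

        // Illustration-only helper: waits until the worker's thread has stopped.
        boolean awaitDone(long millis) {
            try {
                return executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // One worker per task; the worker is disposed even if the task throws.
    static MiniWorker scheduleDirect(Runnable run) {
        final MiniWorker w = new MiniWorker();
        w.schedule(() -> {
            try {
                run.run();
            } finally {
                w.dispose();
            }
        });
        return w; // the caller can dispose it early, like the returned Disposable
    }
}
```

Returning the worker itself mirrors the original, where the Worker doubles as the Disposable that SubscribeOnObserver stores through setDisposable.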

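createWorker() is abstract in Scheduler, so its concrete implementation depends on which scheduler is in use. The one shown here lives in HandlerScheduler, the RxAndroid scheduler behind AndroidSchedulers.mainThread(). Schedulers.newThread() has its own implementation in NewThreadScheduler, which returns a NewThreadWorker backed by a single-threaded executor.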

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

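The handler here wraps the main Looper: it is created with new Handler(Looper.getMainLooper()) for AndroidSchedulers.mainThread(), the scheduler typically passed to observeOn, so everything posted to it runs on the main thread.
Sending a message that carries the Runnable through this handler is therefore what hands the work over from the new thread to the main thread.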
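Android's Handler and Looper are not available in plain Java, so the sketch below models them with a single thread draining a BlockingQueue. MiniHandler and the thread names "fake-main" and "worker" are assumptions made purely for illustration. It shows the one property that matters here: a Runnable posted from one thread is executed on the thread that owns the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

final class HandlerSketch {

    // Hypothetical stand-in for Looper + Handler: one thread draining a task queue.
    static final class MiniHandler {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private final Thread loopThread;

        MiniHandler(String threadName) {
            loopThread = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // runs on the loop thread, not the poster's
                    }
                } catch (InterruptedException stop) {
                    // quit() interrupts the loop thread to end the loop
                }
            }, threadName);
            loopThread.setDaemon(true);
            loopThread.start();
        }

        void post(Runnable task) {
            queue.add(task);
        }

        void quit() {
            loopThread.interrupt();
        }
    }

    // Returns {name of the posting thread, name of the executing thread}.
    static String[] postFromWorker() {
        MiniHandler handler = new MiniHandler("fake-main");
        AtomicReference<String> postedFrom = new AtomicReference<>();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            postedFrom.set(Thread.currentThread().getName());
            handler.post(() -> {
                ranOn.set(Thread.currentThread().getName());
                done.countDown();
            });
        }, "worker");
        worker.start();

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            handler.quit();
        }
        return new String[] {postedFrom.get(), ranOn.get()};
    }
}
```

HandlerSketch.postFromWorker() returns "worker" as the posting thread and "fake-main" as the executing thread, which is the same hand-off that handler.sendMessageDelayed performs towards the main Looper.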