首先,异步请求或者读取数据是平时开发中很常见的一个功能,在数据获取成功之后需要显示到主线程的UI上的时候平时我们都是通过Handler
进行转换。
通常的写法:
// 伪代码实现类似
new Thread(new Runnable() {
void run() {
// 通过Handler实现转换
handler.sendMessage()...
}
});
Handler handler = new Handler() {
void handleMessage() {...}
}
相比RxJava的链式不断开的写法:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// 得到对应的数据..
}
});
在RxJava中就是通过subscribeOn observeOn
这两个方法实现线程到主线程之间的切换,这是一段最基础的线程切换写法 所以本文的目的就是剖析这两个方法中到底做了什么…
各种Observable生成分析
一开始我们依然先进入Observable.create
在之前的源码分析分析过一次他是直接返回了一个ObservableCreate
这里最好先记清楚,因为后面会返回各种各样的Observable
对象:
// 这是一开始调用的方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// 这是返回的类型
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
暂时没有什么阻碍自然就进入到subscribeOn
方法:
// 调用的方法
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
// subscribeOn返回的类型
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
AbstractObservableWithUpstream<T, U> extends Observable<U>
所以就是说subscribeOn
这个方法其实又生成了一个Observable
,好吧,看他能生成多少个继续分析下去;进入observeOn
:
// 调用observeOn方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// 返回类型
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
很好又返回了一个ObservableObserveOn
;没错这也是一个新的Observable
,但是我们的observerOn
方法是在subscribeOn
之后调用的也就是说observeOn
最后new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
传入的this
对象也就是ObservableObserveOn
中的source
变量就是ObservableSubscribeOn
对象;这里一层一层的Observable
可别混淆了。
Schedulers简单分析
public abstract class Scheduler {
public abstract Worker createWorker();
public abstract static class Worker implements Disposable {
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
}
首先需要看一下Scheduler
这是一个抽象类,我列取了里面两个需要被重写的抽象方法和抽象类,Schedulers
其实里面都是常量说白了就是再封装一层可更直接调用里面声明好的线程池。
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
}
我们看一眼我们例子中用的Schedulers.IO
,其实就是类IoScheduler
:
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
这里主要就是封装了一个线程池CachedWorkerPool
类,当然具体线程池怎么封装展开说明感觉篇幅会很长,先略过;基本上大家都看的懂。
最后的subscribe分析
因为呢;subscribe
这个方法写在observeOn
的后面自然一开始被执行的Observable
就是ObservableObserveOn
,最后会执行如下方法:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
进入scheduler.createWorker
由于这里的scheduler
是AndroidSchedulers
:
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}
这回明白为什么刚才要先讲Scheduler
常见的两个方法,因为这两个方法是我们最关心的。
进入source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
这个方法中传入的observer
是我们在Activity中传入的数据接收observer
,这时候执行source.subscribe
就会执行ObservableSubscribeOn.subscribeActual
方法:
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
我们主要看scheduler.scheduleDirect(new SubscribeTask(parent))
,要清楚的是SubscribeTask
这是一个Runnable
:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
很自然最后还是执行Worker.schedule
:
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
这里主要是进入最后一行的代码threadWorker.scheduleActual
:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
一看到executor.submit
我们就清楚了这里就是放入线程池去执行异步操作了,所以这时候当然是要看Runnable
也就是:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
好空的一句话:source.subscribe(parent)
还记得我们之前说了这里的source
也就是最早声明的ObservableCreate
对象,也就是会执行我们在Activity中写的ObservableOnSubscribe
相关的方法:
// 我们就可以在subscribe中做一些异步的代码
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}
到了这步我们就很清楚接下来是干嘛的了,自然整个流程我们就明白了所以总结一下:
从左到右然后又从右到左形成了一个闭环的样子。
The end…