简介
在没有给定调度器(Scheduler)的情况下,Subscription将默认(产生事件与订阅)运行于调用线程上。
线程调度器(Scheduler)是将RxJava从同步观察者模式转到异步观察者模式的一个重要工具。
RxJava提供了5种主要的调度器:
- Scheduler Schedulers.io()
- Scheduler Schedulers.computation()
- Scheduler Schedulers.immediate()
- Scheduler Schedulers.newThread()
- Scheduler Schedulers.trampoline()
还有可用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form()
Schedulers.io()
内部创建一个rx.internal.schedulers.CachedThreadScheduler。底层实现是一个java中的ScheduledThreadPoolExecutor (extends ThreadPoolExecutorimplements ScheduledExecutorService)
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
corePoolSize=1, DEFAULT_KEEPALIVE_MILLIS=10L, DelayedWorkQueue是一个二叉树结构实现的BlockingQueue
整体还是一个*(即容量特别大)的队列实现
例如,存储Bitmap到本地时,可以直接在Schedulers的io线程中执行任务:
public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
});
}
Schedulers.computation()
内部是由 rx.internal.schedulers.EventLoopsScheduler 实现的。
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认
调度器: buffer()、debounce()、delay()、interval()、sample()、skip()
Schedulers.immediate()
内部创建一个rx.internal.schedulers.ImmediateScheduler。 这个调度器允许你立即在当前线程执行指定的工作。
它是 timeout()、timeInterval() 及 timestamp() 方法默认的调度器
Schedulers.newThread()
内部创建一个rx.internal.schedulers.NewThreadScheduler。一底层跟Schedulers.io()一样是由java的ScheduledThreadPoolExecutor实现。
它为指定任务启动一个新的线程
Schedulers.trampoline()
内部创建一个rx.internal.schedulers.TrampolineScheduler。运行在当前线程。当有新任务时,并不会立即执行,而是将它加入队列PriorityBlockingQueue中,直到运行任务执行完成后,才从队列中按序取出一个继续执行。
它是repeat()和retry()默认的调度器
用于测试的调度器Schedulers.test()
(some from http://blog.csdn.net/siguoyi/article/details/51849964)
创建一个rx.schedulers.TestScheduler。这是一个公开的可访问的类。也可以直接使用无参构造方法,new出一个实例。
主要提供如下三个方法,来对调度器的时钟表现进行手动微调,这对依赖精确时间安排的任务的测试很有用处。
- advanceTimeBy(time,unit) 将调度器时时钟,前进一个指定时间。这是相对操作
- advanceTimeTo(time,unit) 将调度器时钟拨动到一个指定的时间。 这个是绝对操作
- triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间
假定当前时间为0, 先advanceTimeBy(2, TimeUnit.SECONDS)再advanceTimeTo(2, TimeUnit.SECONDS),那么现在时间还是2。若反过来调用,那么现在时间就是4b了
自定义Scheduler—-Schedulers.form()
使用Schedulers.form(java.util.concurrent.Executor executor) ,来自定义Scheduler
subscribeOn()和observeOn()
subscribeOn()和observeOn() 是用来指定事件生产与订阅在哪个线程执行的。
- 默认没有定义observeOn、subscribeOn,即运行于当前线程
- subscribeOn 指定 订阅事件发生(OnSubscribe)的线程。若仅出现它,不出现observeOn, 还会影响其它所有事件
- observeOn 指定 在其之后的所有事件发生的线程,即使后面出现了 subscribeOn
- 若两者同时出现,subscribeOn 影响 observeOn 出现前的所有事件 及 OnSubscribe 事件
示例:
Action1 action = (Action1<String>) s ->
System.out.println("test-Observer: " + Thread.currentThread().getName() + ", " + s);
Observable.Transformer<Integer, String> transformer = integerObservable ->
integerObservable.map((Func1<Integer, String>) integer ->
"test-tran.call: " + Thread.currentThread().getName() + ", " + integer);
Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println( "test-OnSubscribe.call: " + Thread.currentThread().getName());
subscriber.onNext(9); }).subscribeOn(io()).observeOn(AndroidSchedulers.mainThread()).compose(transformer).subscribe(action);
可*变换subscribeOn、observeOn出现的位置,观察影响哪部分运行的线程