RxJava 学习记(三) —— 1.x 线程调度器Schedulers

时间:2021-04-29 02:14:20

简介


在没有给定调度器(Scheduler)的情况下,Subscription将默认(产生事件与订阅)运行于调用线程上。

线程调度器(Scheduler)是将RxJava从同步观察者模式转到异步观察者模式的一个重要工具。

RxJava提供了5种主要的调度器:

  • Scheduler Schedulers.io()
  • Scheduler Schedulers.computation()
  • Scheduler Schedulers.immediate()
  • Scheduler Schedulers.newThread()
  • Scheduler Schedulers.trampoline()

还有可用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form()
 

Schedulers.io()


内部创建一个rx.internal.schedulers.CachedThreadScheduler。底层实现是一个java中的ScheduledThreadPoolExecutor (extends ThreadPoolExecutorimplements ScheduledExecutorService)

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}

corePoolSize=1, DEFAULT_KEEPALIVE_MILLIS=10L, DelayedWorkQueue是一个二叉树结构实现的BlockingQueue

整体还是一个*(即容量特别大)的队列实现

例如,存储Bitmap到本地时,可以直接在Schedulers的io线程中执行任务:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
})
;
}

 

Schedulers.computation()


内部是由 rx.internal.schedulers.EventLoopsScheduler 实现的。

这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认
调度器: buffer()debounce()delay()interval()sample()skip()
 

Schedulers.immediate()


内部创建一个rx.internal.schedulers.ImmediateScheduler。 这个调度器允许你立即在当前线程执行指定的工作。

它是 timeout()timeInterval()timestamp() 方法默认的调度器
 

Schedulers.newThread()


内部创建一个rx.internal.schedulers.NewThreadScheduler。一底层跟Schedulers.io()一样是由java的ScheduledThreadPoolExecutor实现。

它为指定任务启动一个新的线程
 

Schedulers.trampoline()


内部创建一个rx.internal.schedulers.TrampolineScheduler。运行在当前线程。当有新任务时,并不会立即执行,而是将它加入队列PriorityBlockingQueue中,直到运行任务执行完成后,才从队列中按序取出一个继续执行。

它是repeat()retry()默认的调度器
 

用于测试的调度器Schedulers.test()


(some from http://blog.csdn.net/siguoyi/article/details/51849964)

创建一个rx.schedulers.TestScheduler。这是一个公开的可访问的类。也可以直接使用无参构造方法,new出一个实例。

主要提供如下三个方法,来对调度器的时钟表现进行手动微调,这对依赖精确时间安排的任务的测试很有用处。

  • advanceTimeBy(time,unit) 将调度器时时钟,前进一个指定时间。这是相对操作
  • advanceTimeTo(time,unit) 将调度器时钟拨动到一个指定的时间。 这个是绝对操作
  • triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间

假定当前时间为0, 先advanceTimeBy(2, TimeUnit.SECONDS)再advanceTimeTo(2, TimeUnit.SECONDS),那么现在时间还是2。若反过来调用,那么现在时间就是4b了
 

自定义Scheduler—-Schedulers.form()


使用Schedulers.form(java.util.concurrent.Executor executor) ,来自定义Scheduler
 

subscribeOn()和observeOn()


subscribeOn()observeOn() 是用来指定事件生产与订阅在哪个线程执行的。

  • 默认没有定义observeOn、subscribeOn,即运行于当前线程
  • subscribeOn 指定 订阅事件发生(OnSubscribe)的线程。若仅出现它,不出现observeOn, 还会影响其它所有事件
  • observeOn 指定 在其之后的所有事件发生的线程,即使后面出现了 subscribeOn
  • 若两者同时出现,subscribeOn 影响 observeOn 出现前的所有事件 及 OnSubscribe 事件

示例:

Action1 action = (Action1<String>) s -> 
System.out.println("test-Observer: " + Thread.currentThread().getName() + ", " + s);

Observable.Transformer<Integer, String> transformer = integerObservable ->
integerObservable.map((Func1<Integer, String>) integer ->
"test-tran.call: " + Thread.currentThread().getName() + ", " + integer);

Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println( "test-OnSubscribe.call: " + Thread.currentThread().getName());
subscriber.onNext(9); }).subscribeOn(io()).observeOn(AndroidSchedulers.mainThread()).compose(transformer).subscribe(action);

可*变换subscribeOn、observeOn出现的位置,观察影响哪部分运行的线程

 

GitHub示例


https://github.com/aa86799/RxJavaDemo