ObservableEmitter事件发送规则
- Observable可以发送无限个onNext, 观察者也可以接收无限个onNext.
- Observable发送了一个onComplete(或者onError)后,可以继续发送onComplete(或者onError)后续事件,但观察者收到onComplete(或者onError)后不再接收事件。
- Observable可以不发送onComplete或onError.
- onComplete和onError必须唯一并且互斥(比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.)
由此可见,onComplete(或者onError)针对观察者,控制观察者的事件的接受。
ThreadScheduler线程调度
subscribeOn() 指定的是被观察者发送事件的线程, observeOn() 指定的是观察者接收事件的线程.
多次指定被观察者的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定观察者的线程是可以的, 也就是说每调用一次observeOn() , 观察者的线程就会切换一次.
即:多次指定事件发送线程只有第一次有效,多次指定事件接受线程每次奏效。
执行结果:
在RxJava中, 常见内置线程如下:
Schedulers.io()
代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation()
代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread()
代表一个常规的新线程
AndroidSchedulers.mainThread()
操作符
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里. map只是单个发送事件的Observable转换为一个Observable。
注意:flatmap并不能保证合并后的Observable中的各个转换后的Observables的顺序。要保证顺序则要使用concatMap
Zip通过一个函数将多个**Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个**Observable一样多的数据。
具体的转换分解:
使用场景:页面展示内容数据来源于多个接口并且当且仅当所有接口全部请求完成(得到数据)后才能展示。
Backpressure(背压)
控制事件发送流量。如Zip中的多个Observable可能存在事件发送速度不一致问题(一个Observable发送了100个事件,另一个只发送了1个事件),此时发送快的Observable事件还需要等待发送慢的Observable实事件来组合。Zip本身对每一个Observable做了缓存(队列实现)。显然缓存区的容量是有限的,这就要用Backpressure去控制源头解决。不然观察者接受事件的速度远远小于被观察者发送事件的速度,撑爆缓存区,导致OOM(异步订阅情况下发生)。
说明:同步的订阅关系, 也就是说上被观察者每发送一个事件必须等到观察者接收处理完了以后才能接着发送下一个事件.
同步:
异步:
继续,解决在异步订阅的情况下OOM问题,可以想到的方案如下:
-
减少被观察者发送事件的数量
使用操作符filter()添加过滤条件
使用操作符sample()指定时间间隔接受事件
弊端:丢失部分事件
2.控制被观察者发送事件速度
Flowable
在Flowable的世界里,Observable和Observer,分别被替代为Flowable和Subscriber.subscribe还是当年的subscribe。
BackpressureStrategy.ERROR在发送事件和接收事件不均衡时直接抛异常(MissingBackpressureException)。
void request(long n)
request当成观察者处理事件的能力, 观察者能处理几个就告诉上游我要几个,比如Subscription.request(1)表示观察者只接受被观察者发送的前1个事件,Subscription.request(100)表示观察者只接受被观察者发送的前100个事件。
注意:Flowable里默认有一个大小为128(buffersize事件个数)的缓存, 在异步订阅时, 被观察者就会先把事件发送到这个缓存中, 只有当观察者调用request时, 才从缓存中取出事件发给观察者.
继续:显然128的数量限制不能满足要发送1000个时间的需求,真是可以借助背压的其它策略,如BackpressureStrategy.BUFFER(增大缓存数量),及数量控制策略:BackpressureStrategy.DROP
(丢弃存不下的事件)和BackpressureStrategy.LATEST(保留最新事件)。
BackpressureStrategy.DROP示例如下:
有些Flowable并不是自己创建的, 该怎么办呢?使用RxJava提供的方法 onBackpressureBuffer() ,onBackpressureDrop() ,onBackpressureLatest()
响应式拉取
观察者调用request(1)告诉被观察者自己能处理1个事件,但在那里告诉被观察者的呢?是在FlowableEmitter中的long requested()。