一、辅助操作符列表
用于处理Observable的操作符,例如延迟、定时等。
名称 | 解析 |
---|---|
materialize() | 将Observable转换成一个通知列表 |
dematerialize() | 将上面的结果逆转回一个Observable |
timestamp() | 给Observable发射的每个数据项添加一个时间戳 |
serialize() | 强制Observable按次序发射数据并且要求功能是完好的 |
cache() | 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者 |
observeOn() | 指定观察者观察Observable的调度器 |
subscribeOn() | 指定Observable执行任务的调度器 |
doOnEach() | 注册一个动作,对Observable发射的每个数据项使用 |
doOnCompleted() | 注册一个动作,对正常完成的Observable使用 |
doOnError() | 注册一个动作,对发生错误的Observable使用 |
doOnTerminate() | Observable终止之前会被调用,无论是正常还是异常终止 |
doOnSubscribe() | 注册一个动作,在观察者订阅时使用 |
doOnUnsubscribe() | 注册一个动作,在观察者取消订阅时使用 |
doOnNext() | 在onNext前执行 |
doAfterNext() | 在onNext之后执行 |
doAfterTerminate | 终止发送时候调用 |
doOnLifecycle | 可以在订阅之后 设置是否取消订阅 |
doFinally | 在最后执行 |
finallyDo() | 注册一个动作,在Observable完成时使用 |
delay() | 延时发射Observable的结果 |
delaySubscription() | 延时处理订阅请求 |
timeInterval() | 转换获取数据发送的时间间隔 |
using() | 创建一个只在Observable生命周期存在的资源 |
single() | 强制返回单个数据,否则抛出异常 |
singleOrDefault() | 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据 |
toFuture(),toIterable(),toList() | 将Observable转换为其它对象或数据结构 |
二、辅助操作符
2.1 delay操作符
整体延迟一段指定的时间再发射来自Observable的发射物。就是延迟。
它有6种方法参数:
- delay(Function):
- delay(long delay,TimeUnit unit): 指定延迟多长时间
- delay(long delay,TimeUnit unit,mScheduler scheduler): 指定延迟多长时间并添加调度器
- delay(long delay,TimeUnit unit,boolean delayError): 指定延迟多长时间。delayError参数如果为假 就直接抛出onError,为真就如常延迟执行。
- delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
- delay(ObservableSource ,Function):
看下面带delayError的栗子:
public void testDelay(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0; i<=3 ;i++){
if(i == 2){
e.onError(new Throwable("自定义的错误"));
}else{
e.onNext(i+"");
}
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
e.onComplete();
}
})
//.delay(3000, TimeUnit.MILLISECONDS)
//delayError参数如果为假就直接抛出onError,为真就如常延迟执行
.delay(3000,TimeUnit.MILLISECONDS,false)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "收到错误: " + throwable.toString());
}
});
}
输出内容为:
02-21 14:03:02.437 17903-17918/? E/AssistActivity: 收到错误: java.lang.Throwable: 自定义的错误
2.2 delaySubscription操作符
delaySubscription 和 delay的差别就是: delaySubscription只做一件事,延迟订阅,因为订阅了才能发送, 这样子发送的数据还是原来的数据。
2.3 do操作符
do操作符有很多个,但是这个很容易理解,就相当于生命周期,在啥时候调用。
例如doOnNext在onNext的时候回调。
ps: 有三个方法的顺序为: doOnComplete -> doFinally -> doAfterTerminate
栗子:
public void testDo(){
Observable.just("1","2")
.doOnNext(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "doOnNext: " + s);
}
})
.doAfterNext(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "doAfterNext: " + s);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
//订阅之后回调的方法
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
//Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
.doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(@NonNull Notification<String> stringNotification) throws Exception {
Log.e(TAG, "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
}
})
//订阅后可以进行取消订阅
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle: "+disposable.isDisposed());
//disposable.dispose();
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle run: ");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
});
}
输出结果为:
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnSubscribe:
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnLifecycle: false
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnNext: 1
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnEach: onNext
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: 收到消息: 1
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doAfterNext: 1
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnNext: 2
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnEach: onNext
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: 收到消息: 2
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doAfterNext: 2
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnComplete:
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doOnEach: onComplete
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doFinally:
02-21 14:55:10.329 22869-22869/cn.com.minstone.rxjavalearn E/AssistActivity: doAfterTerminate:
2.4 materialize操作符
materialize把 被观察者Observable转换为Notification通知对象。dematerialize相反了。 注意用了materialize之后,onNext会回调多了一个数据,因为onComplete也回调到这里了。
ps: 在Rxjava1中Notification有一个getKind的方法的,到了Rxjava2 就没了。。。
public void testMaterialize(){
Observable.just("1","2")
.materialize()
.subscribe(new Consumer<Notification<String>>() {
@Override
public void accept(@NonNull Notification<String> stringNotification) throws Exception {
//这时候的数据就是一个Notification对象了
Log.e(TAG, (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+": "+stringNotification.getValue());
}
});
}
输出为:
02-21 15:05:51.424 24043-24043/cn.com.minstone.rxjavalearn E/AssistActivity: onNext: 1
02-21 15:05:51.424 24043-24043/cn.com.minstone.rxjavalearn E/AssistActivity: onNext: 2
02-21 15:05:51.424 24043-24043/cn.com.minstone.rxjavalearn E/AssistActivity: onComplete: null
2.5 TimeInterval操作符
获取数据发送的时间间隔,就是把数据转换为数据发送的间隔Timed。
有4个参数方法:
- timeInterval(): 转换为时间Timed,默认时间单位为毫秒
- timeInterval(Scheduler): 转换为时间Timed,可以设置调度器
- timeInterval(TimeUnit): 转换为时间Timed,可以设置时间单位
- timeInterval(TimeUnit,Scheduler): 转换为时间Timed,可以设置时间单位和调度器
栗子
public void testTimeInterval(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0; i<3; i++){
e.onNext(i+"");
Thread.sleep(1000);
}
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.timeInterval()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(@NonNull Timed<String> stringTimed) throws Exception {
Log.e(TAG, "accept: "+stringTimed.time());
}
});
}
输出为:
02-21 15:50:44.642 28021-28286/cn.com.minstone.rxjavalearn E/AssistActivity: accept: 32
02-21 15:50:45.643 28021-28286/cn.com.minstone.rxjavalearn E/AssistActivity: accept: 1000
02-21 15:50:46.644 28021-28286/cn.com.minstone.rxjavalearn E/AssistActivity: accept: 1002
2.6 timestamp操作符
给发射的每个数据添加时间,转换了为Timed,和timeInterval的参数一致,但是timestamp获取到的time是时间戳,需要自己转换。
public void testTimeStamp(){
Observable.just("a","b")
.timestamp()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(@NonNull Timed<String> stringTimed) throws Exception {
//转换时间
String date = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss", Locale.CHINA)
.format(new Date(stringTimed.time()));
Log.e(TAG, "accept: "+date);
}
});
}
输出结果为:
02-21 16:05:09.956 10120-10120/? E/AssistActivity: accept: 2017-02-21 04:05:09
02-21 16:05:09.956 10120-10120/? E/AssistActivity: accept: 2017-02-21 04:05:09
2.7 ObserveOn/SubscribeOn操作符
这两个是切换线程用 的,在线程调度器的时候说了: