一、条件操作符列表
根据条件发射或变换Observables
名称 | 解析 |
---|---|
amb() | 给定多个Observable,只让第一个发射数据的Observable发射全部数据 |
defaultIfEmpty() | 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 |
switchIfEmpty() | 如果原始Observable没有发射数据,它发射一个备用Observable的发射物 |
(rxjava-computation-expressions) doWhile() | 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止 |
(rxjava-computation-expressions) ifThen() | 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列 |
skipUntil() | 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 |
skipWhile() | 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据 |
(rxjava-computation-expressions) switchCase() | 基于一个计算结果,发射一个指定Observable的数据序列 |
takeUntil() | 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知 |
takeWhile(),takeWhileWithIndex() | 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据 |
(rxjava-computation-expressions)whileDo() | 如果满足一个条件,发射原始Observable的数据,然后重复发射直到不满足这个条件为止 |
(rxjava-computation-expressions): 表示这个操作符当前是可选包 rxjava-
computation-expressions 的一部分,还没有包含在标准RxJava的操作符集合里,需要自己导包,但是这个包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的话,会出现问题。 可以使用repeat操作符代替
compile 'io.reactivex:rxjava-computation-expressions:0.21.0'
二、条件操作符
2.1 amb/ambArray/ambWith
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。
有两个参数方法:
- amb(Iterable);
- ambArray();
public void testAmb(){
Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
Observable o2 = Observable.just("d","e","f");
Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
Log.e(TAG, "accept: "+o);
}
});
}
只发送最先的Observable,所以这里输出为def:
02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: d
02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: e
02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: f
2.2 defaultlfEmpty
被观察者没有onNext发送数据就调用了onComplete,就发射defaultlfEmpty里面的数据
/**
* 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
*/
public void testDefaultlfEmpty(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).defaultIfEmpty("默认数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
输出内容为:
03-02 13:56:46.470 5925-5925/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 默认数据
2.3 switchEmpty
如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
/**
* 如果原始Observable没有发射数据,它发射一个备用Observable的发射物
*/
public void testSwitchEmpty(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).switchIfEmpty(Observable.just("a","b","c"))
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
输出的内容为:
03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: a
03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: b
03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: c
2.4 skipUntil
SkipUntil 订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。
/**
* 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
*/
private void testSkipUntil(){
//skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
Observable.interval(1, TimeUnit.SECONDS)
.skipUntil(Observable.just("1").delay(3,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});
}
输出内容为:
03-02 15:27:09.771 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 2
03-02 15:27:10.772 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 3
03-02 15:27:11.773 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 4
03-02 15:27:12.774 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 5
03-02 15:27:13.765 11586-11602/cn.com.minstone.rxjavalearn
....
2.5 skipWhile
和skipUntil不同的是,skipWhile可以判断,返回假才让数据发出去。
/**
* 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
*/
private void testSkipWhile(){
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(Schedulers.newThread())
.skipWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
return aLong < 5;
//返回假,原始的Observable发射的数据才可以接收到
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});
}
输出内容为:
03-02 15:33:06.449 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 5
03-02 15:33:07.460 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 6
03-02 15:33:08.461 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 7
03-02 15:33:09.462 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 8
2.6 takeUntil() / takeWhile() / takeWhileWithIndex()
和skilUntil、shikWhil相反,他们是开始先发射原始数据,到takeUntil的第二个Observable发射了一个数据或一个通知就不发射原来的数据了。
/**
* 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
*/
private void testTakeUntil(){
Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(Observable.just("1").delay(3,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});
}
输出内容为:
03-02 15:41:51.601 15734-15762/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 0
03-02 15:41:52.602 15734-15762/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 1
takeWhile:
private void testTakeWhile(){
Observable.interval(1, TimeUnit.SECONDS)
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
//返回真就不让Observable继续发射了
return aLong > 3;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});
}
输出内容为:
03-02 15:44:32.108 18329-18358/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 0
03-02 15:44:33.109 18329-18358/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 1