[Android开发] RxJava2之路九 - 条件操作符例子Demo

时间:2022-05-20 17:50:49

一、条件操作符列表

根据条件发射或变换Observables

名称 解析
amb() 给定多个Observable,只让第一个发射数据的Observable发射全部数据
defaultIfEmpty() 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
switchIfEmpty() 如果原始Observable没有发射数据,它发射一个备用Observable的发射物
(rxjava-computation-expressions) doWhile() 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止
(rxjava-computation-expressions) ifThen() 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列
skipUntil() 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
skipWhile() 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
(rxjava-computation-expressions) switchCase() 基于一个计算结果,发射一个指定Observable的数据序列
takeUntil() 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
takeWhile(),takeWhileWithIndex() 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
(rxjava-computation-expressions)whileDo() 如果满足一个条件,发射原始Observable的数据,然后重复发射直到不满足这个条件为止

(rxjava-computation-expressions): 表示这个操作符当前是可选包 rxjava-
computation-expressions 的一部分,还没有包含在标准RxJava的操作符集合里,需要自己导包,但是这个包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的话,会出现问题。 可以使用repeat操作符代替

compile 'io.reactivex:rxjava-computation-expressions:0.21.0'

二、条件操作符

2.1 amb/ambArray/ambWith

给定多个Observable,只让第一个发射数据的Observable发射全部数据。

ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。

有两个参数方法:
- amb(Iterable);
- ambArray();

    public void testAmb(){
Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
Observable o2 = Observable.just("d","e","f");


Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
Log.e(TAG, "accept: "+o);
}
});

}

只发送最先的Observable,所以这里输出为def:

02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: d
02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: e
02-21 17:23:24.286 18753-18753/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: f

2.2 defaultlfEmpty

被观察者没有onNext发送数据就调用了onComplete,就发射defaultlfEmpty里面的数据

/**
* 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
*/

public void testDefaultlfEmpty(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).defaultIfEmpty("默认数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}

输出内容为:

03-02 13:56:46.470 5925-5925/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 默认数据

2.3 switchEmpty

如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源

    /**
* 如果原始Observable没有发射数据,它发射一个备用Observable的发射物
*/

public void testSwitchEmpty(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).switchIfEmpty(Observable.just("a","b","c"))
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});


}

输出的内容为:

03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: a
03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: b
03-02 14:32:45.416 9858-9858/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: c

2.4 skipUntil

SkipUntil 订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

    /**
* 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
*/

private void testSkipUntil(){

//skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
Observable.interval(1, TimeUnit.SECONDS)
.skipUntil(Observable.just("1").delay(3,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);

}
});

}

输出内容为:

03-02 15:27:09.771 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 2
03-02 15:27:10.772 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 3
03-02 15:27:11.773 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 4
03-02 15:27:12.774 11586-11602/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 5
03-02 15:27:13.765 11586-11602/cn.com.minstone.rxjavalearn

....

2.5 skipWhile

和skipUntil不同的是,skipWhile可以判断,返回假才让数据发出去。

   /**
* 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
*/

private void testSkipWhile(){


Observable.interval(1, TimeUnit.SECONDS)
.observeOn(Schedulers.newThread())
.skipWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
return aLong < 5;

//返回假,原始的Observable发射的数据才可以接收到

}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);

}
});

}

输出内容为:

03-02 15:33:06.449 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 5
03-02 15:33:07.460 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 6
03-02 15:33:08.461 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 7
03-02 15:33:09.462 14710-14781/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 8

2.6 takeUntil() / takeWhile() / takeWhileWithIndex()

和skilUntil、shikWhil相反,他们是开始先发射原始数据,到takeUntil的第二个Observable发射了一个数据或一个通知就不发射原来的数据了。

    /**
* 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
*/

private void testTakeUntil(){
Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(Observable.just("1").delay(3,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);

}
});
}

输出内容为:

03-02 15:41:51.601 15734-15762/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 0
03-02 15:41:52.602 15734-15762/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 1

takeWhile:

    private void testTakeWhile(){
Observable.interval(1, TimeUnit.SECONDS)
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
//返回真就不让Observable继续发射了
return aLong > 3;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);

}
});
}

输出内容为:

03-02 15:44:32.108 18329-18358/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 0
03-02 15:44:33.109 18329-18358/cn.com.minstone.rxjavalearn E/ConditionActivity: accept: 1