过滤、组合、功能、布尔操作符
根据指定条件过滤事件
通过设置指定的过滤条件,当且仅当该事件满足条件,就将该事件过滤(不发送)
filter
过滤特定条件的事件
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
}
}).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d(":..nht", integer + "");
}
});
12-20 22:30:00.137 16244-16244/com.sankuai.moviepro D/:..nht: 4
12-20 22:30:00.137 16244-16244/com.sankuai.moviepro D/:..nht: 5
解释:
过滤掉小于等于3的事件
oftype
过滤 特定数据类型的数据
Observable.just(1, "carson", 3, "ho", 5)
.ofType(Integer.class)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d(":..nht", integer + "");
}
});
12-20 22:33:15.354 17503-17503/? D/:..nht: 1
12-20 22:33:15.355 17503-17503/? D/:..nht: 3
12-20 22:33:15.355 17503-17503/? D/:..nht: 5
skip/skiplast
跳过某个事件
// 使用1:根据顺序跳过数据项
Observable.just(1, 2, 3, 4, 5)
.skip(1) // 跳过正序的前1项
.skipLast(2) // 跳过正序的后2项
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
Log.d(TAG,"获取到的整型事件元素是: "+ integer);
}
});
// 使用2:根据时间跳过数据项
// 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
.skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
.skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
.subscribe(new Consumer<Long>() {
@Override
public void accept( Long along ) throws Exception {
Log.d(TAG,"获取到的整型事件元素是: "+ along);
}
});
distinct/distinctUntilChanged
过滤事件序列中重复的事件 / 连续重复的事件
// 使用1:过滤事件序列中重复的事件
Observable.just(1, 2, 3, 1 , 2 )
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
Log.d(TAG,"不重复的整型事件元素是: "+ integer);
}
});
// 使用2:过滤事件序列中 连续重复的事件
// 下面序列中,连续重复的事件 = 3、4
Observable.just(1,2,3,1,2,3,3,4,4 )
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
Log.d(TAG,"不连续重复的整型事件元素是: "+ integer);
}
});
根据指定事件数量过滤事件
通过设置指定的事件数量,仅发送特定数量的事件
take
指定观察者最多能接收到的事件数量
takeLast
指定观察者只能接收到被观察者发送的最后几个事件
根据指定时间过滤事件
通过设置指定的时间,仅发送在该时间内的事件
throttleFirst()
在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
如,1段时间内连续点击按钮,但只执行第1次的点击操作
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
//1000milliseconds = 1s
subscriber.onNext(1);
Thread.sleep(500);
subscriber.onNext(2);
Thread.sleep(400);
subscriber.onNext(3);
Thread.sleep(300);
subscriber.onNext(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d(":..nht", integer + "");
}
});
12-21 18:03:08.443 17948-17948/com.sankuai.moviepro D/:..nht: 1
12-21 18:03:09.643 17948-17948/com.sankuai.moviepro D/:..nht: 4
throttleLast()
在某段时间内,只发送该段时间内最后1次事件
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
//1000milliseconds = 1s
subscriber.onNext(1);
Thread.sleep(500);
subscriber.onNext(2);
Thread.sleep(400);
subscriber.onNext(3);
Thread.sleep(300);
subscriber.onNext(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
}//每一秒采用的数据
}).throttleLast(1, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d(":..nht", integer + "");
}
});
12-21 18:05:51.848 18838-19161/com.sankuai.moviepro D/:..nht: 3
12-21 18:05:52.847 18838-19161/com.sankuai.moviepro D/:..nht: 4
sample
在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似
throttleWithTimeout () / debounce()
作用
发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
根据指定事件位置过滤事件
需求场景
通过设置指定的位置,过滤在该位置的事件
firstElement/lastElement
作用
仅选取第1个元素 / 最后一个元素
elementAt
作用
指定接收某个元素(通过 索引值 确定)
注:允许越界,即获取的位置索引 > 发送事件序列长度
elementAtOrError
作用
在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常
参考资料
Android RxJava:过滤操作符 全面讲解
https://www.jianshu.com/p/c3a930a03855
RxJava:功能性操作符
Android RxJava:功能性操作符 全面讲解
https://www.jianshu.com/p/b0c3669affdb
错误处理
发送事件过程中,遇到错误时的处理机制
onErrorReturn
作用
遇到错误时,发送1个特殊事件 & 正常终止(所有不会走调用error事件了)
可捕获在它之前发生的异常
rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onError(new Throwable("发生错误了"));
}
}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
// 捕捉错误异常
Log.e(":nht...", "在onErrorReturn处理了错误: "+throwable.toString() );
// 发生错误事件后,发送一个"666"事件,最终正常结束
return 666;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.d(":nht...", "对Error事件作出响应");
}
@Override
public void onNext(Integer integer) {
Log.e(":nht...", "接收到了事件: "+integer );
}
});
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 1
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 2
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 3
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 在onErrorReturn处理了错误: java.lang.Throwable: 发生错误了
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 666
onErrorResumeNext
作用
遇到错误时,发送1个新的Observable
注:
onErrorResumeNext()拦截的错误 = Throwable;若需拦截Exception请用onExceptionResumeNext()
若onErrorResumeNext()拦截的错误 = Exception,则会将错误传递给观察者的onError方法
这里就看你onError方法里传递的是throwable还是exception了。
rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new Throwable("发生错误了"));
}
}).onErrorResumeNext(new Func1<Throwable, rx.Observable<? extends Integer>>() {
@Override
public rx.Observable<? extends Integer> call(Throwable throwable) {
Log.e(":nht...", "在onErrorResumeNext处理了错误: " + throwable.toString());
return rx.Observable.just(4, 5, 5);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.d(":nht...", "对onCompleted事件作出响应");
}
@Override
public void onError(Throwable e) {
Log.d(":nht...", "对Error事件作出响应");
}
@Override
public void onNext(Integer integer) {
Log.d(":nht...", "接收到事件:" + integer);
}
});
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:1
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:2
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro E/:nht...: 在onErrorResumeNext处理了错误: java.lang.Throwable: 发生错误了
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:4
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:5
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:5
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 对onCompleted事件作出响应
onExceptionResumeNext
作用
遇到错误时,发送1个新的Observable
注:
onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
若onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法
retry
作用
重试,即当出现错误时,让被观察者(Observable)重新发射数据
接收到 onError()时,重新订阅 & 发送事件
Throwable 和 Exception都可拦截
<-- 1. retry() --> // 作用:出现错误时,让被观察者重新发送数据 // 注:若一直错误,则一直重新发送
<-- 2. retry(long time) --> // 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制 // 参数 = 重试次数
<-- 3. retry(Predicate predicate) --> // 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试) // 参数 = 判断逻辑
<-- 4. retry(new BiPredicate<Integer, Throwable>) --> // 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试 // 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
<-- 5. retry(long time,Predicate predicate) --> // 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制 // 参数 = 设置重试次数 & 判断逻辑
retryUntil
作用
出现错误后,判断是否需要重新发送数据
若需要重新发送 & 持续遇到错误,则持续重试
作用类似于retry(Predicate predicate)
具体使用
具体使用类似于retry(Predicate predicate),唯一区别:返回 true 则不重新发送数据事件。此处不作过多描述
retryWhen
作用
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
具体使用
rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new Exception("发生错误了"));
subscriber.onNext(3);
}
}).retryWhen(new Func1<rx.Observable<? extends Throwable>, rx.Observable<?>>() {
@Override
public rx.Observable<?> call(rx.Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, rx.Observable<?>>() {
@Override
public rx.Observable<?> call(Throwable throwable) {
//会触发之前Observable的重新订阅,但是自己的这个事件是不会被接受者接受的
//仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
return rx.Observable.just(10);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.d(":nht...", "对onCompleted事件作出响应");
}
@Override
public void onError(Throwable e) {
Log.d(":nht...", "对onError事件作出响应");
}
@Override
public void onNext(Integer integer) {
Log.d(":nht...", "接受到事件" + integer + "");
}
});
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1
与repeatWhen比较类似
RxJava:组合操作符
Android RxJava:组合 / 合并操作符 详细教程
https://www.jianshu.com/p/c2a7c03da16d
RxJava:条件操作符
Android RxJava:详解 条件 / 布尔操作符
https://www.jianshu.com/p/954426f90325