Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的过滤操作符。顾名思义,这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。
filter()
原理图:
方法:
public final Observable<T> filter(Predicate<? super T> predicate)
作用:
通过一定逻辑来过滤被观察者发送的事件,如果返回true则发送事件,否则不会发送
代码:
Observable.just(1,2,3,4,5).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 3 == 1;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
上述代码是筛选出发送的事件中对3取余为1的事件,打印结果如下:
onSubscribe
onNext:1
onNext:4
onComplete
ofType()
方法:
public final <U> Observable<U> ofType(final Class<U> clazz)
作用:
对发送的事件筛选出指定的数据类型发送
代码:
Observable.just(1,2,"hello","world").ofType(Integer.class).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
对发送的数据筛选出整形数据,打印结果如下:
onSubscribe
onNext:1
onNext:2
onComplete
skip() & skipLast()
原理图:
方法:
public final Observable<T> skip(long count)
public final Observable<T> skipLast(int count)
作用:
skip从开始跳过指定数量的事件,skipLast从结尾往前数跳过制定数量的事件
代码:
Observable.just(1,2,3,4).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
从起始位是跳过2个事件,打印结果为:
onSubscribe
onNext:3
onNext:4
onComplete
distinct()
原理图:
方法:
public final Observable<T> distinct()
作用:
去掉重复的事件
代码:
Observable.just(1,2,3,4,4,2,1).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
将重复事件去掉之后,结果如下:
onSubscribe
onNext:1
onNext:2
onNext:3
onNext:4
onComplete
distinctUntilChanged()
原理图:
方法:
public final Observable<T> distinctUntilChanged()
作用:
将连续的重复事件去掉,仅限于连续
代码:
Observable.just(1,2,3,4,4,2,1).distinctUntilChanged().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
将连续重复事件去掉结果如下:
onSubscribe
onNext:1
onNext:2
onNext:3
onNext:4
onNext:2
onNext:1
onComplete
take() & takeLast()
原理图:
方法:
public final Observable<T> take(long count)
public final Observable<T> takeLast(int count)
作用:
take控制观察者接收事件的数量,takeLast控制的是被观察者发送尾部的count个事件
代码:
Observable.just(1,2,3,4).take(3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
设置观察者接收3个事件,打印结果如下:
onSubscribe
onNext:1
onNext:2
onNext:3
onComplete
debounce()
原理图:
方法:
public final Observable<T> debounce(long timeout, TimeUnit unit)
作用:
根据发送的事件时间间隔做出筛选,如果两次发送事件的间隔小于设定的timeout,则会取消前一个事件的发送
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(0);
emitter.onNext(1);
Thread.sleep(400);
emitter.onNext(2);
Thread.sleep(600);
emitter.onNext(3);
emitter.onComplete();
}
}).debounce(500,TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
上述代码是最发送的事件设置500毫秒的时间间隔,运行结果:
onSubscribe
onNext:2
onNext:3
onComplete
firstElement() & lastElement
原理图:
方法:
public final Maybe<T> firstElement()
public final Maybe<T> lastElement()
作用:
firstElement去被观察者发送的第一个事件,lastElement取被观察者最后一个事件
代码:
Observable.just(1,2,3).firstElement().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("---",integer+"");
}
});
Observable.just(1,2,3).lastElement().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("---",integer+"");
}
});
运行结果:
E/---: 1
E/---: 3
elementAt() & elementAtOrError
原理图:
方法:
public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)
作用:
elementAt作用是取被观察者下标为index的事件进行发送,但是如果index超出事件总数的话就不会有任何结果。但是如果还想接收异常信息就可以用elementAtOrError
代码:
Observable.just(1,2,3).elementAt(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("---",integer+"");
}
});
此时输出结果为:
E/---: 2