Android RxJava2(四)过滤操作符

时间:2021-10-08 14:36:37

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的过滤操作符。顾名思义,这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。

filter()

原理图:
Android RxJava2(四)过滤操作符
方法:

public final Observable<T> filter(Predicate<? super T> predicate)

作用:
通过一定逻辑来过滤被观察者发送的事件,如果返回true则发送事件,否则不会发送
代码:

       Observable.just(1,2,3,4,5).filter(new Predicate<Integer>() {
           @Override
           public boolean test(Integer integer) throws Exception {
               return integer % 3 == 1;
           }
       }).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

上述代码是筛选出发送的事件中对3取余为1的事件,打印结果如下:

    onSubscribe
    onNext:1
    onNext:4
    onComplete

ofType()

方法:

public final <U> Observable<U> ofType(final Class<U> clazz)

作用:
对发送的事件筛选出指定的数据类型发送
代码:

       Observable.just(1,2,"hello","world").ofType(Integer.class).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

对发送的数据筛选出整形数据,打印结果如下:

    onSubscribe
    onNext:1
    onNext:2
    onComplete

skip() & skipLast()

原理图:
Android RxJava2(四)过滤操作符
Android RxJava2(四)过滤操作符
方法:

public final Observable<T> skip(long count)
public final Observable<T> skipLast(int count) 

作用:
skip从开始跳过指定数量的事件,skipLast从结尾往前数跳过制定数量的事件
代码:

       Observable.just(1,2,3,4).skip(2).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

从起始位是跳过2个事件,打印结果为:

    onSubscribe
    onNext:3
    onNext:4
    onComplete

distinct()

原理图:
Android RxJava2(四)过滤操作符
方法:

public final Observable<T> distinct()

作用:
去掉重复的事件
代码:

       Observable.just(1,2,3,4,4,2,1).distinct().subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

将重复事件去掉之后,结果如下:

    onSubscribe
    onNext:1
    onNext:2
    onNext:3
    onNext:4
    onComplete

distinctUntilChanged()

原理图:
Android RxJava2(四)过滤操作符
方法:

public final Observable<T> distinctUntilChanged()

作用:
将连续的重复事件去掉,仅限于连续
代码:

       Observable.just(1,2,3,4,4,2,1).distinctUntilChanged().subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

将连续重复事件去掉结果如下:

    onSubscribe
    onNext:1
    onNext:2
    onNext:3
    onNext:4
    onNext:2
    onNext:1
    onComplete

take() & takeLast()

原理图:
Android RxJava2(四)过滤操作符
Android RxJava2(四)过滤操作符
方法:

 public final Observable<T> take(long count)
 public final Observable<T> takeLast(int count)

作用:
take控制观察者接收事件的数量,takeLast控制的是被观察者发送尾部的count个事件
代码:

       Observable.just(1,2,3,4).take(3).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

设置观察者接收3个事件,打印结果如下:

    onSubscribe
    onNext:1
    onNext:2
    onNext:3
    onComplete

debounce()

原理图:
Android RxJava2(四)过滤操作符
方法:

public final Observable<T> debounce(long timeout, TimeUnit unit)

作用:
根据发送的事件时间间隔做出筛选,如果两次发送事件的间隔小于设定的timeout,则会取消前一个事件的发送
代码:

       Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(0);
                emitter.onNext(1);
                Thread.sleep(400);
                emitter.onNext(2);
                Thread.sleep(600);
                emitter.onNext(3);
                emitter.onComplete();
           }
       }).debounce(500,TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("---","onNext:"+integer);
           }

           @Override
           public void onError(Throwable e) {
               Log.e("---","onError");
           }

           @Override
           public void onComplete() {
               Log.e("---","onComplete");
           }
       });

上述代码是最发送的事件设置500毫秒的时间间隔,运行结果:

    onSubscribe
    onNext:2
    onNext:3
    onComplete

firstElement() & lastElement

原理图:
Android RxJava2(四)过滤操作符
Android RxJava2(四)过滤操作符
方法:

public final Maybe<T> firstElement()
public final Maybe<T> lastElement()

作用:
firstElement去被观察者发送的第一个事件,lastElement取被观察者最后一个事件
代码:

        Observable.just(1,2,3).firstElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---",integer+"");
            }
        });
        Observable.just(1,2,3).lastElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---",integer+"");
            }
        });

运行结果:

E/---: 1
E/---: 3

elementAt() & elementAtOrError

原理图:
Android RxJava2(四)过滤操作符
方法:

public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)

作用:
elementAt作用是取被观察者下标为index的事件进行发送,但是如果index超出事件总数的话就不会有任何结果。但是如果还想接收异常信息就可以用elementAtOrError
代码:

        Observable.just(1,2,3).elementAt(1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---",integer+"");
            }
        });

此时输出结果为:

E/---: 2