RxJava操作符(三) __过滤操作

时间:2021-07-13 17:50:32

最近终于把项目搞上线了,有时间还是来研究RxJava 了,前面两篇文章中分别学习了RxJava中的创建操作和RxJava的变换操作,今天就来看看我觉得比较有趣的过滤操作吧。

还是老例子,先写出几个公共方法,来供程序测试:

//打印函数
private static void print(Object obj) {
System.out.println(obj);
}

//获取默认的观察者
private <T> Observer<T> getDefaultObserver() {
return new Observer<T>() {

@Override
public void onCompleted() {
print("onCompleted method");
}

@Override
public void onError(Throwable e) {
print("onError method : " + e);
}

@Override
public void onNext(T t) {
print("onNext:" + t) ;
}
};
}

filter

过滤只有我们想要的数据,比如我们想获取1-5之间大于3的数据,程序如下:

    @Test
public void filterFunction() {
Observable.range(1, 5).filter(new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t) {
return t >= 3;
}
}).subscribe(getDefaultObserver());
}

测试结果为:
RxJava操作符(三) __过滤操作

ofType

ofType操作符含义与filter类似,它只是返回指定类型的数据。
例子如下:获取1,5,“张三”,“Tom”,60.34f中的字符串

@Test
public void ofTypeFunction() {
Observable.just(1,5,"张三","Tom",60.34f).
ofType(String.class).
subscribe(getDefaultObserver()) ;
}

获取结果为:
RxJava操作符(三) __过滤操作

TakeLast

发射Observable发射的最后N项数据
例子:获取从100开始的10项数据中的后5项:

    @Test
public void takeLastFunction1() {
Observable.
range(100, 10).
takeLast(5).
subscribe(getDefaultObserver());
}

获取的结果为:
RxJava操作符(三) __过滤操作

Last

只发射最后一项(或者满足某个条件的最后一项)数据
例子:获取1-5之间大于3的数据的最后一个:

        Observable.
range(1, 5).
filter(new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t) {
return t > 3;
}
}).
last().
subscribe(getDefaultObserver());

获取的结果为:
RxJava操作符(三) __过滤操作

lastOrDefault

与last功能相似,但是加入了default值,意味着如果原始的Observable没有发射任何值,那么将会发射默认值。
来一个没有发射值的:

        Observable.
empty().
lastOrDefault(0).
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

takeLastBuffer

与takeLast功能相似,只是最后是以List的形式发射的
来个例子,注意看结果:

        Observable.
range(0, 10).
takeLastBuffer(40).
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作
可以看到,发射形式是我们非常熟悉的List形式。

当然takeLastBuffer还可以发射最后时间内发射的N项数据,比如我只需要发射过程中最后3秒的数据:

Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> t) {
for(int i = 0 ; i < 5 ; i ++) {
try {
//系统休眠1秒
Thread.sleep(1000);
t.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

t.onCompleted();
}
}).
takeLastBuffer(3, TimeUnit.SECONDS).
subscribe(getDefaultObserver()) ;

Skip

跳过Observable发射的前N项数据
这个比较简单,跳过从10开始的10项数据中的前5项数据,

Observable.
range(10, 10).
skip(5).
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

当然还可以跳过发射的时间,这里就不举例子了。

SkipLast

跳过Observable发射的后N项数据
还是上面那个例子:

Observable.
range(10, 10).
skipLast(5).
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

Take

只发射前面的N项数据
这个用的比较多,下面的代码中,只去发射数据的前3个

Observable.
interval(1, TimeUnit.SECONDS).
take(3).
subscribe(getDefaultObserver());

System.in.read() ;

获取的结果为:
RxJava操作符(三) __过滤操作

First & FirstOrDefault

只发射第一项(或者满足某一个条件的第一项)数据
声明一下,first与take(1)还是有区别的,因为first可以接受一个function,判断是否满足条件。如果第一项不满足条件,那么take(1)就与first的含义就不一样了。
获取”Steven”,”Micro”, “Merry”, “Tom”, “Tonny”中首字母为M的名字:

Observable.
just("Steven", "Micro", "Merry", "Tom", "Tonny").
first(new Func1<String, Boolean>() {
@Override
public Boolean call(String t) {
return null != t && t.startsWith("M");
}
}).
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

FirstOrDefault就如同LastOrDefault的含义一样了,如果没有找到合适的,就可以返回默认值了,这样做的好处是,first在原始Observable中没有发射任何满足条件的数据时,first会报错,而firstOrDefault不会。具体的代码就不贴了,基本意思一样啊。

takeFirst

takeFirst与first类似,除了这一点:如果原始的Observable没有发射满足条件的任何数据,first会抛出一个onSuchElementException,takeFirst会返回一个空的Observable(不会调用onNext()方法,但是会调用onCompleted())

比如还是上面四个人,找出首字母为A的人,得出的结果为:

RxJava操作符(三) __过滤操作

RxJava操作符(三) __过滤操作

可以看出,二者对没有没有满足条件的数据,处理方式会存在差异。

ElementAt & ElementAtOrDefault

只发射第N项数据(存在default值时,发射default值),它是获取原始Observable发射的数据序列中指定索引位置的数据项,然后当做自己的唯一数据发射。

先来看ElementAt:

Observable.
just("1", "2", "3", "4").
elementAt(1).
subscribe(getDefaultObserver());

来个越界的:

Observable.
just("1", "2", "3", "4").
elementAt(12).
subscribe(getDefaultObserver());

结果分别为:
RxJava操作符(三) __过滤操作

RxJava操作符(三) __过滤操作

可以看出来,结果是存在差异的。

现在来看ElementAtOrDefault,同样的代码结果如下:

Observable.
just("A","B","C","D").
elementAtOrDefault(1, "ZZZ").
subscribe(getDefaultObserver());

来个越界的:

Observable.
just("A","B","C","D").
elementAtOrDefault(12, "ZZZ").
subscribe(getDefaultObserver());

结果如下:
RxJava操作符(三) __过滤操作

RxJava操作符(三) __过滤操作

Timeout

对于的原始的Observable,如果过了一个指定的时长仍没有发射数据,它会发射一个错误通知【就是会执行observer的onError方法】

下面的例子中,Observable每隔1.9s发射一个数据,而timeout等待的时长为1s,看看此时是否会触发onError:

Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> t) {
for(int i = 0 ; i < 10 ; i++) {
try {
Thread.sleep(1900);
t.onNext(i + "");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).
timeout(1,TimeUnit.SECONDS).
subscribe(getDefaultObserver());

那么结果为:
RxJava操作符(三) __过滤操作
结果就像我们所要的,它触发了onError事件。

Distinct & distinctUntilChanged

distinct是过滤掉重复的数据,distinctUntilChanged是过滤掉连续重复的数据。
举个例子啊,过滤掉1,2,3,3,2,1中的重复数据:

Observable.
just(1, 2, 3, 3, 3, 2, 1).
distinct().
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

过滤数字中绝对值相等的数:

Observable.
just(1, -1, 2, -2).
distinct(new Func1<Integer,String>() {
@Override
public String call(Integer t) {
return String.valueOf(Math.abs(t));
}
}).
subscribe(getDefaultObserver());

那么我们得出的结果为:
RxJava操作符(三) __过滤操作

对于distinctUntilChanged,这个操作符只考虑连续的发射值,对于非连续的发射值,不起作用,来来,例子:

Observable.
just(1, -1, -1, 1).
distinctUntilChanged().
subscribe(getDefaultObserver());

我们开看一看结果:
RxJava操作符(三) __过滤操作
可以看到,只有中间的两个连续的-1值才会被过滤。

那么再来个function吧:

Observable.
just(1, -1, -1, 1).
distinctUntilChanged(new Func1<Integer, String>() {

@Override
public String call(Integer t) {
return String.valueOf(Math.abs(t));
}
}).
subscribe(getDefaultObserver());

RxJava操作符(三) __过滤操作
看到我们的结果变成了1,就知道怎么回事了吧。1, -1, -1, 1首先变成1,1,1,1那么连续的1只会接受一个1。

IgnoreElements

不发射任何数据,只发射Observable的终止通知
来个例子:

Observable.
range(0, 10).
ignoreElements().
subscribe(getDefaultObserver());

结果为:
RxJava操作符(三) __过滤操作

好了,还有两个延迟函数没有介绍到,主要是还不是特别理解它们的意思【官方文档有些难理解,吐槽一下】。今天就到这里吧,有些累,明天接着写啊。