RxJava使用详解系列文章
详细的例子可以查看文章末尾的源码
这篇文章主要讲RxJava中常见的过滤操作符
1.debounce操作符
源Observable每发射一个数据项,如果在debounce规定的间隔时间内Observable没有发射新的数据项,debounce就把这个结果提交给订阅者处理,如果在规定的间隔时间内产生了其他结果,就忽略掉发射的这个数据,通过制定的时间间隔来限流,可以过滤掉发射速率过快的数据项,默认在computatiion调度器上执行,可以指定执行线程。
注意:如果源Observable发射最后一个数据后,在debounce规定的时间间隔内调用了onCompleted,那么通过debounce操作符就把这个结果提交给订阅者
throttleWithTimeOut使用也是调用了debounce操作符来实现
Observable.create(new Observable.OnSubscribe<Integer>() {输出结果:
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;//如果没有订阅者就直接返回
try {
//发射数据的时间间隔:100~900毫秒,
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.debounce(400, TimeUnit.MILLISECONDS)//超时时间为400毫秒,预期结果:时间间隔在400毫秒以上的数据都会提交给订阅者,其他的不会。
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("debounce onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});12-19 15:22:26.910 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:4 所在线程:RxComputationScheduler-1
12-19 15:22:27.310 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:5 所在线程:RxComputationScheduler-1
12-19 15:22:27.811 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:6 所在线程:RxComputationScheduler-1
12-19 15:22:28.411 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:7 所在线程:RxComputationScheduler-1
12-19 15:22:29.111 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:8 所在线程:RxComputationScheduler-1
12-19 15:22:29.911 16572-16622/com.dingmouren.rxjavademo I/System.out: debounce onNext:9 所在线程:RxComputationScheduler-1
2.distinct()操作符:
过滤掉重复的数据项。过滤规则是只允许没有发射过的数据项通过。
变体distinct(Func1)根据返回的key值去过滤,不用数据本身.
distinctUntilChanged()只判断这个数据项跟前一个数据项是否相同,distinctUnitilChanged(Func1)也是根据返回的key值去比较过滤。
默认不在任何特定的调度器上执行。Observable.just(1,2,2,3,4,4,5).distinct().subscribe(new Action1<Integer>() {输出结果:
@Override
public void call(Integer integer) {
System.out.println("distinct() onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});distinct() onNext:1 所在线程:main
distinct() onNext:2 所在线程:main
distinct() onNext:3 所在线程:main
distinct() onNext:4 所在线程:main
distinct() onNext:5 所在线程:mainObservable.just(1,2,2,3,4,4,5).distinct(new Func1<Integer, String>() {输出结果:
@Override
public String call(Integer integer) {
return 3 < integer ? "第一组" :"第二组";//这里返回key值,小于3的key值是第一组,也就是说1和2的key值都是第一组,只会将1提交给订阅者,2的key值与1相同就直接被过滤掉了,这个变体是根据key值进行过滤的
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("distinct(Func1) onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});distinct(Func1) onNext:1 所在线程:main
distinct(Func1) onNext:4 所在线程:mainObservable.just(1,2,3,2,4,5,4).distinctUntilChanged().subscribe(new Action1<Integer>() {输出结果:
@Override
public void call(Integer integer) {
System.out.println("distinctUntilChanged() onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});distinctUntilChanged() onNext:1 所在线程:main
distinctUntilChanged() onNext:2 所在线程:main
distinctUntilChanged() onNext:3 所在线程:main
distinctUntilChanged() onNext:2 所在线程:main
distinctUntilChanged() onNext:4 所在线程:main
distinctUntilChanged() onNext:5 所在线程:main
distinctUntilChanged() onNext:4 所在线程:main3.elementAt(index)
将指定索引的数据项提交给订阅者,索引是从0开始,当没有这个索引或者索引为负数会抛异常。
elementAtOrDefault(index,default):这个会设置一个默认值,当没有指定的索引就提交默认值给订阅者,为负数就抛异常。Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("elementAt onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});Observable.just(1,2,3,4).elementAt(3).subscribe(new Action1<Integer>() {输出结果:elementAt onNext:4 所在线程:main
@Override
public void call(Integer integer) {
System.out.println("elementAt onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});
Observable.just(1,2,3,4).elementAtOrDefault(6,6).subscribe(new Action1<Integer>() {输出结果:elementAtOrDefault onNext:6 所在线程:main
@Override
public void call(Integer integer) {
System.out.println("elementAtOrDefault onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});4. filter操作符对源Observable发射的数据项按照指定的条件进行过滤,满足的条件的才会调给订阅者。默认不在任何特定的调度器上执行
Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() {输出结果:
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("filter(Func1) onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});filter(Func1) onNext:4 所在线程:main
filter(Func1) onNext:5 所在线程:main5.first()操作符提交源Observable发射的第一项数据,如果只是想要一个过滤符,最好使用take(2)或者elementAt(0)
first(Func1)操作符是提交第一项符合自定义条件的数据
firstOrDefault(T)操作符是在Observable没有发射任何数据时提交一个指定的默认值
takeFirst(Func1)操作符提交符合自定义条件的的第一项数据,
与first(Func1)不同的是,takeFirst(Func1)在没有符合条件的时候,会调用onCompleted,而first(Func1)会抛一个NoSuchElementException的异常Observable.just(1,2,3).first().subscribe(new Action1<Integer>() {输出结果:first() onNext:1 所在线程:main
@Override
public void call(Integer integer) {
System.out.println("first() onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});
Observable.just(1,2,3).first(new Func1<Integer, Boolean>() {输出结果:first(Func1) onNext:2 所在线程:main
@Override
public Boolean call(Integer integer) {
return 1 < integer;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("first(Func1) onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});
6.ignoreElements()操作符不提交任何数据给订阅者,只提交终止通知(onError或者onCompeleted)给订阅者,默认不在任何特定的调度器上执行
7.last()操作符与first()操作符相反,只提交最后一个数据项给订阅者,如果只是作为过滤操作符,最好使用takeLast(1),
官方文档解释说:first()操作符和last()操作符在某些实现中会返回一个阻塞函数。
与first()操作符系列对应,也有last(Func1)、lastOrDefault(T)、lastOrDefault(T,Func1)8.ofType操作符类似于filter操作符,区别在于ofType按照数据项的类型进行过滤,默认不在任何特定的调度器上执行
Observable.just(1,"String类型",true).ofType(String.class).subscribe(new Action1<String>() {输出结果:ofType(class) onNext:String类型 所在线程:main
@Override
public void call(String s) {
System.out.println("ofType(class) onNext:" + s + " 所在线程:" + Thread.currentThread().getName());
}
});
9.sample操作符对Observable发射的数据定时进行采样,采的是自从上一次采样以来,Observable最近发射的一项数据,也就是这段时间间隔中最后一个数据项。如果自上一次采样以来,源Observable没有发射任何数据,sample操作符返回的Observable在此段时间也不会发射任何数据
默认在computation调度器上执行,但是可以指定它执行的线程 sample(long,TimeUnit,Scheduler)
与之对应的操作符是throttleFirst,它采样的是采样时间间隔中第一项数据,在最后一个时间段会发射最后一个数据项,看下面例子
* △time = 2.2s
* Data : 1------2------3------4------5------6------7
* Time(s) : 0------1------2------3------4------5------6------7................
* △time点: 0------1------2-。----3------4-。---5------6----。-7................
* SampleResults: 3,5,7
* ThrottleFirst: 1,4,7
Observable.create(new Observable.OnSubscribe<Integer>() {输出结果:
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;//如果没有订阅者直接返回
try {
//前3个数字的时间间隔设置1秒,最后一个设置2秒
for (int i = 1; i <8 ; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).sample(2200, TimeUnit.MILLISECONDS)//采样间隔时间为2200毫秒
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("sample onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});sample onNext:3 所在线程:RxComputationScheduler-1
sample onNext:5 所在线程:RxComputationScheduler-1
sample onNext:7 所在线程:RxComputationScheduler-1
Observable.create(new Observable.OnSubscribe<Integer>() {输出结果:
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
try {
for (int i = 1; i < 9 ; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).throttleFirst(2200,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("throttleFirst onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});throttleFirst onNext:1 所在线程:main
throttleFirst onNext:4 所在线程:main
throttleFirst onNext:7 所在线程:main
10.single()操作符:在源Observable只发射一个数据项的时候,single()操作符会将这个数据提交给订阅者,大于1个就抛Sequence contains too many elements的异常,不是正好是一个数据项就会抛异常
single(Func1)操作符是对源Observable发射的数据项进行判断,如果符合条件的数据数量大于1就会抛异常。不是正好是一个数据项就会抛异常
也有设置默认值得api,默认不在任何特定的调度器上执行
11.skip操作符skip(count) 对于源Observable发射的数据项,跳过前count项,将后面的数据项提交给订阅者
skip(long,TimeUnit)对于原Observalbe发射的数据项,跳过long前的数据项,将之后的数据提交给订阅者,可以指定执行线程
Observable.just(1,2,3,4,5,6).skip(3).subscribe(new Action1<Integer>() {输出结果:
@Override
public void call(Integer integer) {
System.out.println("skip(count) onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});skip(count) onNext:4 所在线程:main
skip(count) onNext:5 所在线程:main
skip(count) onNext:6 所在线程:mainObservable.create(new Observable.OnSubscribe<Integer>() {输出结果:
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
try {
//每隔一秒发射一项数据
for (int i = 1; i < 5 ; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).skip(2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Action1<Integer>() {//发射2秒之后的数据项
@Override
public void call(Integer integer) {
System.out.println("skip(long,TimeUnit) onNext:" + integer + " 所在线程:" + Thread.currentThread().getName());
}
});skip(long,TimeUnit) onNext:3 所在线程:main
skip(long,TimeUnit) onNext:4 所在线程:main12.skipLast操作符
skipLast(count) 对于源Observable发射的数据项,省略最后count项,将前面的数据项提交给订阅者
skipLast(long,TimeUnit)对于原Observalbe发射的数据项,省略最后long时间段的数据项,将之前的数据提交给订阅者,可以指定执行线程
13.take操作符
take(count)操作符对于源Observable发射的数据项,提取前面的count项数据提交给订阅者,忽略后面的
take(long,TimeUnit)操作符对于源Obsrvable发射的数据项,提取前面long时间段里的数据项提交给订阅者,忽略后面的,可以指定线程14.takeLast操作符
takeLast(count)操作符对于源Observable发射的数据项,提取前面的count项数据提交给订阅者,忽略后面的
takeLast(long,TimeUnit)操作符对于源Obsrvable发射的数据项,提取前面long时间段里的数据项提交给订阅者,忽略后面的,可以指定线程
15.takeLastBuffer操作符
takeLastBuffer(count)操作符与takeLast(count)操作符类似,唯一不同就是takeLastBuffer(count)将最后的那些数据项收集到一个list集合中,提交这个集合给订阅者
takeLastBuffer(long,TimeUnit)操作符与takeLast(long,TimeUnit)操作符类似,唯一不同就是将在最后时间段Long中数据项收集到一个list集合中,
将这个集合提交给了订阅者,可以指定运行的线程。
更多详细内容和例子,可以查看源码