RxJava的操作符
创建和订阅一个 Observable 是足够简单的,可能这并不是非常有用的,但这只是用 RxJava 的一个开始。通过调用操作符,任何的 Observable 都能进行输出转变,多个Operators 能链接到 Observable上。RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
map()
map()是RXJava操作符中最简单的一个,它所实现的功能为对事件对象的直接转换,并且是一对一的转换。如下面所示的例子,我通过map()操作符进行了如下操作:通过new Date()方法创建一个当前时间的对象,通过map()操作符对这个对象进行格式的转换后输出为字符串,并在字符串前面加上前缀“Current time: ”,最终将字符串显示到TextView控件当中。
Observable.just(new Date()).map(new Func1<Date, String>() {
@Override
public String call(Date date) {
String s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
return "Current time: " + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
sb.append(s).append("\n");
tv_rx_text.setText(sb.toString());
}
});
flatMap()
有了map()操作符之后,我们已经能够比较容易的对事件对象进行所需的转换操作,但是您也能发现在上一部分我强调了一下map()操作符所进行的转换是一对一的,这就是这个操作符的缺陷。如果我们需要对一个事件中的对象进行转换,而这个事件包含N个事件对象,那么我们相对于要进行N次的map()操作,这无疑是很浪费的。有人说了我们可以将事件中的N个对象封转到一个List里面,我们在转换的时候对List进行遍历,分别对每个事件对象进行转换。这能实现现在的需求吗?答案是肯定的,但是如果我们不想去封转一个List呢,这个时候一个新的操作符flatMap()就出现了,这个操作符实现的功能也是对事件对象的转换,并且它是支持一对多的转换。
Subscriber<Student.Course> subscriber = new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
sb.append("onCompleted\n");
tv_rx_text.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student.Course course) {
sb.append(course.getCourse() + "\n");
tv_rx_text.setText(sb.toString());
}
};
Observable.just(student[i]).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.getCourseList());
}
}).subscribe(subscriber);
如上述代码所示,这个示例中我们做了这么一件事情:我们传入了一个Student对象(里面包含一个内部类Course用来保存student的课程信息),我们通过flatMap()对传入的Student对象进行转换,将Student对象转换成了一个Course对象,并用这个Course对象构造一个Observable对象返回进行处理,最后将Course对象中包含的课程信息显示在TextView中。
从上面的代码中可以看出,map()和flatMap()有一个共同点:都是把传入的参数转换之后返回另一个对象。但是和map()方法不同的是,flatMap()中返回的是一个Observable对象,并且这个Observable不是直接被发送到了Subscriber的回调方法中。flatMap()的原理是这样的:
- 使用传入的事件对象创建一个 Observable 对象;
- 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
- 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
filter()
filter()操作符是可以对Observable流程的数据进行一层过滤处理,filter() 返回为 false 的值将不会发出到 Subscriber。
int num = (int)(Math.random()*100);
Observable.just(num).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
boolean isEvenNumber = integer % 2 == 0;
return isEvenNumber;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
sb.append("Current number: " + integer + "\n");
tv_rx_text.setText(sb.toString());
}
});
上述示例为随机生成一个0~100之间的数字,当数字为偶数时,对数字按“Current number: ”的格式显示在TextView中。
interval()
对于轮询器大家一定不陌生,开发中无论是Java的Timer+TimeTask , 还是Android的Hanlder都可实现。当然在RxJava中也有这样的实现方式,那就是使用interval()操作符。我们使用interval()实现一个10s的计时器,每间隔一面在TextView中更新一下时间,直到10s。代码如下:
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
sb.append("Complete ! \n");
tvRxText.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
if(aLong < 10){
sb.append(aLong + 1 + " s" + "\n");
tvRxText.setText(sb.toString());
}else {
onCompleted();
}
}
});
interval()方法的第一个参数为每次更新的时间间隔,第二个参数为该时间间隔的单位。通过运行此代码,我们发现确实能实现10s计时器的功能,但是到了10s以后,计时器仍未停止,它会一直下去(TextView中的“Complete !”依然每隔一秒打印一次)。所以这其实也是一种浪费,网上有说可以在onNext()里计算时间,达到要求时进行解绑(目前我还没找到解绑interval的方法,如果您知道,请赐教)。在这种情况下,take()操作符应运而生,它和interval()能完美结合实现计时器的功能,接下来我们来看一下take()操作符的使用。
take()
take从字面意思上可以理解就是“拿,取”的意思。所以take()所起的作用也就是取的作用,根据传入参数的数值N来获取前N个onNext()的结果,达到指定数值之后,调用onCompleted()完成此次计时操作。
Observable.interval(1, TimeUnit.SECONDS)
.take(10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
sb.append("Complete ! \n");
tvRxText.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
sb.append(aLong + 1 + " s" + "\n");
tvRxText.setText(sb.toString());
}
});
take()操作符的使用方法如上所示,虽然比较简单,但是却很好的解决了interval计时不能停的问题。
总的来说,RxJava中的操作符可以说是RxJava中比较核心的部分,合理的运用这些操作符会让我们的工作事半功倍。
参考:
https://github.com/ReactiveX/RxJava
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_1
http://blog.csdn.net/lzyzsd/article/details/44094895