Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的转换操作符。
map()
原理图
方法:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
作用:
可以将被观察者发送的数据进行数据类型转换
代码:
比如说想将String数组的数据转换成Integer
Observable.fromArray(new String[]{"1","12","123"}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("----","----onNext:"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
flatMap()
原理图
方法:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
作用:
这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
1、将传入的事件对象装换成一个Observable对象;
2、这时不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
3、每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。
代码:
假设有一个People类,定义如下:
class People {
private String name;
private ArrayList<Book> saveBooks;//收藏书籍
public People(String name, ArrayList<Book> saveBooks){
this.name = name;
this.saveBooks = saveBooks;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ArrayList<Book> getSaveBooks() {
return saveBooks;
}
public void setSaveBooks(ArrayList<Book> saveBooks) {
this.saveBooks = saveBooks;
}
}
People类里有name和saveBooks两个变量,分别代表姓名和收藏得书籍清单
Book定义如下:
class Book{
private String name;
private ArrayList<String> history;//修订记录
public Book(String name,ArrayList<String> history){
this.name = name;
this.history = history;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ArrayList<String> getHistory() {
return history;
}
public void setHistory(ArrayList<String> history) {
this.history = history;
}
}
初始化一些简单数据
ArrayList<Book> books = new ArrayList<>(3); books.add(new Book("开发艺术探索",new ArrayList<String>(){{add("2018.01.01 第一版");add("2018.01.02 第二版");}})); books.add(new Book("第一行代码",new ArrayList<String>(){{add("2019.01.01 第一版");add("2019.01.02 第二版");}})); ArrayList<People> libraries = new ArrayList<>(2); libraries.add(new People("Tom",books)); libraries.add(new People("Jerry",books));
如果我们得需求是打印每个人收藏得书籍相关信息,可以用map来实现
Observable.fromIterable(libraries).map(new Function<People, ArrayList<Book>>() {
@Override
public ArrayList<Book> apply(People people) throws Exception {
return people.getSaveBooks();
}
}).subscribe(new Observer<ArrayList<Book>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ArrayList<Book> books) {
//打印想要数据
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
但是如果这里我们有进一步得需求,那就是只打印每个人收藏得每本书得每一次修订记录,如果再次用map得话就得在onNext里面获取Book对象进行遍历获取数据,但是如果用flatMap()可以很方便得实现
Observable.fromIterable(libraries).flatMap(new Function<People, ObservableSource<Book>>() {
@Override
public ObservableSource<Book> apply(People people) throws Exception {
//获取当前传递进来得用户得每一本收藏
return Observable.fromIterable(people.getSaveBooks());
}
}).flatMap(new Function<Book, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Book book) throws Exception {
//获取每本书得修订记录
return Observable.fromIterable(book.getHistory());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
//打印修订记录
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
从上述代码可以看出,经过两次flagMap()对数据进行转换,就可以完成需求,代码逻辑比较清晰。
concatMap()
原理图
方法:
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
作用:
功能同flatMap()基本上是一样的,只不过concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
代码:
将上述flatMap()代码做如下更改
Observable.fromIterable(libraries).flatMap(new Function<People, ObservableSource<Book>>() {
@Override
public ObservableSource<Book> apply(People people) throws Exception {
if("Tom".equals(people.getName())){
return Observable.fromIterable(people.getSaveBooks()).delay(3, TimeUnit.SECONDS);
}
return Observable.fromIterable(people.getSaveBooks());
}
}).flatMap(new Function<Book, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Book book) throws Exception {
return Observable.fromIterable(book.getHistory());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
//打印修订记录
Log.e("----",s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
将用户名为Tom得用户数据延迟3秒再发送,打印日志如下
06-04 18:17:23.084 25841-25841/ E/----: 2018.01.01 第一版
2018.01.02 第二版
2019.01.01 第一版
2019.01.02 第二版
06-04 18:17:26.086 25841-25874/ E/----: 2018.01.01 第一版
2018.01.02 第二版
2019.01.01 第一版
2019.01.02 第二版
会发现Tom得数据是3秒之后接收到得,如果我们把flatMap() 改成concatMap()打印日志如下
06-04 18:19:47.002 26620-26644/ E/----: 2018.01.01 第一版
2018.01.02 第二版
2019.01.01 第一版
2019.01.02 第二版
2018.01.01 第一版
2018.01.02 第二版
2019.01.01 第一版
2019.01.02 第二版
这就代表 concatMap() 转换后发送的事件序列是有序的了。
flatMapIterable()
原理图
方法:
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper)
作用:
flatMapIterable() 和flatMap()功能在流程上大体一致,唯一不同的是,flatMap是转一个Observable转换成多个Observable,每一个Observable最后又返回一个Observable。而flatMapInterable是将一个Observable转换成多个Observable,但是每一个Observable最后返回得是Iterable。Iterable,可以理解成返回一个list
代码:
Observable.fromArray(new String[]{"1","2","3"}).flatMapIterable(new Function<String, Iterable<String>>() {
@Override
public Iterable<String> apply(final String s) throws Exception {
return new ArrayList<String>(){{add("a"+s);add("b"+s);}};
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("----",s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述代码打印日志如下
06-04 18:35:15.400 30351-30351/ E/----: a1
b1
a2
b2
a3
b3
switchMap()
原理图
用法:
public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
作用:
switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。
buffer()
原理图
用法:
public final Observable<List<T>> buffer(int count, int skip)
count:缓冲区元素的数量
skip:缓冲区满了之后,发送下次事件序列的时候要跳过多少元素
作用:
从需要发送的事件当中获取一定数量事件,并将这些事件放到缓冲区当中一并发送
代码:
Observable.just(1,2,3,4,5,6).buffer(3,2).subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> value) {
Log.e("---","---"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述代码是设置缓冲区大小为3,每次跳过2个元素,代码打印结果如下:
06-04 22:20:48.595 2631-2631/ E/---: ---[1, 2, 3]
06-04 22:20:48.595 2631-2631/ E/---: ---[3, 4, 5]
06-04 22:20:48.595 2631-2631/ E/---: ---[5, 6]
scan()
原理图:
用法:
public final Observable<T> scan(BiFunction<T, T, T> accumulator)
作用:
将数据以一定的逻辑聚合起来,并将计算结果发送出去作为下个数据应用函数时的第一个参数使用
代码:
Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("---","integer:"+integer+" integer2:"+integer2);
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("---","value:"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述代码打印日志如下:
06-04 22:35:41.659 11404-11404/? E/---: value:1
06-04 22:35:41.659 11404-11404/? E/---: integer:1 integer2:2
06-04 22:35:41.659 11404-11404/? E/---: value:3
06-04 22:35:41.659 11404-11404/? E/---: integer:3 integer2:3
06-04 22:35:41.659 11404-11404/? E/---: value:6
06-04 22:35:41.659 11404-11404/? E/---: integer:6 integer2:4
06-04 22:35:41.659 11404-11404/? E/---: value:10
06-04 22:35:41.659 11404-11404/? E/---: integer:10 integer2:5
06-04 22:35:41.659 11404-11404/? E/---: value:15
通过上述执行结果可以发现,第一个参数integer就是上一次运算的结果
groupby()
原理图:
用法:
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
作用:
将原始Observable发送的数据按照key进行分组,每个分组都会返回一个Observable,这些Observable分别发射其包含的数据。
代码:
下面代码代表将1到5的数据根据是否能对2整除进行分组
Observable.just(1, 2, 3, 4, 5)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 2;
}
}).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(final GroupedObservable<Integer, Integer> value) {
value.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("---",value.getKey()+" "+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述代码执行结果如下:
06-04 22:47:41.850 21780-21780/ E/---: 1 1
06-04 22:47:41.850 21780-21780/ E/---: 0 2
06-04 22:47:41.850 21780-21780/ E/---: 1 3
06-04 22:47:41.850 21780-21780/ E/---: 0 4
06-04 22:47:41.850 21780-21780/ E/---: 1 5
window()
用法:
public final Observable<Observable<T>> window(long count)
public final Observable<Observable<T>> window(long count, long skip)
...
count:事件数量
作用:
发送指定数量事件时,就将这些事件分为一组。
代码:
Observable.just(1,2,3,4,5).window(3).subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","---onSubscribe");
}
@Override
public void onNext(final Observable<Integer> value) {
value.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","value---onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("---","value---"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("---","value---onError");
}
@Override
public void onComplete() {
Log.e("---","value---onComplete");
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("---","---onComplete");
}
});
上述代码代表每发送3个事件为一组,执行结果如下:
06-04 22:59:16.776 30167-30167/ E/---: ---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---1
06-04 22:59:16.777 30167-30167/ E/---: value---2
06-04 22:59:16.777 30167-30167/ E/---: value---3
06-04 22:59:16.777 30167-30167/ E/---: value---onComplete
06-04 22:59:16.777 30167-30167/ E/---: value---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---4
06-04 22:59:16.777 30167-30167/ E/---: value---5
06-04 22:59:16.777 30167-30167/ E/---: value---onComplete
06-04 22:59:16.777 30167-30167/ E/---: ---onComplete