Transforming Observables(转换操作符)
本节介绍转换数据流中数据的方法。在真实世界中, Observable中的数据可以是任意类型的,可能在你的应用中无法直接使用这些数据类型,你需要对这些数据对象进行一些转换。
- map 和 flatMap 是本节中操作函数的基础。 下面是三种转换方式的示意:
- Ana(morphism) T –> IObservable
- Cata(morphism) IObservable –> T
- Bind IObservable –> IObservable
map
最基础的转换函数就是 map。 map 使用一个转换的参数把源Observable 中的数据转换为另外一种类型的数据。返回的
Observable 中包含了转换后的数据。
public final <R> Observable<R> map(Func1<? super T,? extends R> func)
Observable<Integer> values = Observable.just("0", "1", "2", "3")
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
});
values.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer+"");
}
});
结果:
0
1
2
3
源 Observable 发射的为 String 类型数据,而我们需要的是 int 类型,则可以通过 map 把 String 转换为 int。
如果你认为这种转换太简单了, 完全可以在 Subscriber 中完成,这样在设计架构上并不合理,没有有效的区分职责。
代码设计每个部分都有各自的职责,使用 map 可以有效的确保职责清晰。方便后续修改。
cast
cast 是把一个对象强制转换为子类型的缩写形式。 假设源 Observable为 Observable
Observable<String> values = Observable.just("0","1","2","3");
values
.cast(Object.class)
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
log(object.toString()+":"+object.getClass());
}
});
结果:
0:class java.lang.String
1:class java.lang.String
2:class java.lang.String
3:class java.lang.String
如果遇到类型不一样的对象的话,就会抛出一个 error。
如果你不想处理类型不一样的对象,则可以用 ofType
。 该函数用来判断数据是否为 该类型,如果不是则跳过这个数据。
flatMap
map 把一个数据转换为另外一个数据。而 flatMap 把源 Observable 中的一个数据替换为任意数量的数据,可以为 0 个,也可以为无限个。 flatMap 把源 Observable 中的一个数据转换为一个新的 Observable 发射出去。
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)
flatMap 的参数会把 源 Observable 中发射的每个数据转换为一个新的 Observable, 然后 flatMap 再把这些新的 Observable 中发射的数据发射出来。每个新的 Observable 数据都是按照他们产生的顺序发射出来,但是 Observable 之间数据的顺序可能会不一样。
下面通过一个简单的例子来帮助理解 flatMap 。
Observable<Integer> values = Observable.range(1,3);
values
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.range(0,integer);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});;
这里 values 会发射 1 、 2、 3 三个数据。 然后 flatMap 把每个数据变为新的 Observable(Observable.range(0,i)),所以会有 3 个 Observable,这 3个 Observable 分别发射 [0],[0,1] 和 [0,1,2]。最终 flatMap 再把这 3 个新 Observable 发射的数据合并到一个 Observable发射出去。
所以上面的结果如下:
结果:
0
0
1
0
1
2
再看一个示例,把 int 值转换为 字母:
Observable<Integer> values2 = Observable.just(1);
values2
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.just(Character.valueOf((char)(integer+64)));
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
A
上面的示例,用 map 函数实现会更简单,这里是为了说明 flatMap 另外一种功能,如果你发现源 Observable 中发射的数据不符合你的要求,则你可以返回一个 空的 Observable。这就相当于过滤数据的作用, 例如:
Observable<Integer> values = Observable.range(0,30);
values
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
if (0 < integer && integer <= 26)
return Observable.just(Character.valueOf((char)(integer+64)));
else
return Observable.empty();
}
})
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log( e.getMessage().toString());
}
@Override
public void onNext(Object o) {
log(o.toString());
}
});
结果:
A
B
C
...
X
Y
Z
Complete!
上面示例源 Observable 一共发射 0 到 29 这 30个数字。在 flatMap 中判断 如果数字大于 0 并且小于等于26,则转换为字母用 并用 Observable.just 生成新的 Observable;其他数字都返回一个Observable.empty() 空 Observable。
注意,flatMap 是把几个新的 Observable 合并为一个 Observable 返回, 只要这些新的 Observable有数据发射出来, flatMap 就会把数据立刻发射出去。所以如果这些新的 Observable 发射数据是异步的,那么 flatMap返回的数据也是异步的。
下面示例中使用 Observable.interval 来生成每个数据对应的新 Observable,由于 interval 返回的Observable 是异步的,所以可以看到最终输出的结果是每当有 Observable 发射数据的时候, flatMap 就返回该数据。
Observable.just(100, 150)
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(final Integer integer) {
return Observable.interval(integer,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return integer;
}
});
}
})
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
上面的 new Func1<Integer, Observable<?>>()
先把参数integer(这里分别为 100 和 150 这两个数字)转换为 Observable.interval(integer, TimeUnit.MILLISECONDS)
, 每隔 integer 毫秒发射一个数字,这样两个 Observable.interval
都发射同样的数字,只不过发射的时间间隔不一样,所以为了区分打印的结果,我们再用 map(new Func1<Long, Object>()
把结果转换为 integer 。
结果:
100
150
100
100
150
100
可以两个新的 Observable 的数据交织在一起发射出来。
concatMap
- 如果你不想把新 Observable 中的数据交织在一起发射,则可以选择使用 concatMap 函数。
- 该函数会等第一个新的 Observable 完成后再发射下一个 新的 Observable 中的数据。
Observable.just(100, 150)
.concatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(final Integer integer) {
return Observable.interval(integer, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return integer;
}
})
.take(3);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
100
100
100
150
150
150
所以 concatMap 要求新的Observable 不能是无限的,否则该无限 Observable 会阻碍后面的数据发射。为此,上面的示例使用 take 来结束 Observable。
switchMap
switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。
与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!流程图如下:
以下是flatMap、concatMap和switchMap的运行实例对比:
//flatMap操作符的运行结果
Observable.just(10, 22, 34).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;
return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("flatMap Next:" + integer);
}
});
//concatMap操作符的运行结果
Observable.just(10, 22, 34).concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;
return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("concatMap Next:" + integer);
}
});
//switchMap操作符的运行结果
Observable.just(10, 22, 34).switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;
return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("switchMap Next:" + integer);
}
});
结果:
flatMap Next:22
flatMap Next:34
flatMap Next:17
flatMap Next:11
flatMap Next:10
flatMap Next:5
switchMap Next:34
switchMap Next:17
concatMap Next:10
concatMap Next:5
concatMap Next:22
concatMap Next:11
concatMap Next:34
concatMap Next:17
这里from返回的是一个Observable,所以【10,5】、【22,11】、【34,17】都是一个整体。switchMap直接去最后一个,也就是【34,17】,。flatMap在180毫秒之后是【22,11】和【34,17】,200毫秒之后是【10,5】。concatMap就是一个接着一个,按顺序来了。
flatMapIterable
flatMapIterable 和 flatMap 类似,区别是 flatMap 参数把每个数据转换为 一个新的 Observable,而 flatMapIterable 参数把一个数据转换为一个新的 iterable 对象。
例如下面是一个把参数转换为 iterable 的函数:
public static Iterable<Integer> toList(int start, int count) {
List<Integer> list = new ArrayList<>();
for (int i=start ; i<start+count ; i++) {
list.add(i);
}
return list;
}
然后可以这样使用该函数作为 flatMapIterable 的参数:
Observable.range(1, 3)
.flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
return toList(1,integer);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
1
1
2
1
2
3
flatMapIterable 把生成的 3 个 iterable 合并为一个 Observable 发射。
作为 Rx 开发者,我们需要知道在 Rx 中应该使用 Observable 数据流来发射数据而不要混合使用传统的iterable。但是如果你无法控制数据的来源,提供数据的一方只提供 iterable数据,则依然可以直接使用这些数据。flatMapIterable 把多个 iterable 的数据按照顺序发射出来,不会交织发射。
flatMapIterable 还有另外一个重载函数可以用源 Observable 发射的数据来处理新的 iterable 中的每个数据:
Observable.range(1, 3)
.flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
return toList(1,integer);
}
}, new Func2<Integer, Object, Object>() {
@Override
public Object call(Integer integer, Object o) {
return integer * (Integer) o;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
1
2
4
3
6
9
注意,上面的 Func2()
的call(Integer integer, Object o)
中integer参数取值为 源 Observable 发射出来的数据,也就是 1、 2、 3. 而 o参数取值为toList(1,integer)
参数生成的 iterable 中的每个数据,也就是分别为 [1]、[1,2]、[1,2,3],所以最终的结果就是:[11], [12, 22], [13, 23, 33].
buffer
buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。
需要注意的是,一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。
buffer的名字很怪,但是原理很简单,流程图如下:
final int[] items = new int[]{1, 3, 5, 7, 9};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
if (subscriber.isUnsubscribed()) return;
Random random = new Random();
while (true) {
int i = items[random.nextInt(items.length)];
subscriber.onNext(i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
observable.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});
结果:
[3, 1, 7]
[5, 9, 7]
[1, 5, 3]
[7, 1, 5]
...
buffer有很多重载的方法:
public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector)
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler)
public final <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector)
public final <B> Observable<List<T>> buffer(Observable<B> boundary)
public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity)
window
window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。
window操作符有很多个重载方法,这里只举一个简单的例子,其流程图如下:
Observable.interval(1, TimeUnit.SECONDS).take(12)
.window(3, TimeUnit.SECONDS)
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> observable) {
log("subdivide begin......");
observable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Next:" + aLong);
}
});
}
});
结果:
subdivide begin……
Next:0
Next:1
subdivide begin……
Next:2
Next:3
Next:4
subdivide begin……
Next:5
Next:6
Next:7
subdivide begin……
Next:8
Next:9
Next:10
subdivide begin……
Next:11
scan
scan 和 reduce 很像,不一样的地方在于 scan会发射所有中间的结算结果。
public final Observable<T> scan(Func2<T,T,T> accumulator)
通过上图可以看到和 reduce 的区别, reduce 只是最后把计算结果发射出来,而 scan 把每次的计算结果都发射出来。
Observable<Integer> values = Observable.range(0, 5);
values.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
})
// .takeLast()//实现reduce
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Sum:Complete!");
}
@Override
public void onError(Throwable e) {
log("Sum:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Sum:" + integer);
}
});
结果:
Sum: 0
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: Complete!
reduce 可以通过 scan 来实现: reduce(acc) = scan(acc).takeLast() 。所以 scan 比 reduce 更加通用。
- 源 Observable 发射数据,经过 scan 处理后 scan 也发射一个处理后的数据,
- 所以 scan 并不要求源 Observable 完成发射。
- 下面示例实现了 查找已经发射数据中的最小值的功能:
Subject<Integer, Integer> values = ReplaySubject.create();
values
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Values:Complete!");
}
@Override
public void onError(Throwable e) {
log("Values:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Values:" + integer);
}
});
values
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return (integer<integer2) ? integer : integer2;
}
})
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Min:Complete!");
}
@Override
public void onError(Throwable e) {
log("Min:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Min:" + integer);
}
});
values.onNext(2);
values.onNext(3);
values.onNext(1);
values.onNext(4);
values.onCompleted();
结果:
Values: 2
Min: 2
Values: 3
Values: 1
Min: 1
Values: 4
Values: Completed
Min: Completed
groupBy
groupBy 是 toMultimap 函数的 Rx 方式的实现。groupBy 根据每个源Observable 发射的值来计算一个
key, 然后为每个 key 创建一个新的 Observable并把key 一样的值发射到对应的新 Observable 中。
public final <K> Observable<GroupedObservable<K,T>> groupBy(Func1<? super T,? extends K> keySelector)
- 返回的结果为 GroupedObservable。 每次发现一个新的key,内部就生成一个新的 GroupedObservable并发射出来。和普通的 Observable 相比 多了一个 getKey 函数来获取 分组的 key。来自于源Observable中的值会被发射到对应 key 的 GroupedObservable 中。
- 嵌套的 Observable 导致方法的定义比较复杂,但是提供了随时发射数据的优势,没必要等源Observable 发射完成了才能返回数据。
- 下面的示例中使用了一堆单词作为源Observable的数据,然后根据每个单词的首字母作为分组的 key,最后把每个分组的 最后一个单词打印出来:
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.subscribe(new Action1<GroupedObservable<Object, String>>() {
@Override
public void call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
objectStringGroupedObservable.last().subscribe(new Action1<String>() {
@Override
public void call(String s) {
log( objectStringGroupedObservable.getKey() +":" + s);
}
});
}
});
t:third
f:fifth
s:sixth
上面的代码使用了嵌套的 Subscriber,但Rx 功能之一就是为了避免嵌套回调函数,所以下面演示了如何避免嵌套:
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.flatMap(new Func1<GroupedObservable<Object, String>, Observable<?>>() {
@Override
public Observable<?> call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
return objectStringGroupedObservable.last().map(new Func1<String, Object>() {
@Override
public Object call(String s) {
return objectStringGroupedObservable.getKey() +":" + s;
}
});
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
s: sixth
t: third
f: fifth
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 转换数据流 - 云在千峰
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET