Rxjava 2.x 源码系列 - 变换操作符 Map(上)
前言
在前几篇博客中,我们介绍了 Rxjava Observable 与 Observer 之间是如何订阅与取消订阅的,以及 Rxjava 是如何控制 subsribe 线程和 observer 的回调线程的。
今天,让我们一起来看一下 Rxjava 中另外一个比较重要的功能,操作符变化功能
基础知识
常用的变换操作符
操作符 | 作用 |
---|---|
map | 映射,将一种类型的数据流/Observable映射为另外一种类型的数据流/Observable |
cast | 强转 传入一个class,对Observable的类型进行强转. |
flatMap | 平铺映射,从数据流的每个数据元素中映射出多个数据,并将这些数据依次发射。(注意是无序的) |
concatMap | concatMap 与 flatMap 的功能非常类似,只不过发送的数据是有序的 |
buffer | 缓存/打包 按照一定规则从Observable收集一些数据到一个集合,然后把这些数据作为集合打包发射。 |
groupby | 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 |
to… | 将数据流中的对象转换为List/SortedList/Map/MultiMap集合对象,并打包发射 |
timeInterval | 将每个数据都换为包含本次数据和离上次发射数据时间间隔的对象并发射 |
timestamp | 将每个数据都转换为包含本次数据和发射数据时的时间戳的对象并发射 |
从 Demo 说起
接下来,我们一起来看一下一个 demo,我们通过 map 操作符将 Integer 转化为 String。
// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 被观察者发送事件 = 参数为整型 = 1、2、3
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
}
// 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
输出结果
使用 Map变换操作符 将事件1的参数从 整型1 变换成 字符串类型1
使用 Map变换操作符 将事件2的参数从 整型2 变换成 字符串类型2
使用 Map变换操作符 将事件3的参数从 整型3 变换成 字符串类型3
map 源码分析
- 借鉴前面几篇博客的分析,我们先来看一下 Observable 的 map 方法,它的套路跟 create 方法的套路也是相似的,判空是否为 null,为 null 抛出异常。
- 接着,用一个包装类包装当前的 Observable 实例,只不过这个包装类是 ObservableMap。在 ObsevableMap 里面持有上游 observable 实例的引用,这个是典型的装饰者模式. 关于装饰者模式,可以参考我的这一篇博客。装饰者模式及其应用
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
接下来,我们一起来看一下 ObservableMap。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}
在前面博客中,我们已经说到,当我们调用 observable.subscribe(observer) 的时候,代码调用逻辑是这样的。
在 observable 的 subscribeActual 方法中
- 如果有上游的话,会调用上游的 subscribe 方法(即 source.subscribe() 方法),而在 subscribe 方法中,又会调用当前 observable 的 subcribeActual 方法
- 如果没有上游的话,会直接调用当前 Observable 的 subscirbe 方法,并调用 observable 的 onSuscribe 方法
在 ObservableMap 的 subscribeActual 方法里面,MapObserver 类对 Observer 进行包装,又是这样的套路,装饰者模式。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
// 1 判断是否 done,如果已经 done ,直接返回
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 2 调用 mapper.apply(t) ,进行相应的转化
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 3 调用下游的 onNext 方法,并将 V 暴露出去
actual.onNext(v);
}
----
}
首先看他的构造方法,有两个参数, actual,mapper。 actual 代表下游的 Observer,mapper 为传入的 Function。
接着我们来看下 onNext 方法
- 判断是否 done,如果已经 done ,直接返回
- 调用 mapper.apply(t) ,进行相应的转化
- 调用下游的 onNext 方法,并将 V 暴露出去
这样就完成了操作符的操作功能
总结
OK,我们在回到上面的 demo,来整理一下他的流程
当我们调用 observable.subscribe(observer) 的时候
- 会促发第二个 Observable 的 subscribeAtActual 方法,在该方法中,又会调用上游 Observable 的 subscribe 方法,即第一个 Observable 的 subscribe 方法
- 在第一个 Observable 的 subscribe 方法里面,又会调用当前 Observable 的 subscribeAtActual 方法,会调用 observer.onSubscribe(parent) 方法,并调用 source.subscribe(parent) 将我们的 observer 的包装类 parent 暴露出去
- 当我们在我们创建的 ObservableOnSubscribe 的 subscribe 方法中,调用 emitter 的 onNext 方法的时候,这个时候会调用到我们的 MapObserver 的 onNext 方法
- 在 MapObserver 的 onNext 方法,有会调用到下游 Observer 的 onNext 方法,进而调用我们外部的 observer 的 onNext 方法
小结
- map 的操作过程跟之前的线程切换的实现原理基本一样,通过在中间使用装饰者模式插入一个中间的 Observable 和 Observer,你可以想象为代理。
- 代理 Observable 做的事就是接收下游 Obsever 的订阅事件,然后通过代理 Obsever 订阅上游 Observer,然后在上游 Observer 下发数据給代理 Observer 时,通过先调用 mapper.apply 转换回调函数获得转换后的数据,然后下发给下游 Obsever。