写在前面
rxjava 一直很火,我也用了一段时间,感觉特别好用。它属于响应式编程(Reactive Programming,以下简称 RP),脱胎于观察者模式。两者的对比如下:
- 观察者模式:observable -> observer
- 响应式编程:observable -> lift1 -> lift2 ->… ->observer
可以看到,RP 的特点是在观察的基础上,加入了传播路径上的变换(lift)。
这使得我们可以把数据变换逻辑提取出来,而不必全部杂糅在 observer 里。我们称变换为 lift,是因为在 rxjava 里面,所有的变换都由一个叫 lift() 的函数实现。
具体而言,它可以是 map(), flatMap(), filter(), groupBy() 等等任何变换。前戏也做足了,有请我们的主角 lift。
以下内容假定你已经熟悉 rxjava 的基本使用。
如不熟悉,可以先参看简单的介绍:
一个简单的流程
场景设计
下边要分析源码了,从简易的场景说起吧:
- 创建一个 Observable,它包含了数据 “hello rxjava.”。
- 然后订阅之,用 println() 打印出来。
我们按着数字 1~6 走一下流程:
// 变量以 Str 结尾时为了提示该变量的范型是 String, 下同。。。
private Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) { // 3: 接收来自 subscribe()的参数,即 mSubscriberStr
subscriber.onNext("hello rxjava."); // 4: 生成数据 "hello rxjava.", 下发给订阅者 mSubscriberStr
}
};
private Subscriber<String> mSubscriberStr = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String string) { // 5: 接收来自 mOnSubscribeStr 的数据
System.out.println(string); // 6: 打印
}
};
private void testSubscribe(){
Observable.create(mOnSubscribeStr) // 1: 传入 mOnSubscribeStr, 保存之, 相当于传入一个 Callback
.subscribe(mSubscriberStr); // 2: 传入 mSubscriberStr , 由 OnSubscribe.call() 接收
}
源码分析
有几点要注意:
- 第一步中,mOnSubscribeStr 会被 Observable 保存下来,具体请看源码。
- 第二步中, subscribe() 里的 mSubscriberStr 参数最终会被传递到 3 处:OnSubscribe.call() 里面
所以,我们有理由相信,subscribe() 的实现基本是:mOnSubscribeStr.call( mSubscriberStr )。
事实上确实如此,各种重载版本的 subscribe(), 最终辗转调用到
Observable.subscribe(subscriber, observable)。我们来分析一下它:
// subscribe 内部实现
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
// 看这行:这个 hook 基本**啥都不做**,返回一个 observable.onSubscribe,
// 然后立马 .call(subscriber),和我们的猜想一致。
// 至于这个奇怪的 hook,按源码说的,包裹一层 hook 是为了 intercept and/or decorate
// 反正这个 hook 之后还会反复出现,传入啥就传出啥,直接忽视即可
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) { ... }
}
变换
场景设计
到此为止,一个简单的流程走完了,一切都很完美。那么我们引入变换吧,先构造一个场景:
- 创建一个 Observable,它包含了数据 “hello rxjava.”。
- 做一个变换 string -> string.length()
- 然后订阅之,用 println() 打印出来。
Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava.");
}
};
private Func1<String, Integer> mFuncStrToInt = new Func1<String, Integer>() {
@Override
public Integer call(String string) {
return string.length(); // 1: string -> string.length()
}
};
private Subscriber<Integer> mSubscriberInt = new Subscriber<Integer>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer length) {
System.out.println(length);
}
};
private void testMap(){
Observable.create(mOnSubscribeStr)
.map(mFuncStrToInt) // 2: 把映射的 function 传进去,最后其实传给了 lift()
.subscribe(mSubscriberInt);
}
山寨实现 map
这次就不走流程了,大体上是在之前的流程上插入了一段 map()。我们来想想怎么实现这个 map 呢?
咱们列一下需求,然后试着山寨一把,实现一个 myMap():
- map 调用之后需要改变范型(String -> Integer),
这意味着我们需要 new 一个 Observable< Integer > 出来。
- new 了一个 ObservableInt 之后,我们的 subscribe 是针对这个新的 ObservableInt 的。
我们必须与旧的 ObservableStr 建立起联系(让新的 ObservableInt 去订阅旧的 ObservableStr)。
实现如下,跟着流程 1~7 走一遍:
// ---------- testMyMap ------------
private static void testMyMap(){
System.out.println("testMyMap: ");
myMap(mObservableStr, mFuncStrToInt) // 等价于 mObservableStr.map(mFuncStrToInt)
.subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收
System.out.println("");
}
// 山寨版实现 -- myMap()
private static Observable<Integer> myMap(final Observable<String> observableStr
, final Func1<String, Integer> func){
Observable.OnSubscribe<Integer> onSubscribeInt = new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriberInt) { // 2: 接收 mSubscriberInt
Subscriber<String> subscriberStr = new Subscriber<String>() { // 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() {
subscriberInt.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriberInt.onError(e);
}
@Override
public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
Integer length = func.call(string); // 6: 变换
subscriberInt.onNext(length); // 7: 下发新数据给下游
}
};
observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
// 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
}
};
return Observable.create(onSubscribeInt);
}
// ---------- testMyMap ------------
看下输出效果:和官方的 map() 效果一致
testSubscribe:
hello rxjava.
testMap:
length = 13
testMyMap:
length = 13
总结一下:
- 每次变换会 new 出一个 Observable,它们串成一个数据流。
- 下游 create 的时候,会 subscribe 上游,上下游之间是一种 订阅 —— 发布 关系。
- 最重要的一点,对于 map() 来说,责任只是配置,即设置好上/下游之间的监听。
无论步骤 4 的 subscribe(),还是步骤 7 的 onNext() 都是嵌在回调里边。
如果调用完 map() 而不去调用subscribe(),步骤 4 和 7 都不会被执行到 !!!
打个比方,map 就像是把片子都下好了放在那里,你去不去看(subscribe)不关它的事= =。感觉是一种奇怪的责任链模式,每个 Observable 只和前/后继(上/下游)是低耦合关系,与其他对象毫无关联。
官方 map 源码
// map
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func)); // OperatorMap: 等于我们的步骤 5~7,变换、下发数据
}
// 先看 OperatorMap
// Operator<R, T>接口: 基本是把 Subscriber<R> 转换成 Subscriber<T>
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
}
// 这里的 <T, R> 故意反着传进去,有点绕体会一下
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) { // 等于我们的步骤 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() { o.onCompleted(); }
@Override
public void onError(Throwable e) { o.onError(e); }
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t)); // 等于我们的步骤 5~7,变换、下发数据
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
// 再看 lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) { // 等于我们的步骤 2: 参数 o 接收 mSubscriberInt
try {
Subscriber<? super T> st = hook.onLift(operator).call(o); // 等于我们的步骤 3:
try {
...
this.onSubscribe.call(st); // 等于我们的步骤 4: 订阅上游
} catch (Throwable e) { ... }
} catch (Throwable e) { ... }
}
});
}
盗了张图来,有关这个回溯和下发的流程,生动又形象:
- 每行的 lift 都 new 了一个 Observable,即每行的 Observable 都是不同的对象,从上到下为(上游 ->下游)
- 向上的箭头:回溯,对应步骤 4: subscribe()
- 向下的箭头:下发,对应步骤 7: onNext()
参考
RxJava基本流程和lift源码分析
快速理解RxJava源码的设计理念
源码
以下是本文测试代码的汇总,请放心食用(有个 main() 函数,直接能 run 起来)。
package com.example.jinliangshan.littlezhihu.home.rxjava;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
/**
* Created by jinliangshan on 16/9/13.
* 注意变量命名, 成员变量带前缀 m, 局部变量不带
*/
public class RxjavaTest {
public static void main(String args[]){
testSubscribe();
// testMySubscribe();
testMap();
testMyMap();
}
// ---------- test ------------
private static Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava.");
}
};
private static Observable<String> mObservableStr = Observable.create(mOnSubscribeStr); // 数据源
private static Subscriber<String> mSubscriberStr = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String string) {
System.out.println(string);
}
};
private static void testSubscribe(){
System.out.println("testSubscribe: ");
mObservableStr
.subscribe(mSubscriberStr);
System.out.println("");
}
// ---------- test ------------
// ---------- testMySubscribe ------------
private static void testMySubscribe(){
System.out.println("testMySubscribe: ");
mySubscribe(mObservableStr, mSubscriberStr);
System.out.println("");
}
private static void mySubscribe(Observable<String> observableStr, Subscriber<String> subscriberStr){
mOnSubscribeStr.call(subscriberStr);
}
// ---------- testMySubscribe ------------
// ---------- testMap ------------
private static Func1<String, Integer> mFuncStrToInt = new Func1<String, Integer>() {
@Override
public Integer call(String string) {
return string.length();
}
};
private static Subscriber<Integer> mSubscriberInt = new Subscriber<Integer>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer length) {
System.out.println("length = " + length);
}
};
private static void testMap(){
System.out.println("testMap: ");
mObservableStr
.map(mFuncStrToInt)
.subscribe(mSubscriberInt);
System.out.println("");
}
// ---------- testMap ------------
// ---------- testMyMap ------------
private static void testMyMap(){
System.out.println("testMyMap: ");
myMap(mObservableStr, mFuncStrToInt) // 等价于 mObservableStr.map(mFuncStrToInt)
.subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收
System.out.println("");
}
private static Observable<Integer> myMap(final Observable<String> observableStr, final Func1<String, Integer> func){
Observable.OnSubscribe<Integer> onSubscribeInt = new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriberInt) { // 2: 接收 mSubscriberInt
Subscriber<String> subscriberStr = new Subscriber<String>() { // 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() {
subscriberInt.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriberInt.onError(e);
}
@Override
public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
Integer length = func.call(string); // 6: 变换
subscriberInt.onNext(length); // 7: 下发新数据给下游
}
};
observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
// 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
}
};
return Observable.create(onSubscribeInt);
}
// ---------- testMyMap ------------
}