RxJava 源码分析之 —— lift 变换

时间:2022-07-07 19:37:03

写在前面


rxjava 一直很火,我也用了一段时间,感觉特别好用。它属于响应式编程(Reactive Programming,以下简称 RP),脱胎于观察者模式。两者的对比如下:

  • 观察者模式:observable -> observer
  • 响应式编程:observable -> lift1 -> lift2 ->… ->observer

可以看到,RP 的特点是在观察的基础上,加入了传播路径上的变换(lift)
这使得我们可以把数据变换逻辑提取出来,而不必全部杂糅在 observer 里。

我们称变换为 lift,是因为在 rxjava 里面,所有的变换都由一个叫 lift() 的函数实现。
具体而言,它可以是 map(), flatMap(), filter(), groupBy() 等等任何变换。

前戏也做足了,有请我们的主角 lift。
以下内容假定你已经熟悉 rxjava 的基本使用。
如不熟悉,可以先参看简单的介绍:

RxJava 初探(网络请求)


一个简单的流程


场景设计

下边要分析源码了,从简易的场景说起吧:

  1. 创建一个 Observable,它包含了数据 “hello rxjava.”。
  2. 然后订阅之,用 println() 打印出来。


我们按着数字 1~6 走一下流程:

    // 变量以 Str 结尾时为了提示该变量的范型是 String, 下同。。。
private Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) { // 3: 接收来自 subscribe()的参数,即 mSubscriberStr
subscriber.onNext("hello rxjava."); // 4: 生成数据 "hello rxjava.", 下发给订阅者 mSubscriberStr
}
};

private Subscriber<String> mSubscriberStr = new Subscriber<String>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(String string) { // 5: 接收来自 mOnSubscribeStr 的数据
System.out.println(string); // 6: 打印
}
};

private void testSubscribe(){
Observable.create(mOnSubscribeStr) // 1: 传入 mOnSubscribeStr, 保存之, 相当于传入一个 Callback
.subscribe(mSubscriberStr); // 2: 传入 mSubscriberStr , 由 OnSubscribe.call() 接收
}


源码分析

有几点要注意:

  1. 第一步中,mOnSubscribeStr 会被 Observable 保存下来,具体请看源码。

  2. 第二步中, subscribe() 里的 mSubscriberStr 参数最终会被传递到 3 处:OnSubscribe.call() 里面


所以,我们有理由相信,subscribe() 的实现基本是:mOnSubscribeStr.call( mSubscriberStr )
事实上确实如此,各种重载版本的 subscribe(), 最终辗转调用
Observable.subscribe(subscriber, observable)。我们来分析一下它:

// subscribe 内部实现
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
// 看这行:这个 hook 基本**啥都不做**,返回一个 observable.onSubscribe,
// 然后立马 .call(subscriber),和我们的猜想一致。

// 至于这个奇怪的 hook,按源码说的,包裹一层 hook 是为了 intercept and/or decorate
// 反正这个 hook 之后还会反复出现,传入啥就传出啥,直接忽视即可
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) { ... }
}


变换


场景设计

到此为止,一个简单的流程走完了,一切都很完美。那么我们引入变换吧,先构造一个场景:

  1. 创建一个 Observable,它包含了数据 “hello rxjava.”。

  2. 做一个变换 string -> string.length()

  3. 然后订阅之,用 println() 打印出来。
    Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava.");
}
};

private Func1<String, Integer> mFuncStrToInt = new Func1<String, Integer>() {
@Override
public Integer call(String string) {
return string.length(); // 1: string -> string.length()
}
};

private Subscriber<Integer> mSubscriberInt = new Subscriber<Integer>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(Integer length) {
System.out.println(length);
}
};

private void testMap(){
Observable.create(mOnSubscribeStr)
.map(mFuncStrToInt) // 2: 把映射的 function 传进去,最后其实传给了 lift()
.subscribe(mSubscriberInt);
}


山寨实现 map

这次就不走流程了,大体上是在之前的流程上插入了一段 map()。我们来想想怎么实现这个 map 呢?

咱们列一下需求,然后试着山寨一把,实现一个 myMap():

  1. map 调用之后需要改变范型(String -> Integer),
    这意味着我们需要 new 一个 Observable< Integer > 出来。

  2. new 了一个 ObservableInt 之后,我们的 subscribe 是针对这个新的 ObservableInt 的。
    我们必须与旧的 ObservableStr 建立起联系(让新的 ObservableInt 去订阅旧的 ObservableStr)。


实现如下,跟着流程 1~7 走一遍:

    // ---------- testMyMap ------------
private static void testMyMap(){
System.out.println("testMyMap: ");

myMap(mObservableStr, mFuncStrToInt) // 等价于 mObservableStr.map(mFuncStrToInt)
.subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收

System.out.println("");
}

// 山寨版实现 -- myMap()
private static Observable<Integer> myMap(final Observable<String> observableStr
, final Func1<String, Integer> func){

Observable.OnSubscribe<Integer> onSubscribeInt = new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriberInt) { // 2: 接收 mSubscriberInt

Subscriber<String> subscriberStr = new Subscriber<String>() { // 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() {
subscriberInt.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriberInt.onError(e);
}

@Override
public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
Integer length = func.call(string); // 6: 变换
subscriberInt.onNext(length); // 7: 下发新数据给下游
}
};

observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
// 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
}
};

return Observable.create(onSubscribeInt);
}
// ---------- testMyMap ------------


看下输出效果:和官方的 map() 效果一致

testSubscribe: 
hello rxjava.

testMap:
length = 13

testMyMap:
length = 13


总结一下:

  1. 每次变换会 new 出一个 Observable,它们串成一个数据流

  2. 下游 create 的时候,会 subscribe 上游,上下游之间是一种 订阅 —— 发布 关系。

  3. 最重要的一点,对于 map() 来说,责任只是配置,即设置好上/下游之间的监听
    无论步骤 4 的 subscribe(),还是步骤 7 的 onNext() 都是嵌在回调里边
    如果调用完 map() 而不去调用subscribe(),步骤 4 和 7 都不会被执行到 !!!
    打个比方,map 就像是把片子都下好了放在那里,你去不去看(subscribe)不关它的事= =。

感觉是一种奇怪的责任链模式每个 Observable 只和前/后继(上/下游)是低耦合关系,与其他对象毫无关联。


官方 map 源码

// map
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func)); // OperatorMap: 等于我们的步骤 5~7,变换、下发数据
}

// 先看 OperatorMap
// Operator<R, T>接口: 基本是把 Subscriber<R> 转换成 Subscriber<T>
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
}

// 这里的 <T, R> 故意反着传进去,有点绕体会一下
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;

public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) { // 等于我们的步骤 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() { o.onCompleted(); }

@Override
public void onError(Throwable e) { o.onError(e); }

@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t)); // 等于我们的步骤 5~7,变换、下发数据
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}

// 再看 lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) { // 等于我们的步骤 2: 参数 o 接收 mSubscriberInt
try {
Subscriber<? super T> st = hook.onLift(operator).call(o); // 等于我们的步骤 3:
try {
...
this.onSubscribe.call(st); // 等于我们的步骤 4: 订阅上游

} catch (Throwable e) { ... }
} catch (Throwable e) { ... }
}
});
}


盗了张图来,有关这个回溯下发的流程,生动又形象:

  • 每行的 liftnew 了一个 Observable,即每行的 Observable 都是不同的对象,从上到下为(上游 ->下游
  • 向上的箭头:回溯,对应步骤 4: subscribe()
  • 向下的箭头:下发,对应步骤 7: onNext()

RxJava 源码分析之 —— lift 变换

参考


RxJava基本流程和lift源码分析
快速理解RxJava源码的设计理念

源码


以下是本文测试代码的汇总,请放心食用(有个 main() 函数,直接能 run 起来)。

package com.example.jinliangshan.littlezhihu.home.rxjava;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/**
* Created by jinliangshan on 16/9/13.
* 注意变量命名, 成员变量带前缀 m, 局部变量不带
*/

public class RxjavaTest {

public static void main(String args[]){
testSubscribe();
// testMySubscribe();
testMap();
testMyMap();
}

// ---------- test ------------
private static Observable.OnSubscribe<String> mOnSubscribeStr = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava.");
}
};

private static Observable<String> mObservableStr = Observable.create(mOnSubscribeStr); // 数据源

private static Subscriber<String> mSubscriberStr = new Subscriber<String>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(String string) {
System.out.println(string);
}
};

private static void testSubscribe(){
System.out.println("testSubscribe: ");

mObservableStr
.subscribe(mSubscriberStr);

System.out.println("");
}
// ---------- test ------------

// ---------- testMySubscribe ------------
private static void testMySubscribe(){
System.out.println("testMySubscribe: ");

mySubscribe(mObservableStr, mSubscriberStr);

System.out.println("");
}

private static void mySubscribe(Observable<String> observableStr, Subscriber<String> subscriberStr){
mOnSubscribeStr.call(subscriberStr);
}
// ---------- testMySubscribe ------------

// ---------- testMap ------------
private static Func1<String, Integer> mFuncStrToInt = new Func1<String, Integer>() {
@Override
public Integer call(String string) {
return string.length();
}
};

private static Subscriber<Integer> mSubscriberInt = new Subscriber<Integer>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(Integer length) {
System.out.println("length = " + length);
}
};

private static void testMap(){
System.out.println("testMap: ");

mObservableStr
.map(mFuncStrToInt)
.subscribe(mSubscriberInt);

System.out.println("");
}
// ---------- testMap ------------

// ---------- testMyMap ------------
private static void testMyMap(){
System.out.println("testMyMap: ");

myMap(mObservableStr, mFuncStrToInt) // 等价于 mObservableStr.map(mFuncStrToInt)
.subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收

System.out.println("");
}

private static Observable<Integer> myMap(final Observable<String> observableStr, final Func1<String, Integer> func){

Observable.OnSubscribe<Integer> onSubscribeInt = new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriberInt) { // 2: 接收 mSubscriberInt

Subscriber<String> subscriberStr = new Subscriber<String>() { // 3: 包裹 subscriberInt, 转换成 subscriberStr
@Override
public void onCompleted() {
subscriberInt.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriberInt.onError(e);
}

@Override
public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
Integer length = func.call(string); // 6: 变换
subscriberInt.onNext(length); // 7: 下发新数据给下游
}
};

observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
// 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
}
};

return Observable.create(onSubscribeInt);
}
// ---------- testMyMap ------------
}