RxJava 之四—— Lift()详解

时间:2022-04-30 17:45:52

转载请标明出处:http://blog.csdn.net/xx326664162/article/details/53611268 文章出自:薛瑄的博客

关于RxJava,从表面上看起来很容易使用,但是如果理解不够深刻,使用过程中,往往会出现一些问题,所以我写了五篇文章,从入门到精通,从简单的使用到部分源码详解,希望能给读者一个质的飞跃:

关于RxJava,从表面上看起来很容易使用,但是如果理解不够深刻,使用过程中,往往会出现一些问题,所以我写了五篇文章,从入门到精通,从简单的使用到部分源码详解,希望能给读者一个质的飞跃:
1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用它的好处
2、RxJava之二——Single和Subject 与Observable举足轻重的类,虽然用的少,但应该知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4、RxJava之四—— Lift()详解 想要了解Operators,Lift()一定要学习
5、RxJava之五—— observeOn()与subscribeOn()的详解Scheduler线程切换的原理
6、RxJava之六——RxBus 通过RxJava来替换EventBus

RxJava最让人兴奋的就是它有各种各样的操作符,什么map呀,flatMap呀各种,我们今天要知其然知其所以然,那么他们是如何实现功能的呢?

下面通过一个例子,逐步深入分析。最后面还会再进行一次总结

例子:

代码块一:

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "word";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.d("rx", s);
}
});

变换的原理:lift()

这些变换map(),flatMap()虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。

我们先看下进行链式调用map之后,发生了什么。

代码块二:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}

对,就是调用了lift()!,先来看一下OperatorMap这个类是个什么东西,看似有点多,其实很简单:

代码块三:

public final class OperatorMap<T, R> implements Operator<R, T> {

final Func1<? super T, ? extends R> transformer;

public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}

static final class MapSubscriber<T, R> extends Subscriber<T> {

final Subscriber<? super R> actual;

final Func1<? super T, ? extends R> mapper;

boolean done;

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void onNext(T t) {
R result;

try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}

actual.onNext(result);
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPluginUtils.handleException(e);
return;
}
done = true;

actual.onError(e);
}


@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}

@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}

使用传递进来的Func1参数,生成一个Subscriber类型的类,调用OperatorMap的call()函数,将返回这个类

代码块四:

分析一下lift()函数,主要就是使用代码块一中,map()函数中的Func1()来生成一个新的Observable,后续的操作使用这个新的Observable。这里知道一下,会有两个Observable对象

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

来看一下这个新的Observable使用的OnSubscribe

代码块五:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

final OnSubscribe<T> parent;

final Operator<? extends R, ? super T> operator;

public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}

@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}

分析一下这段代码:

  1. 第 17 行,Subscriber<? super T> st = hook.onLift(operator).call(o);这里的call(),其实就是代码块三中(第九行)的call()函数,返回的是一个Subscriber的父类
  2. 第 21 行,parent.call(st);这里的parent是第一个Observable的onSubScribe

先记住这里会有两个Observable,下面会详细分析。

subscribe()源码

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

代码块六

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}

对整个过程进行详解:

  • Observable.subscribe() 源码(代码块六)第三行的 onSubscribe 指的是 Observable 中的 onSubscribe 对象(在Observable.create()时传入的),但是 lift() 之后的情况就复杂了点。

  • 当含有 lift() 时:

    1. lift() 创建了一个 Observable ,我们把它取名为后Observable$2,加上之前的原始 Observable,同样把它取名为Observable$ 1,已经有两个 Observable 了;
    2. 同样地,Observable$2 里的新 OnSubscribe$2 加上之前的 Observable$1 中的原始 OnSubscribe$1,也就有了两个 OnSubscribe;
    3. 当用户调用经过 lift() ,再调用subscribe(), 使用的是Observable$2.subscribe() 。因此 subscribe()源码(代码块六)中的onSubscribe.call(subscriber),也是调用Observable$2 .OnSubscribe$2对象(即在 lift() 中创建Observable时新创建的OnSubscribe(代码块五的这个对象));
      • onSubscribe$2.call() 方法中的parent (onSubscribe的父类 ),是Observable$ 1 .onSubscribe$1 。

      • parent.call(st);(代码块五第 行)使用的是Observable$ 1 .onSubscribe$1(任务计划列表),执行新的任务newSubscriber。

这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

RxJava 之四—— Lift()详解

网上关于这部分的理解,很多博客都使用了这篇博客给 Android 开发者的 RxJava 详解的图,虽然其他部分讲的不错,但是画出的图并不是很贴切代码,理解起来很费劲。所以这类没有引用

参考:谜之RxJava (二) —— Magic Lift