rxjava笔记:关于lift

时间:2021-11-10 16:03:04

正在看rxjava,看到lift,在阅读了源码和网上的一些文章,整理了下思路。下文着重不是直接分析源代码,而是从lift解决什么问题和如何解决角度分析lift应该是做什么/怎么做的问题。具体源码实现请参考rxjava,网上很多文章分析的很详细。

 

Observable的本质上就是异步获取/加工数据(OnSubscribe的call方法),然后通知observer(Observer的几个方法)的一个框架。每个Observable都有一个OnSubscribe(继承Action1接口)对象。在调用Observable的subscribe方法创建,一旦subscribe后,Observable就开始工作。

举例来说,对于一个Observalbe<JSONObject>的对象,可以看作它最总是发射JSONObject数据,要求下游提供一个Subscriber<JSONObject>(Subscriber实现了Observer)来接收数据,而Subscriber<JSONObject>则放在OnSubscribe的参数。

 

rxjava中的lift是各种操作符的核心所在,具体操作符提供不同的如map,filter等效果。lift的代码设计比较精细,其实只要理解了上面Observable的本质,lift的实现也就迎刃而解了。

刚才讲Observable要获取/加工数据,那么它是怎么获取/加工数据呢,方式很多,如最基本的例子

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext(new String());
            }
        });

这个是最简单的,但是new String()可说是一个“获取数据的例子”,当然这样写毫无意义。而可能的一种实现是网络获取如

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> subscriber) {
        RemoteApi.getInstance().getCurrentUserName(new Callback<String>(){
            public void onSuccess(String username){
                subscriber.onNext(username);
                subscriber.onCompleted();
            }

            public void onFail(int code, String detail){
                subscriber.onError(new Exception(detail));
            }
        });
    }
});

这都很好理解。

而lift本质是一个Observable数据是从另一个Observable获取应该怎么处理呢?一言不合直接先上代码

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

我们来分析下。

Observable1(O1)调用lift返回Observable2(这是个新的对象O2),此时O2要从O1获取数据,O2是消费者,O1是生产者

O2调用另外一个O1获取数据实际上要做3件事

1. 让O1开始获取数据

2. 获取数据后,发射给O2。

3. O2得到数据后,要发射给O2的消费者

先看第1,如何让O1开始获取数据?记得开始我们所讲的Observable,它有个OnSubscribe对象,它的call方法是获取数据的地方。因此,很简单,调用该方法。

不过,“调用该方法”这么简单一句话一般是产品经理的说得,作为一个程序员,当你脑子想到“调用该方法”时候,就需要落实到实现:要用到的对象从哪来,方法参数是什么,方法参数从哪来,是否有返回值,返回值怎么处理等。

在这里,O2就是通过调用O1的OnSubscribe对象的call方法让O1开始工作的。O1的OnSubscribe对象是在创建O2是传入的,代码清晰可见。

OnSubscribe的call对象要接收一个O2的Subscriber对象,这个就是我们关注的第二件事:“获取数据后,通知Observable2”。

而这里O2传给O1 OnSubscribe对象的Subscriber对象从哪来的?这就是lift的参数Operator的作用了。Operator就是负责提供给生产者(O1)监听回调Subscriber的作用,它实际是泛型为Subscriber的Func1的接口。不同的操作符实质是不同的Operator。比如map方法是OperatorMap,filter的是OperatorFilter,observeOn的是OperatorObserveOn(同样形式实现了线程切换,NB吧)

 

因此一个调用流程是

1. 第一步:O2的OnSubscribe的call -> 第二步:O2用operator构造Subscriber -> 第三步:O2用该Subscriber调用O1的OnSubscribe的call

    按照这三步依次上溯,直到最后一个没有parent的Observable

2. *Observable获取数据,调用下游Observable传来的Subscriber发射数据

3. 如果该Subscriber是你写的(通过subscribe方法),这个就结束了;如果是级联Observer,则

4. 上一步的Subscriber是O2的Operator构造出来,这个Subscriber一个任务,就是对收到的数据进行处理,然后在通知O2的下游消费者(因为下游消费者的Subscriber对象会保存在operator返回的Subscriber中)

5. 如此,2,4,反复调用,直至到第三步,一切game over

 借用扔物线文章的图片如下

rxjava笔记:关于lift