RxJava(10-操作符原理&自定义操作符)

时间:2021-06-23 12:38:05

转载请标明出处:

http://blog.csdn.net/xmxkf/article/details/51791120

本文出自:【openXu的博客】

目录:

  通过前面一系列操作符的学习,我们基本上了解了RxJava中的操作符,并大概知道他们有什么作用。Observable中实现了很多自带的操作符,能够实现丰富多彩的变化操作。比如创建操作符能够构建出发射不同数据类型、数据数量及发射时间的Observable,变换操作能将原始Observable发射的数据做一些变换后发射出去等。

  对于平时基本的使用,这些操作符已经足够我们了。那为什么还要写这篇文章?目的在于让我们更清楚的了解前面学习的那些操作符内部是怎么实现的,还能让我们定义出现有操作符不能满足的一些变换以供我们使用。

根据操作符的作用和形式,操作符可以分为三类:

  • 创建操作符
  • 数据序列操作符
  • 对Observable整体变换

1. 自定义创建操作符

  Observable封装了很多创建操作符供我们使用,比如:

  • Form

    它可以将某个对象转化为Observable对象,然后依次将其内容发射出去,比如将一个数组转化为Observable对象,然后依次发射数据中的值
  • Interval

    每隔一定的时间间隔发射一个整数(从0开始的整数序列)

  所有的创建操作符内部都是使用create()方法,如果现有的操作符不能满足我们的需求,我们可以自定义创建操作符,比如现在有一个需求,每隔指定的时间间隔发射一个整数数组中的数据,而不是整数序列,相当于上面两个操作符的合体。对于一些需求我们可以通过现有的操作符进行一系列变化后达到我们预期的效果,但这里只是举一个例子,目的就是理解自定义创建操作符。

  如果你的操作符是被用于创造一个Observable,而不是变换或者响应一个Observable,使用 create()方法,不要试图手动实现 Observable。create()创建操作符是最基本的操作符,它创建的Observable是一个空的Observable,不发射任何数据,需要我们重写call()方法发射数据。

  对于create操作符,在第一篇介绍操作符的博客中已经讲解过,这里就不再赘述,主要注意几点:

  • 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法;
  • 建议你在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。

示例代码:

//每隔指定的时间间隔发射一个整数数组中的数据,而不是整数序列
int[] datas = new int[]{2,5,3,1,7,4,8,3,2};
int sleepTime = 1000;
Observable<Integer> obs = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try{
            for(int data : datas){
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(data);
                }else{
                    return;
                }
                Thread.sleep(sleepTime);
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }catch (Exception e){
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            }
        }
    }
});
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Log.v(TAG, "start time:" + sdf.format(new Date()));
obs.subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            Log.v(TAG, "onCompleted");

        }
        @Override
        public void onError(Throwable e) {
            Log.v(TAG, "onError:"+e.getMessage());
        }
        @Override
        public void onNext(Integer integer) {
            Log.v(TAG, "onNext:"+integer+" time:"+sdf.format(new Date()));
        }
    });
/*
输出:
start time:16:16:49
onNext:2 time:16:16:49
onNext:5 time:16:16:50
onNext:3 time:16:16:51
onNext:1 time:16:16:52
onNext:7 time:16:16:53
onNext:4 time:16:16:54
onNext:8 time:16:16:55
onNext:3 time:16:16:56
onNext:2 time:16:16:57
onCompleted
 */

2. 数据序列操作符(lift)

  如果你需要对Observable发射的每一项数据做一些处理(变换),也就是对Observable的数据序列的每一项做处理,你可以使用lift()。Observable中的变换操作符内部都是使用的lift()。

①. 源码分析

  在学习自定义操作符之前,我们不妨了解了解操作符的原理,就拿最简单的变换操作符map举例,跟踪源码,你会发现”卧槽,怎么这么复杂?”,感觉转去转来就晕了,其实跟踪源码时,我们不用去管一些参数、泛型之类的干扰条件,只需要抓住几个主要的类和方法,弄清楚他们的作用即可。下图就是通过分析一段代码得出的流程思路图:

代码

Observable.just(1,2,3)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map:"+integer;
            }
        })
        .subscribe(str->Log.v(TAG, "onNext:"+str));

分析思路图

    RxJava(10-操作符原理&自定义操作符)

相关类以及它们的作用:

  • Observable:被观察者,他的作用就是发射数据
  • OnSubscribe:其实真正发射数据的是OnSubscribe,它就像是一个任务计划表,控制数据什么时候发射,什么时候结束。每个Observable中都会有一个这样的计划表。
  • Subscriber:观察者,用来接收Observable发射的数据
  • Operator:创建出一个新的观察者,让这个观察者拦截原始Observable发射的数据,然后通过功能函数Func1将数据做变换,再发给目标观察者。它的作用就像为我们的观察者请了一个饮食保镖,当食物(数据)到来时,保镖先尝一口看有没有放毒(数据变换),然后将尝过的食物给老观察者。

重要的方法及其作用:

  • map(Func1):map操作符,作用就是接受一个功能函数,创建一个Operator,并调用lift方法。其实绝大多数操作符的流程都跟map一样,他们都调用left方法
  • lift(Operator ):生成一个新的Observable

结论

  原始Observable调用了某个操作符(方法),这个操作符会调用lift(Operator)生成一个新的Observable,当有观察者Subscriber订阅这个新的Observable时,新的Observable会通知原始Observable开始发射数据。发射的数据会传递给Operator生成的新的观察者,新的观察者收到数据后会调用FuncN功能函数处理数据,然后将处理过的数据发送给目标观察者。

  新生成的Observable就相当于一个代理,它拦截原始Observable发射的数据,然后对数据做一些处理,再发射给观察者。

  调用多次lift()方法的情况如下图,一级一级往上通知直到原始Observable发射数据,数据从最后一个liftSubscriber一级一级向上传递处理,最后发射给目标Subscriber

    RxJava(10-操作符原理&自定义操作符)

②. 自定义序列操作符

  接下来,我们模仿上面map操作符,将一个发射Integer数据的Observable转换成发射StringObservable

示例代码:

//自定义“map”操作符
Observable.just(1,2,3)
        .lift(new Observable.Operator<String, Integer>() {
            @Override
            public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
                return new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer integer) {
                        if(!subscriber.isUnsubscribed()) {
                            subscriber.onNext("CustomMap:" + integer);
                        }
                    }
                    @Override
                    public void onCompleted() {
                        if(!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                    }
                    @Override
                    public void onError(Throwable e) {
                        if(!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                    }
                };
            }
        }).subscribe(str->Log.v(TAG, "onNext:"+str));;
/*
输出:
onNext:CustomMap:1
onNext:CustomMap:2
onNext:CustomMap:3
 */

上面讲解lift() 的原理只是为了让你深入地理解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map()flatMap() 等)进行组合来实现需求,因为直接使用 lift()非常容易发生一些难以发现的错误。

3. 对Observable整体变换 (compose)

  除了上面的lift()之外,Observable还有一种变换方法compose(Transformer)lift()是对原始Observable的事件序列的每一项做变换,而compose()是将原始Observable自身进行变换成另一个Observable。有人会问lift()不也是产生了一个新的Observable吗?是的,lift()在内部实现确实生成了新的Observable,但是在我们使用上看来更直观的是对每一项数据进行操作,如果不看源码,我们感觉不到产生了新的Observable。举个例子:假如我们有很多个Observable都需要使用一组相同的变换,可以用下面的方式表示:

observable1
        .lift(new Observable.Operator)
        .map(new Func1)
        .subscribe(subscriber1);
observable2
        .lift(new Observable.Operator)
        .map(new Func1)
        .subscribe(subscriber2);
observable3
        .lift(new Observable.Operator)
        .map(new Func1)
        .subscribe(subscriber3);
observable4
        .lift(new Observable.Operator)
        .map(new Func1)
        .subscribe(subscriber4);

感觉上面代码太繁琐,封装一下:

private Observable transAll(Observable observable) {
    return observable
            .lift(new Observable.Operator)
            .map(new Func1);
}
transAll(observable1).subscribe(subscriber1);
transAll(observable2).subscribe(subscriber2);
transAll(observable3).subscribe(subscriber3);
transAll(observable4).subscribe(subscriber4);

  封装之后感觉好多了,这样写起来感觉怪怪的,因为Observable的设计就是”链式调用”,一般形式都是Observable.xx.xx,所以上面的方式违背了Rx的初衷,这时候compose()就派上用场了:

public class AllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
                .lift(new Observable.Operator)
                .map(new Func1);
    }
}
Observable.Transformer transAll = new AllTransformer();
observable1.compose(transAll).subscribe(subscriber1);
observable2.compose(transAll).subscribe(subscriber2);
observable3.compose(transAll).subscribe(subscriber3);
observable4.compose(transAll).subscribe(subscriber4);

Transformer的作用就是将原始Observable转换成另一个新的Observable,下面感受一下:

示例代码:

Observable.Transformer mapAll = new Observable.Transformer<Integer, String>(){
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable.map(integer->{return "map1-"+integer;})
                .map(str->{return "map2-"+str;})
                .map(str->{return "map3-"+str;})
                .map(str->{return "map4-"+str;});
    }
};
Observable.just(1,2).compose(mapAll).subscribe(str->Log.v(TAG, ""+str));
Observable.just(3,4).compose(mapAll).subscribe(str->Log.v(TAG, ""+str));
Observable.just(5,6).compose(mapAll).subscribe(str->Log.v(TAG, ""+str));
Observable.just(7,8).compose(mapAll).subscribe(str->Log.v(TAG, ""+str));
/*
输出:
map4-map3-map2-map1-1
map4-map3-map2-map1-2
map4-map3-map2-map1-3
map4-map3-map2-map1-4
map4-map3-map2-map1-5
map4-map3-map2-map1-6
map4-map3-map2-map1-7
map4-map3-map2-map1-8
 */

  结果怎么这么奇怪?map1不是在最前面,map4不是在最后面吗?不要忘了刚刚讲过,lift()方式的变换是最后一次lift先收到数据(处理数据),然后一次向上,最后发送到目标观察者。看上图↑↑↑↑↑↑

好了,RxJava的操作符就学习到这里,平时多用用,看看源码,相信很快就能掌握它,祝各位学习愉快~

源码下载:

https://github.com/openXu/RxJavaTest