Android RxJava2(五)功能操作符

时间:2021-04-04 17:51:28

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的功能操作符

delay()

方法:

public final Observable<T> delay(long delay, TimeUnit unit)

作用:
被观察者延迟发送事件
代码:

        Observable.just(1, 2, 3)
                .delay(5, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("---","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("---", integer + "");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("---","onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("---","onComplete");
                    }
                });

上诉代码将事件延迟5秒发送,执行结果为:

06-07 16:52:56.386 29498-29498/ E/---: onSubscribe
06-07 16:53:01.401 29498-29527/ E/---: 1
    2
    3
    onComplete

doOnEach()

方法:

public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)

作用:
被观察者在每次发送事件之前都会调用这个方法,并且可以通过onNotification获取本次事件的信息
代码:

        Observable.just(1, 2, 3)
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.e("---","integerNotification:"+integerNotification.getValue());
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("---","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("---", integer + "");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("---","onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("---","onComplete");
                    }
                });

打印日志为:

    onSubscribe
    integerNotification:1
    1
    integerNotification:2
    2
    integerNotification:3
    3
    integerNotification:null
    onComplete

doOnNext()

方法:

public final Observable<T> doOnNext(Consumer<? super T> onNext)

作用:
在每次发送onNext()方法之前调用
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---", "accept:" + integer);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    accept:1
    1
    accept:2
    2
    onComplete

doAfterNext()

方法:

public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)

作用:
在每次发送onNext()方法之后调用
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---", "accept:" + integer);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    accept:1
    2
    accept:2
    onComplete

doOnComplete()

方法:

public final Observable<T> doOnComplete(Action onComplete) 

作用:
调用onComplete()之前会调用这个方法
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doOnComplete");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    2
    doOnComplete
    onComplete

doOnError()

方法:

public final Observable<T> doOnError(Consumer<? super Throwable> onError)

作用:
在调用onError()方法之后会调用这个方法
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("---","doOnError");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    doOnError
    onError

doOnSubscribe()

方法:

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)

作用:
每发送 onSubscribe() 之前都会回调这个方法
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e("---","doOnSubscribe");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    doOnSubscribe
    onSubscribe
    1
    onError

doOnDispose()

方法:

public final Observable<T> doOnDispose(Action onDispose)

作用:
当取消订阅也就是调用 Disposable 的 dispose() 之后回调该方法。
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doOnDispose");
            }
        }).subscribe(new Observer<Integer>() {
            Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
                if(integer == 1){
                    disposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    doOnDispose

doOnLifecycle()

方法:

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)

作用:
在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅,第二个参数的回调方法的作用与 doOnDispose() 是一样的
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e("---","doOnLifecycle--accept");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doOnLifecycle--Action");
            }
        }).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doOnDispose");
            }
        }).subscribe(new Observer<Integer>() {
            Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
                disposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
                disposable.dispose();
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

在doOnLifecycle里面不取消订阅,打印结果如下:

    doOnLifecycle--accept
    onSubscribe
    1
    doOnDispose
    doOnLifecycle--Action

可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。
如果我们在doOnLifecycle里面调用dispose,则打印结果如下:

    doOnLifecycle--accept
    onSubscribe

可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调.

doOnTerminate() & doAfterTerminate()

方法:

public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)

作用:
doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doOnTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doOnTerminate");
            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doAfterTerminate");
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    2
    doOnTerminate
    onComplete
    doAfterTerminate

doFinally()

方法:

public final Observable<T> doFinally(Action onFinally) 

作用:
在所有事件发送完毕之后回调该方法。但是和doAfterTerminate()有一定却别,通过上述代码可知如果取消订阅,则doAfterTerminate不会被调用,但是doFinally()无论怎样都会被调用。
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doAfterTerminate");
            }
        }).doFinally(new Action() {
            @Override
            public void run() throws Exception {
                Log.e("---","doFinally");
            }
        }).subscribe(new Observer<Integer>() {
            Disposable d;
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
                this.d = d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
                d.dispose();
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    doFinally

onErrorReturn()

方法:

public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)

作用:
当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException());
            }
        }).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                return 3;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("---", integer + "");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("---", "onComplete");
            }
        });

打印结果:

    onSubscribe
    1
    2
    3
    onComplete

onErrorResumeNext() & onExceptionResumeNext()

方法:

public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)

作用:
onErrorResumeNext:当收到onError()事件时,返回一个新的Observable,并正常结束事件序列
onExceptionResumeNext:方法功能同onErrorResumeNext一样,只不过是捕获Exception信息
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException());
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                return Observable.just(3,4);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 21:02:54.095 22707-22707/ E/---: onSubscribe
06-07 21:02:54.096 22707-22707/ E/---: onNext:1
06-07 21:02:54.096 22707-22707/ E/---: onNext:2
06-07 21:02:54.097 22707-22707/ E/---: onNext:3
06-07 21:02:54.097 22707-22707/ E/---: onNext:4
06-07 21:02:54.097 22707-22707/ E/---: onComplete

retry()

方法:
···
public final Observable retry()
public final Observable retry(long times)
···
作用:
如果出现错误事件,则会重新发送所有事件,times代表重发次数,如果不传则默认一直重发所有事件
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException());
            }
        }).retry(1)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

重发1次,打印结果:

06-07 21:18:34.479 3102-3102/ E/---: onSubscribe
06-07 21:18:34.479 3102-3102/ E/---: onNext:1
06-07 21:18:34.479 3102-3102/ E/---: onNext:2
06-07 21:18:34.479 3102-3102/ E/---: onNext:1
06-07 21:18:34.479 3102-3102/ E/---: onNext:2
06-07 21:18:34.479 3102-3102/ E/---: onError

retryUntil()

方法:

public final Observable<T> retryUntil(final BooleanSupplier stop)

作用:
出现错误事件之后,可以通过此方法判断是否继续发送事件。默认返回false代表重发事件,返回true代表终止会调用onError()方法
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                if(i > 5){
                    return true;
                }
                return false;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
                i += value;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 21:26:29.735 8914-8914/? E/---: onSubscribe
06-07 21:26:29.735 8914-8914/? E/---: onNext:1
06-07 21:26:29.736 8914-8914/? E/---: onNext:2
06-07 21:26:29.736 8914-8914/? E/---: onNext:3
06-07 21:26:29.736 8914-8914/? E/---: onError

retryWhen()

方法:

public final void safeSubscribe(Observer<? super T> s)

作用:
当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送Error事件,则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断尝试发送事件
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("404"));
                e.onNext(3);
            }
        }).retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
                    @Override
                    public ObservableSource <? > apply(Throwable throwable) throws Exception {
                        if(!throwable.toString().equals("java.lang.Exception: 404")) {
                            return Observable.just("可以忽略的异常");
                        } else {
                            return Observable.error(new Throwable("终止啦"));
                        }
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 21:44:03.576 19745-19745/ E/---: onSubscribe
06-07 21:44:03.578 19745-19745/ E/---: onNext:1
06-07 21:44:03.578 19745-19745/ E/---: onNext:2
06-07 21:44:03.579 19745-19745/ E/---: onError

将 onError(new Exception(“404”)) 改为 onError(new Exception(“303”)) 看看打印结果:

06-07 21:45:23.865 21091-21091/? E/---: onNext:2
06-07 21:45:23.865 21091-21091/? E/---: onNext:1
06-07 21:45:23.865 21091-21091/? E/---: onNext:2
06-07 21:45:23.865 21091-21091/? E/---: onNext:1
...

repeat()

方法:

public final Observable<T> repeat()
public final Observable<T> repeat(long times)

作用:
重复发送被观察者事件,times为发送总次数。如果不传times则默认无限重复发送
代码:

        Observable.just(1,2,3)
                .repeat(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 21:47:22.649 23139-23139/ E/---: onSubscribe
06-07 21:47:22.650 23139-23139/ E/---: onNext:1
06-07 21:47:22.650 23139-23139/ E/---: onNext:2
06-07 21:47:22.650 23139-23139/ E/---: onNext:3
06-07 21:47:22.650 23139-23139/ E/---: onNext:1
06-07 21:47:22.650 23139-23139/ E/---: onNext:2
06-07 21:47:22.650 23139-23139/ E/---: onNext:3
06-07 21:47:22.650 23139-23139/ E/---: onComplete

repeatWhen()

方法:

public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

作用:
这个方法会返回一个新的被观察者,通过设定一定的逻辑来决定是否重复发送事件。一共分为三种情况,如果的被观察者返回onComplete或者onError事件,则的被观察者不会继续发送事件。如果被观察者返回其它事件,则会重复发送事件。
代码:
例如校验onComplete

        Observable.just(1,2,3).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return Observable.empty();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 21:53:34.625 25947-25947/ E/---: onSubscribe
06-07 21:53:34.627 25947-25947/ E/---: onComplete

发送onError打印结果:

06-07 21:54:59.959 27588-27588/ E/---: onSubscribe
06-07 21:54:59.961 27588-27588/ E/---: onError

发送其它事件打印结果:

06-07 21:55:39.898 28269-28269/ E/---: onSubscribe
06-07 21:55:39.899 28269-28269/ E/---: onNext:1
06-07 21:55:39.899 28269-28269/ E/---: onNext:2
06-07 21:55:39.900 28269-28269/ E/---: onNext:3
06-07 21:55:39.900 28269-28269/ E/---: onComplete

subscribeOn()

方法:

public final Observable<T> subscribeOn(Scheduler scheduler)

作用:
指定被观察者的线程,有一点需要注意就是如果多次调用此方法,只有第一次有效。
代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.e("---","threadName:"+Thread.currentThread().getName());
                e.onNext(1);
            }
        })
// .subscribeOn(Schedulers.newThread())
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

如果不指定被观察者运行线程,打印结果:

06-07 22:05:22.422 32737-32737/ E/---: onSubscribe
06-07 22:05:22.422 32737-32737/ E/---: threadName:main
06-07 22:05:22.422 32737-32737/ E/---: onNext:1

如果调用subscribeOn(Schedulers.newThread()),打印结果:

06-07 22:08:00.349 3772-3772/ E/---: onSubscribe
06-07 22:08:00.352 3772-3812/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:08:00.352 3772-3812 E/---: onNext:1

observerOn()

方法:

public final Observable<T> observeOn(Scheduler scheduler)

作用:
指定观察者的线程,每指定一次就会生效一次。
代码:

        Observable.just(1).observeOn(Schedulers.newThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.e("---","threadName:"+Thread.currentThread().getName());
                        return integer * 2;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
                Log.e("---","threadName:"+Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 22:21:10.473 15136-15136/ E/---: onSubscribe
06-07 22:21:10.474 15136-15163/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:21:10.492 15136-15136/ E/---: onNext:2
06-07 22:21:10.492 15136-15136/ E/---: threadName:main
06-07 22:21:10.492 15136-15136/ E/---: onComplete

下表总结了 RxJava 中的调度器:

调度器 作用
Schedulers.computation( ) 用于使用计算任务,如时间循环和回调处理
Schedulers.immediate( ) 当前线程
Schedulers.io( ) 用于IO 密集型任务
Schedulers.newThread( ) 创建一个新的线程
AndroidSchedulers.mainThread() Android 的UI 线程,用于操作UI