Android RxJava创建操作符

时间:2021-01-27 17:48:12

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的创建操作符。

create()

方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

作用:
创建一个被观察者
代码:

        Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });
        Observer observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----","-----onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("----","-----onNext:"+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("----","-----onError");
            }

            @Override
            public void onComplete() {
                Log.e("----","-----onComplete");
            }
        };
        observable.subscribe(observer);

just()

方法:

public static <T> Observable<T> just(T item) 
...
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
...
 public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

作用:
创建一个被观察者,可以发送多个事件,但是不能超过10个
代码:

        Observable.just("1","2","5").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----","-----onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e("----","-----onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("----","-----onComplete");
            }
        });

fromArray()

方法:

public static <T> Observable<T> fromArray(T... items)

作用:
和just()方法类似,但是可以传入多个事件,也可以传入一个数组
代码:

        String[] str = new String[]{"2","3","8"};
        Observable.fromArray(str).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("----","-----onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

fromCallable()

方法:

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

作用:
将Callable的返回值发送给观察者,Callable用法也Runnable用法一致,一个是 call() 方法,一个是 run() 方法,并且 call() 有返回值。
代码:

        Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(10000);
                return "hello";
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("----","-----onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

fromFuture()

方法:

public static <T> Observable<T> fromFuture(Future<? extends T> future)

作用:
Future的作用是增加了cancel()等方法操作Callable,可以通过get()获取Callable获取返回值,可以cancel()取消Callable
代码:

        //创建callable
        Callable callable = new Callable<String>() {

            @Override
            public String call() throws Exception {
                return "how are you";
            }
        };
        //创建future
        final FutureTask<String> futureTask = new FutureTask<String>(callable);
        Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                futureTask.run();
                //这里可以通过futureTask.get()获取callable返回值
                //Log.e("----","-----accept:"+futureTask.get());

            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("----","-----onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

doOnSubscribe只是在被订阅之后才会调用,也就是subscribe()之后会执行

fromInterable()

方法:

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

作用:
可以将一个集合给观察者
代码:

        ArrayList<String> lists = new ArrayList<>(3);
        lists.add("1");
        lists.add("2");
        lists.add("3");
        Observable.fromIterable(lists).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("----","-----onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

defer()

方法:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

作用:
当被观察者被订阅的时候再创建被观察者
代码:

 int number = 100;
         Observable observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {

            @Override
            public ObservableSource<? extends String> call() throws Exception {

                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("number->"+number);
                    }
                });
            }
        });
        Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----", "-----onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e("----", "-----onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("----", "-----onComplete");
            }
        };
        observable.subscribe(observer);
        number++;
        observable.subscribe(observer);

输出结果

    -----onSubscribe     -----onNext:number->100     -----onSubscribe     -----onNext:number->101

timer()

方法:

 public static Observable<Long> timer(long delay, TimeUnit unit)

作用:
再一定时间之后,发送一个0L的值给观察者
代码:

        Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.e("----", "-----onNext:" + aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

interval()

方法:

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

initialDelay:onSubscribe 之后调用 onNext 的时间间隔
period:时间间隔
作用:
每隔一段时间发送一个事件,这个事件是从0开始,每次加1
代码:

        Observable.interval(5,3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----", "-----onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                Log.e("----", "-----onNext:" + aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

日志:

06-04 15:31:00.916 17550-17550/ E/----: -----onSubscribe
06-04 15:31:05.919 17550-17579/ E/----: -----onNext:0
06-04 15:31:08.919 17550-17579/ E/----: -----onNext:1
06-04 15:31:11.917 17550-17579/ E/----: -----onNext:2

intervalRange()

方法:

    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {

start:开始数值
count:发送事件总数
initialDelay:onSubscribe 之后调用 onNext 的时间间隔
period:时间间隔
作用:
和interval功能类似,但是可以制定开始数值以及发送事件数量
代码:

        Observable.intervalRange(5,3,5,3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----", "-----onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                Log.e("----", "-----onNext:" + aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

上述代码代表从5开始,发送3个事件,执行完onSubscribe 5秒之后执行onNext,每3秒发送一个事件,日志如下:

06-04 15:38:32.966 19117-19117 E/----: -----onSubscribe
06-04 15:38:37.970 19117-19148 E/----: -----onNext:5
06-04 15:38:40.970 19117-19148 E/----: -----onNext:6
06-04 15:38:43.968 19117-19148 E/----: -----onNext:7

range() & rangeLong()

方法:

public static Observable<Integer> range(final int start, final int count)
public static Observable<Long> rangeLong(long start, long count)

start:事件起始值
count:事件个数
作用:
同时发送多个事件序列
代码:

       Observable.range(3,5).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("----", "-----onSubscribe");
           }

           @Override
           public void onNext(Integer integer) {
               Log.e("----", "-----onNext:" + integer);
           }

           @Override
           public void onError(Throwable e) {

           }

           @Override
           public void onComplete() {

           }
       });

empty() & never() & error()

方法:

public static <T> Observable<T> empty()
 public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)

作用:
empty:调用之后直接发送完成事件,会调用 onComplete 方法
never:不发送事件
error:调用之后会执行 onError 方法
代码:

        Observable.error(new NullPointerException()).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("----", "-----onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                Log.e("----", "-----onNext");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("----", "-----onError:"+e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("----", "-----onComplete");
            }
        });