RxJava: From Beginner to Mastery

Date: 2022-10-27 00:38:54

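1. Introduction

RxJava is an API for asynchronous programming. It is built on the observer pattern and is used through chained (fluent) method calls.

RxJava has three basic elements:

(1) The observable (Observable)

(2) The observer (Observer)

(3) The subscription (subscribe)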
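
Before bringing in the library, the relationship between these three elements can be sketched in plain Java. The types SimpleObserver and SimpleObservable below are hypothetical names made up for this illustration; they are not RxJava classes and leave out error handling, disposal, and threading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical observer interface for illustration; not an RxJava type.
interface SimpleObserver<T> {
    void onNext(T item);
    void onComplete();
}

// Hypothetical observable: holds a fixed list of items and pushes them
// to whichever observer subscribes.
class SimpleObservable<T> {
    private final List<T> items;

    SimpleObservable(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    // Subscribing is what triggers emission; nothing is sent before this call.
    void subscribe(SimpleObserver<T> observer) {
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onComplete();
    }
}

class ObserverPatternSketch {
    // Subscribes a recording observer and returns the events it received, in order.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SimpleObservable<Integer> observable =
                new SimpleObservable<>(Arrays.asList(1, 2, 3));
        observable.subscribe(new SimpleObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                log.add("onNext=" + item);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The point of the sketch is the direction of the calls: the observable drives the observer's callbacks, and it only starts doing so once subscribe() is called.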


First, add the dependencies to your Gradle build file:

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
        // 1. Create the observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i("zhoujian", "currentThread name=" + Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

        // 2. Create the observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i("zhoujian", "onNext=" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        };

        // 3. Subscribe: attach the observer to the observable
        observable.subscribe(observer);

Output:

06-15 11:24:19.331 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: currentThread name=main
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=1
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=2
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=3
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onComplete

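Chained calls

The same three steps can be written as a single chained expression:

        // Create the observable and subscribe to it in one chain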
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i("zhoujian", "currentThread name=" + Thread.currentThread().getName());
                e.onNext(4);
                e.onNext(5);
                e.onNext(6);
                e.onComplete();

            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i("zhoujian", "onNext=" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        });
        


Output:

06-15 11:29:34.376 12082-12082/? I/zhoujian: onSubscribe
06-15 11:29:34.376 12082-12082/? I/zhoujian: currentThread name=main
06-15 11:29:34.376 12082-12082/? I/zhoujian: onNext=4
06-15 11:29:34.377 12082-12082/? I/zhoujian: onNext=5
06-15 11:29:34.377 12082-12082/? I/zhoujian: onNext=6
06-15 11:29:34.377 12082-12082/? I/zhoujian: onComplete


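The observable can emit the following kinds of events:

Event type      Effect
onNext()        The observer's onNext() method is called.
onError()       The observer's onError() method is called. After this event, the observer receives no further events.
onComplete()    The observer's onComplete() method is called. After this event, the observer receives no further events.

onError() and onComplete() are both terminal events, and a stream ends with at most one of them.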
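
The terminal-event rule can be sketched in plain Java. TerminalAwareEmitter is a hypothetical class written for this illustration, not RxJava's ObservableEmitter. It models only what the observer ends up seeing; RxJava itself treats a late error differently from a late onNext(), which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical emitter for illustration; not an RxJava type.
class TerminalAwareEmitter {
    private final List<String> delivered = new ArrayList<>();
    private boolean terminated = false;

    void onNext(int value) {
        if (terminated) {
            return; // the stream has already ended, so the item is not delivered
        }
        delivered.add("onNext=" + value);
    }

    void onError(Throwable error) {
        if (terminated) {
            return;
        }
        terminated = true;
        delivered.add("onError");
    }

    void onComplete() {
        if (terminated) {
            return;
        }
        terminated = true;
        delivered.add("onComplete");
    }

    List<String> delivered() {
        return delivered;
    }
}

class TerminalEventSketch {
    // Emits one item, completes, then tries to emit more.
    static List<String> run() {
        TerminalAwareEmitter emitter = new TerminalAwareEmitter();
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);                        // not delivered
        emitter.onError(new RuntimeException());  // not delivered
        return emitter.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the code after onComplete() still runs; it is only the delivery to the observer that stops.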

2. Operators

(1) create(): creates an observable

 
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello, observer!");
                emitter.onComplete();
            }
        });

(2) just(): creates an observable and emits the items passed to it. It accepts at most 10 items.


Observable.just(1,2,3).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i("zhoujian", "onNext="+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        });

Output:

06-15 13:30:04.761 14162-14162/? I/zhoujian: onSubscribe
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=1
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=2
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=3
06-15 13:30:04.761 14162-14162/? I/zhoujian: onComplete

(3) fromArray()

Like just(), fromArray() creates an observable. Unlike just(), it accepts more than 10 items, and it can also be passed an array.

Integer[] array = {4, 5, 6};
        Observable.fromArray(array).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i("zhoujian", "onNext="+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        });


Output:

06-15 14:16:12.734 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=4
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=5
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=6
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onComplete
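
The example above uses Integer[] rather than int[], and that choice matters. fromArray() takes a generic varargs parameter (T... items), so ordinary Java varargs rules apply: a boxed array is spread into its elements, while a primitive array is treated as one single item. The sketch below shows this in plain Java; countItems is a hypothetical helper with the same parameter shape, not an RxJava method.

```java
class VarargsSketch {
    // Hypothetical helper with the same parameter shape as fromArray(T... items).
    @SafeVarargs
    static <T> int countItems(T... items) {
        return items.length;
    }

    public static void main(String[] args) {
        Integer[] boxed = {4, 5, 6};
        int[] primitive = {4, 5, 6};

        // T is Integer: the array itself becomes the varargs array.
        System.out.println(countItems(boxed));     // prints 3

        // T is int[]: the whole array is wrapped as a single element.
        System.out.println(countItems(primitive)); // prints 1
    }
}
```

By the same rule, passing an int[] to fromArray() would be expected to emit the array once rather than each number in turn.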


(4) fromCallable()

Callable is used much like Runnable, except that call() returns a result. That result is the value emitted to the observer.

 Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 6;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i("zhoujian", "accept="+integer);
            }
        });

Output:

06-15 14:37:13.855 18327-18327/? I/zhoujian: accept=6
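
The difference between Callable and Runnable mentioned above can be seen without RxJava. This is a minimal sketch using only java.util.concurrent; the class and method names are made up for illustration.

```java
import java.util.concurrent.Callable;

class CallableVsRunnable {
    // A Runnable returns nothing, so data has to leave through a side effect.
    static int viaRunnable() {
        final int[] holder = new int[1];
        Runnable task = () -> holder[0] = 6;
        task.run();
        return holder[0];
    }

    // A Callable returns its result directly; call() may also throw a checked exception.
    static int viaCallable() {
        Callable<Integer> task = () -> 6;
        try {
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaRunnable()); // prints 6
        System.out.println(viaCallable()); // prints 6
    }
}
```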


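(5) fromFuture()

A Future adds methods such as cancel() for controlling the execution of a Callable, and its get() method retrieves the value the Callable returns. fromFuture() emits that value to the observer. In the example below, doOnSubscribe() runs the FutureTask at subscription time so that a result is available.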

        final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Sucessed";
            }
        });

        Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                futureTask.run();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("zhoujian", "accept="+s);
            }
        });

        
        


Output:

06-15 14:57:28.904 19576-19576/com.zhoujian.rxjavademo I/zhoujian: accept=Sucessed
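
The get() and cancel() methods mentioned above can be tried on a FutureTask directly, without RxJava. This is a minimal sketch that runs everything on the calling thread; the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {
    // Runs the task on the calling thread, then reads its result with get().
    static String runAndGet() {
        FutureTask<String> task = new FutureTask<String>(() -> "done");
        task.run();
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Cancels the task before it runs; the later run() call then does nothing.
    static boolean cancelBeforeRun() {
        FutureTask<String> task = new FutureTask<String>(() -> "never produced");
        boolean cancelled = task.cancel(false);
        task.run();
        return cancelled && task.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(runAndGet());       // prints done
        System.out.println(cancelBeforeRun()); // prints true
    }
}
```

If run() were never called and the task never cancelled, get() would block indefinitely, which is why the fromFuture() example triggers the task when the observer subscribes.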


(6) fromIterable()

Emits each element of a List (or any other Iterable) to the observer, one at a time.

 List<String> list = new ArrayList<>();
        list.add("7");
        list.add("8");
        list.add("9");
        list.add("10");
        Observable.fromIterable(list).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.i("zhoujian", "onNext="+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        });





Output:

06-15 15:09:01.927 20242-20242/? I/zhoujian: onSubscribe
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=7
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=8
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=9
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=10
06-15 15:09:01.927 20242-20242/? I/zhoujian: onComplete


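(7) defer()

defer() does not create the observable until an observer subscribes, and it creates a fresh one for each subscription.

// Declared as a field of the enclosing class so it can be reassigned between subscriptions
private Integer integer = 1;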
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(integer);
            }
        });
        integer = 2;
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("zhoujian", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i("zhoujian", "onNext="+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("zhoujian", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("zhoujian", "onComplete");
            }
        };
        // Subscribe the observer to the observable
        observable.subscribe(observer);
        integer = 3;
        observable.subscribe(observer);
        
        


Output:

06-15 15:35:50.567 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onNext=2
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onComplete
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onNext=3
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onComplete
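
The output shows 2 and 3 rather than 1 because the value is read at subscription time, not when defer() is called. The same eager-versus-deferred distinction can be sketched in plain Java with a Supplier. This is only an analogy under that assumption, with made-up names; it does not use or reproduce RxJava's implementation.

```java
import java.util.function.Supplier;

class DeferSketch {
    private static int value = 1;

    // Returns {eager, first deferred read, second deferred read}.
    static int[] run() {
        value = 1;
        final int eager = value;                  // read immediately and fixed from here on
        Supplier<Integer> deferred = () -> value; // read only when get() is called

        value = 2;
        int first = deferred.get();               // sees the current value, 2

        value = 3;
        int second = deferred.get();              // evaluated again, sees 3

        return new int[] {eager, first, second};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // prints 1 2 3
    }
}
```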