1. Introduction
RxJava is an API for asynchronous programming. It is built on the observer pattern and is used through chained calls.
RxJava has three basic elements:
(1) The observable (Observable)
(2) The observer (Observer)
(3) The subscription (subscribe)
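Before bringing in the library, the relationship among these three elements can be sketched in plain Java. This is only an illustration of the observer pattern: SimpleObservable, SimpleObserver and ObserverPatternSketch are hypothetical names, not RxJava types, and the sketch leaves out errors, disposal and threading.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration only; these are not RxJava types.
final class ObserverPatternSketch {

    // The observer: receives values and a completion signal.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    // The observable: emits nothing until an observer subscribes.
    interface SimpleObservable<T> {
        void subscribe(SimpleObserver<T> observer);
    }

    // Subscribes a recording observer to a three-item source and returns what it saw.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        SimpleObservable<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onComplete();
        };

        // Subscribing is what triggers the emissions above.
        source.subscribe(new SimpleObserver<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext=" + value);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The point of the sketch is that the source is only a description of what to emit; nothing happens until subscribe() is called.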
First, add the dependencies to the Gradle file:
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// 1. Create the observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.i("zhoujian", "currentThread name=" + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});

// 2. Create the observer
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i("zhoujian", "onNext=" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
};

// 3. Subscribe: attach the observer to the observable
observable.subscribe(observer);
Output:
06-15 11:24:19.331 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: currentThread name=main
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=1
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=2
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onNext=3
06-15 11:24:19.332 11911-11911/com.zhoujian.rxjavademo I/zhoujian: onComplete
Chained calls
// The same example written as a single chained call
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.i("zhoujian", "currentThread name=" + Thread.currentThread().getName());
        e.onNext(4);
        e.onNext(5);
        e.onNext(6);
        e.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i("zhoujian", "onNext=" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
});
Output:
06-15 11:29:34.376 12082-12082/? I/zhoujian: onSubscribe
06-15 11:29:34.376 12082-12082/? I/zhoujian: currentThread name=main
06-15 11:29:34.376 12082-12082/? I/zhoujian: onNext=4
06-15 11:29:34.377 12082-12082/? I/zhoujian: onNext=5
06-15 11:29:34.377 12082-12082/? I/zhoujian: onNext=6
06-15 11:29:34.377 12082-12082/? I/zhoujian: onComplete
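The observable can emit the following kinds of events:
Event | Effect |
onNext() | When this event is emitted, the observer's onNext() method is called |
onError() | When this event is emitted, the observer's onError() method is called; after it, no further events are delivered |
onComplete() | When this event is emitted, the observer's onComplete() method is called; after it, no further events are delivered |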
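The rule in the table, that onError() and onComplete() end the stream, can be modeled with a small guard flag. The following is a minimal plain-Java sketch under that assumption; GuardedEmitter and TerminalEventSketch are hypothetical names, and this is a simplified model rather than RxJava's actual implementation (the real library may handle a late error differently from silently dropping it).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the terminal-event rule; not RxJava's implementation.
final class TerminalEventSketch {

    // Records delivered events and drops everything after the first terminal event.
    static final class GuardedEmitter {
        private final List<String> delivered = new ArrayList<>();
        private boolean terminated = false;

        void onNext(int value) {
            if (terminated) {
                return; // ignored once the stream has ended
            }
            delivered.add("onNext=" + value);
        }

        void onError(Throwable error) {
            if (terminated) {
                return;
            }
            terminated = true;
            delivered.add("onError");
        }

        void onComplete() {
            if (terminated) {
                return;
            }
            terminated = true;
            delivered.add("onComplete");
        }

        List<String> delivered() {
            return delivered;
        }
    }

    // Emits one value, completes, then tries to emit more.
    static List<String> run() {
        GuardedEmitter emitter = new GuardedEmitter();
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);                              // dropped
        emitter.onError(new RuntimeException("late"));  // dropped
        return emitter.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first terminal event counts here, which also means onError() and onComplete() are mutually exclusive in this model.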
2. Operators
(1) create(): creates an observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello, observer!");
        emitter.onComplete();
    }
});
(2) just(): creates an observable that emits the items passed to it; at most 10 items can be passed
Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i("zhoujian", "onNext=" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
});
Output:
06-15 13:30:04.761 14162-14162/? I/zhoujian: onSubscribe
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=1
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=2
06-15 13:30:04.761 14162-14162/? I/zhoujian: onNext=3
06-15 13:30:04.761 14162-14162/? I/zhoujian: onComplete
(3) fromArray()
Similar to just(), this creates an observable from the given items. Unlike just(), fromArray() accepts more than 10 items, and it can also be passed an array.
Integer[] array = {4, 5, 6};
Observable.fromArray(array).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i("zhoujian", "onNext=" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
});
Output:
06-15 14:16:12.734 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=4
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=5
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onNext=6
06-15 14:16:12.735 16231-16231/com.zhoujian.rxjavademo I/zhoujian: onComplete
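fromArray() takes a variable-length argument list, which is why it accepts either any number of individual values or one array. The plain-Java sketch below shows how a generic varargs parameter behaves; count() and VarargsSketch are hypothetical names used only for illustration. It also shows a pitfall worth keeping in mind: a primitive int[] is treated as a single item, so an Integer[] is the safer choice.

```java
// Hypothetical helper illustrating generic varargs; not part of RxJava.
final class VarargsSketch {

    // Returns how many items the varargs parameter received.
    @SafeVarargs
    static <T> int count(T... items) {
        return items.length;
    }

    static int[] run() {
        Integer[] boxed = {4, 5, 6};
        int[] primitive = {4, 5, 6};
        return new int[] {
            count(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), // 11 separate items
            count(boxed),                             // array spread into 3 items
            count(primitive)                          // int[] counts as 1 item
        };
    }

    public static void main(String[] args) {
        for (int n : run()) {
            System.out.println(n);
        }
    }
}
```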
(4) fromCallable()
A Callable is used in much the same way as a Runnable, except that it returns a result. That result is the value emitted to the observer.
Observable.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 6;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i("zhoujian", "accept=" + integer);
    }
});
Result:
06-15 14:37:13.855 18327-18327/? I/zhoujian: accept=6
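For readers less familiar with the two interfaces, the difference between Runnable and Callable can be seen with the standard library alone. This is a small sketch; CallableVsRunnableSketch is a hypothetical class name, and the checked exception is wrapped only to keep the example short.

```java
import java.util.concurrent.Callable;

// Hypothetical class name; uses only java.util.concurrent types.
final class CallableVsRunnableSketch {

    // Runs a Runnable (no result) and a Callable (result), returning the Callable's value.
    static Integer runBoth() {
        StringBuilder log = new StringBuilder();

        Runnable runnable = () -> log.append("ran"); // run() returns nothing
        Callable<Integer> callable = () -> 6;         // call() returns a value and may throw

        runnable.run();
        try {
            return callable.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("result=" + runBoth());
    }
}
```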
(5) fromFuture()
A Future adds methods such as cancel() for controlling a Callable, and its get() method retrieves the value the Callable returns.
final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Sucessed";
    }
});

Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        // Run the task on subscription so that its result becomes available
        futureTask.run();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i("zhoujian", "accept=" + s);
    }
});
Result:
06-15 14:57:28.904 19576-19576/com.zhoujian.rxjavademo I/zhoujian: accept=Sucessed
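The get() and cancel() behavior mentioned above can be tried with FutureTask on its own, without RxJava. The sketch below is a minimal illustration using only java.util.concurrent; FutureTaskSketch and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;

// Hypothetical class name; uses only java.util.concurrent types.
final class FutureTaskSketch {

    // Runs the task on the calling thread, then reads its result with get().
    static String runAndGet() {
        Callable<String> work = () -> "done";
        FutureTask<String> task = new FutureTask<String>(work);
        task.run();
        try {
            return task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Cancels the task before it runs; get() then throws CancellationException.
    static boolean cancelBeforeRun() {
        Callable<String> work = () -> "never returned";
        FutureTask<String> task = new FutureTask<String>(work);
        task.cancel(false);
        task.run(); // has no effect on a cancelled task
        try {
            task.get();
            return false;
        } catch (CancellationException expected) {
            return task.isCancelled();
        } catch (Exception other) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndGet());
        System.out.println(cancelBeforeRun());
    }
}
```

Note that get() blocks until the task has run, which is why the fromFuture() example above triggers futureTask.run() when the subscription starts.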
(6) fromIterable()
Emits the elements of a List to the observer one by one.
List<String> list = new ArrayList<>();
list.add("7");
list.add("8");
list.add("9");
list.add("10");

Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.i("zhoujian", "onNext=" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
});
Output:
06-15 15:09:01.927 20242-20242/? I/zhoujian: onSubscribe
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=7
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=8
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=9
06-15 15:09:01.927 20242-20242/? I/zhoujian: onNext=10
06-15 15:09:01.927 20242-20242/? I/zhoujian: onComplete
(7) defer()
With defer(), the observable is not created until an observer subscribes, and a new one is created for each subscription. Each subscriber therefore sees the value the field holds at the moment it subscribes.
// A field of the enclosing class
private Integer integer = 1;

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        // Evaluated at subscription time, not when defer() is called
        return Observable.just(integer);
    }
});

integer = 2;

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i("zhoujian", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i("zhoujian", "onNext=" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i("zhoujian", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("zhoujian", "onComplete");
    }
};

// Attach the observer to the observable
observable.subscribe(observer);

integer = 3;

observable.subscribe(observer);
Output:
06-15 15:35:50.567 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onNext=2
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onComplete
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onSubscribe
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onNext=3
06-15 15:35:50.568 21116-21116/com.zhoujian.rxjavademo I/zhoujian: onComplete
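The output shows 2 and 3 rather than 1 because the value is read at subscription time. The same eager-versus-deferred distinction can be sketched in plain Java with a Supplier. This is only an analogy, not how defer() is implemented; DeferSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical analogy for deferred evaluation; not RxJava's implementation.
final class DeferSketch {

    private static Integer value = 1;

    // Returns the values seen by two deferred reads, followed by one eager read.
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        value = 1;

        Integer eager = value;                     // read immediately: 1
        Supplier<Integer> deferred = () -> value;  // read again on every get()

        value = 2;
        seen.add(deferred.get()); // sees 2, like the first subscription

        value = 3;
        seen.add(deferred.get()); // sees 3, like the second subscription

        seen.add(eager);          // still 1, captured before the changes
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```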