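RxJava is popular with Android developers thanks to its event-stream-based chained calls, concise logic, and ease of use. While learning it, I took a thorough look at RxJava's creation operators, which are summarized below.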
create()
Method:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
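Purpose:
Creates an Observable (the observed source) whose emissions are defined by the subscribe() callback.
Code:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {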
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----","-----onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("----","-----onNext:"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("----","-----onError");
}
@Override
public void onComplete() {
Log.e("----","-----onComplete");
}
};
observable.subscribe(observer);
just()
Method:
public static <T> Observable<T> just(T item)
...
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
...
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
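Purpose:
Creates an Observable that emits the items passed to it. The overloads accept between 1 and 10 items, so no more than 10 events can be sent.
Code: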
Observable.just("1","2","5").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----","-----onSubscribe");
}
@Override
public void onNext(String s) {
Log.e("----","-----onNext:"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("----","-----onComplete");
}
});
fromArray()
Method:
public static <T> Observable<T> fromArray(T... items)
Purpose:
Similar to just(), but it accepts any number of items (more than 10 is fine) and can also be given an array.
Code:
String[] str = new String[]{"2","3","8"};
Observable.fromArray(str).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("----","-----onNext:"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
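Because fromArray() declares a varargs parameter (T... items), the way Java binds an array to T... matters: an object array such as String[] is spread into separate items, while a primitive array such as int[] becomes one single item. The sketch below reproduces that binding with a hypothetical helper, countItems, that has the same parameter shape. It uses only the JDK and does not call RxJava.

```java
class VarargsShapeDemo {
    // Hypothetical helper with the same parameter shape as fromArray(T... items).
    @SafeVarargs
    static <T> int countItems(T... items) {
        return items.length;
    }

    public static void main(String[] args) {
        String[] strings = {"2", "3", "8"};
        int[] primitives = {2, 3, 8};
        Integer[] boxed = {2, 3, 8};

        System.out.println(countItems(strings));    // 3: each String is its own item
        System.out.println(countItems(primitives)); // 1: the int[] itself is the only item
        System.out.println(countItems(boxed));      // 3: boxed values are spread like the Strings
    }
}
```

If the same binding applies to fromArray(), a primitive array would need to be boxed (for example into Integer[]) before its elements can arrive as separate onNext calls.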
fromCallable()
Method:
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
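Purpose:
Emits the return value of a Callable to the observer. Callable is used much like Runnable: Callable exposes call() and Runnable exposes run(), the difference being that call() returns a value.
Code: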
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(10000);
return "hello";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("----","-----onNext:"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
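To make the Callable/Runnable difference concrete, here is a minimal JDK-only sketch (no RxJava involved). The class and method names are made up for illustration. It shows that call() hands its result back directly, while run() has to pass a result out through a side effect.

```java
import java.util.concurrent.Callable;

class CallableVsRunnableDemo {
    // call() returns a value (and may throw a checked exception).
    static String runCallable() {
        Callable<String> callable = () -> "hello";
        try {
            return callable.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // run() returns nothing, so the result is written into a shared object instead.
    static String runRunnable() {
        StringBuilder sink = new StringBuilder();
        Runnable runnable = () -> sink.append("hello");
        runnable.run();
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(runCallable());
        System.out.println(runRunnable());
    }
}
```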
fromFuture()
Method:
public static <T> Observable<T> fromFuture(Future<? extends T> future)
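Purpose:
Emits the result of a Future to the observer. A Future adds control methods around an asynchronous task such as a Callable: get() retrieves the Callable's return value, and cancel() cancels the Callable.
Code:
// Create the Callable
Callable<String> callable = new Callable<String>() {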
@Override
public String call() throws Exception {
return "how are you";
}
};
// Create the Future
final FutureTask<String> futureTask = new FutureTask<String>(callable);
Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();
// futureTask.get() can be called here to read the Callable's return value
//Log.e("----","-----accept:"+futureTask.get());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("----","-----onNext:"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
doOnSubscribe is invoked only once the Observable is subscribed to, that is, when subscribe() is called.
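The get() and cancel() behavior described for Future can be tried without RxJava. The following is a minimal sketch using only java.util.concurrent.FutureTask. The class and method names are illustrative assumptions, and the task runs on the calling thread to keep the example deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Runs the task on the calling thread, then reads the Callable's result with get().
    static String runAndGet() {
        FutureTask<String> task = new FutureTask<>(() -> "how are you");
        task.run();
        try {
            return task.get(); // returns at once because run() has already finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Cancels the task before it runs; get() on a cancelled task throws CancellationException.
    static boolean cancelBeforeRun() {
        FutureTask<String> task = new FutureTask<>(() -> "how are you");
        boolean cancelled = task.cancel(false);
        task.run(); // has no effect on a task that was already cancelled
        try {
            task.get();
            return false;
        } catch (CancellationException e) {
            return cancelled && task.isCancelled();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndGet());
        System.out.println(cancelBeforeRun());
    }
}
```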
fromIterable()
Method:
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
Purpose:
Emits the elements of a collection (any Iterable) to the observer one by one.
Code:
ArrayList<String> lists = new ArrayList<>(3);
lists.add("1");
lists.add("2");
lists.add("3");
Observable.fromIterable(lists).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("----","-----onNext:"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
defer()
Method:
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
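Purpose:
Defers creating the Observable until it is subscribed to; every subscription gets a newly created Observable.
Code:
// number must be a field rather than a local variable, because it is modified after being captured below
int number = 100;
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {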
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("number->"+number);
}
});
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----", "-----onSubscribe");
}
@Override
public void onNext(String s) {
Log.e("----", "-----onNext:" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("----", "-----onComplete");
}
};
observable.subscribe(observer);
number++;
observable.subscribe(observer);
Output:
-----onSubscribe
-----onNext:number->100
-----onSubscribe
-----onNext:number->101
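In the example above, number is read inside subscribe(), which runs again on every subscription, so the updated value would show up there in any case. Deferral makes a visible difference when the emitted value is computed while the source is being built. The sketch below is a plain-JDK analogy, not RxJava code: a String built up front stands in for an eagerly created source, and a java.util.function.Supplier stands in for the Callable passed to defer(). All names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class DeferredCreationDemo {
    private int number = 100;

    List<String> run() {
        List<String> seen = new ArrayList<>();

        // Eager: the message is built once, right now.
        String eager = "number->" + number;
        // Deferred: the message is rebuilt every time get() is called.
        Supplier<String> deferred = () -> "number->" + number;

        seen.add(eager);
        seen.add(deferred.get());
        number++;
        seen.add(eager);          // still reflects the old value
        seen.add(deferred.get()); // picks up the incremented value
        return seen;
    }

    public static void main(String[] args) {
        // Prints [number->100, number->100, number->100, number->101]
        System.out.println(new DeferredCreationDemo().run());
    }
}
```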
timer()
Method:
public static Observable<Long> timer(long delay, TimeUnit unit)
Purpose:
After the given delay, emits a single value, 0L, to the observer.
Code:
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.e("----", "-----onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
interval()
Method:
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
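initialDelay: the delay between onSubscribe and the first onNext
period: the interval between subsequent events
Purpose:
Emits an event at a fixed interval; the value starts at 0 and increases by 1 each time.
Code: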
Observable.interval(5,3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----", "-----onSubscribe");
}
@Override
public void onNext(Long aLong) {
Log.e("----", "-----onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Log:
06-04 15:31:00.916 17550-17550/ E/----: -----onSubscribe
06-04 15:31:05.919 17550-17579/ E/----: -----onNext:0
06-04 15:31:08.919 17550-17579/ E/----: -----onNext:1
06-04 15:31:11.917 17550-17579/ E/----: -----onNext:2
intervalRange()
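Method:
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
start: the first value to emit
count: the total number of events to emit
initialDelay: the delay between onSubscribe and the first onNext
period: the interval between subsequent events
Purpose:
Works like interval(), but lets you specify the starting value and the number of events to emit.
Code: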
Observable.intervalRange(5,3,5,3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----", "-----onSubscribe");
}
@Override
public void onNext(Long aLong) {
Log.e("----", "-----onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
The code above starts at 5 and emits 3 events: the first onNext fires 5 seconds after onSubscribe, and one event follows every 3 seconds after that. The log:
06-04 15:38:32.966 19117-19117 E/----: -----onSubscribe
06-04 15:38:37.970 19117-19148 E/----: -----onNext:5
06-04 15:38:40.970 19117-19148 E/----: -----onNext:6
06-04 15:38:43.968 19117-19148 E/----: -----onNext:7
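The timing in this log follows simple arithmetic: the i-th event (counting from 0) carries the value start + i and is due at initialDelay + i * period after subscription. The helper below is a hypothetical, JDK-only sketch of that calculation. It does not schedule anything and is not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalRangeScheduleDemo {
    // Returns one "t=<seconds>s value=<n>" entry per expected emission.
    static List<String> schedule(long start, long count, long initialDelay, long period) {
        List<String> out = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            long time = initialDelay + i * period; // seconds after onSubscribe
            long value = start + i;                // value passed to onNext
            out.add("t=" + time + "s value=" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same arguments as intervalRange(5, 3, 5, 3, TimeUnit.SECONDS)
        schedule(5, 3, 5, 3).forEach(System.out::println);
    }
}
```

For the arguments used above this gives t=5s, t=8s, and t=11s with the values 5, 6, and 7, which lines up with the timestamps 15:38:37, 15:38:40, and 15:38:43 relative to onSubscribe at 15:38:32.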
range() & rangeLong()
Method:
public static Observable<Integer> range(final int start, final int count)
public static Observable<Long> rangeLong(long start, long count)
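start: the first value to emit
count: the number of events to emit
Purpose:
Emits a sequence of count consecutive values beginning at start, one after another with no delay between them. rangeLong() does the same with long values.
Code: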
Observable.range(3,5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----", "-----onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("----", "-----onNext:" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
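Note that the second argument is a count, not an end value, so range(3, 5) yields five values starting at 3. As a rough JDK-only illustration (the helper name rangeValues is made up, and IntStream stands in for the Observable), the same sequence can be computed like this:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class RangeValuesDemo {
    // Lists the values a (start, count) pair describes.
    static int[] rangeValues(int start, int count) {
        // IntStream.range takes an exclusive end bound, so the end is start + count.
        return IntStream.range(start, start + count).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(rangeValues(3, 5))); // [3, 4, 5, 6, 7]
    }
}
```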
empty() & never() & error()
Method:
public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
Purpose:
empty: sends the completion event immediately, so onComplete is called
never: sends no events at all
error: triggers onError with the given exception
Code:
Observable.error(new NullPointerException()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("----", "-----onSubscribe");
}
@Override
public void onNext(Object o) {
Log.e("----", "-----onNext");
}
@Override
public void onError(Throwable e) {
Log.e("----", "-----onError:"+e.toString());
}
@Override
public void onComplete() {
Log.e("----", "-----onComplete");
}
});