Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的组合操作符。
merge()
原理图:
方法:
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
作用:
将两个Observable发射的事件序列组合并成一个时间序列,就想是一个Observable发射的一样,合并后数据是无序的。
代码:
//这里的ob1和ob2将在下述所有示例中使用,将不再特殊描述
final String[] str = new String[]{"a","b","c"};
final int[] ints = new int[]{1,2,3,4,5};
Observable ob1 = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return str[aLong.intValue()];
}
}).take(str.length);
Observable ob2 = Observable.interval(300,TimeUnit.MILLISECONDS)
.map(new Function<Long,Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return ints[aLong.intValue()];
}
}).take(ints.length);
Observable.merge(ob1,ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述代码中第一个Observable作用是每500毫秒从字符串数组中取一个元素,第二个Observable的作用是每300毫秒从整型数组中取一个元素,打印结果为:
06-06 22:48:53.410 17668-17707/ E/—: 1
06-06 22:48:53.610 17668-17706/ E/—: a
06-06 22:48:53.710 17668-17707/ E/—: 2
06-06 22:48:54.010 17668-17707/ E/—: 3
06-06 22:48:54.110 17668-17706/ E/—: b
06-06 22:48:54.310 17668-17707/ E/—: 4
06-06 22:48:54.610 17668-17706/ E/—: c
06-06 22:48:54.610 17668-17707/ E/—: 5
mergeArray()
原理图:
方法:
public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
作用:
作用和merge类似,只不过是组合多个Observable
concat()
原理图:
方法:
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
...
public static <T> Observable<T> concat(
ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
作用:
功能和merge类似,也是用于将多个Observable合并,最多支持4个,但是concat是有序的,也就是说前一个Observable没发射完是不会发射后一个Observable的数据的。
代码:
将上述代码里面的merge直接换成concat
Observable.concat(ob1,ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
代码执行结果如下:
06-06 23:01:16.189 27785-27822/ E/---: a
06-06 23:01:16.689 27785-27822/ E/---: b
06-06 23:01:17.189 27785-27822/ E/---: c
06-06 23:01:17.490 27785-27872/ E/---: 1
06-06 23:01:17.790 27785-27872/ E/---: 2
06-06 23:01:18.090 27785-27872/ E/---: 3
06-06 23:01:18.390 27785-27872/ E/---: 4
06-06 23:01:18.690 27785-27872/ E/---: 5
concatArray()
方法:
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
作用:
作用同concat类似,将多个Observable进行数据合并
mergeArrayDelayError() & concatArrayDelayError()
方法:
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
作用:
在mergeArray()和concatArray()两个方法中,如果其中一个Observable发送了一个Error事件,那么就会停止发送事件,如果想onError()事件延迟到所有Observable都发送完事件后再执行,就可以使用mergeArrayDelayError()和concatArrayDelayError()
代码:
下面通过代码测试下如果中途发送onError,Observable是否会中断发送
Observable.mergeArray(ob1,Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
})).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
上述执行结果为如下:
06-06 23:18:58.930 8575-8575/ E/---: 1
06-06 23:18:58.930 8575-8575/ E/---: --onError
可以发现使用mergeArray()时如果中途发送onError()会中断数据的发送,下面将mergeArray改成mergeArrayDelayError
Observable.mergeArrayDelayError(ob1,Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
})).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
}
});
更改之后的执行结果如下:
06-06 23:14:18.191 5668-5668/ E/---: 1
06-06 23:14:18.691 5668-5700/ E/---: a
06-06 23:14:19.191 5668-5700/ E/---: b
06-06 23:14:19.691 5668-5700/ E/---: c
06-06 23:14:19.691 5668-5700/ E/---: --onError
startWith() & startWithArray()
原理图:
方法:
public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWithArray(T... items)
作用:
用于在源Observable发射的数据前插入另一个Observable发射的数据
代码:
ob1.startWith(ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
代码执行结果如下:
06-06 23:07:16.014 32379-32405/ E/---: 1
06-06 23:07:16.314 32379-32405/ E/---: 2
06-06 23:07:16.614 32379-32405/ E/---: 3
06-06 23:07:16.913 32379-32405/ E/---: 4
06-06 23:07:17.214 32379-32405/ E/---: 5
06-06 23:07:17.716 32379-32444/ E/---: a
06-06 23:07:18.215 32379-32444/ E/---: b
06-06 23:07:18.714 32379-32444/ E/---: c
zip()
原理图:
方法:
public static <T1, T2, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper)
作用:
用来合并两个Observable发射的事件,根据BiFunction函数生成一个新的值发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发送数据。也就是说正常的情况下数据长度会与两个Observable中最少事件的数量一样。
代码:
简单的将两个Observable的数据进行拼接
Observable.zip(ob1, ob2, new BiFunction<String,Integer,String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s+integer;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
运行结果如下:
06-06 23:29:56.547 14888-14926/ E/---: a1
06-06 23:29:57.049 14888-14926/ E/---: b2
06-06 23:29:57.547 14888-14926/ E/---: c3
combineLatest() & combineLatestDelayError()
原理图:
可能上面这张图不是太好理解,可以看下面这张图
方法:
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
作用:
用于将两个Observable最近发射的数据经BiFunction函数的规则进行组合,combineLatest()发送事件的序列是与发送的时间线有关的。拿上图解释当发送A之后会从上一个Observ拿最近发送的1进行组合生成‘1A’,当发送2时拿第二个Observable最近发送的数据B组合成‘2B’,接下来到事件C时还是取第一个Observable最近发送的时间2进行组合成‘2C’,以此类推。
代码:
Observable.combineLatest(ob1, ob2, new BiFunction<String,Integer,String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
执行结果如下:
06-06 23:39:58.087 22418-22447/ E/---: a1
06-06 23:39:58.190 22418-22448/ E/---: a2
06-06 23:39:58.488 22418-22448/ E/---: a3
06-06 23:39:58.589 22418-22447/ E/---: b3
06-06 23:39:58.788 22418-22448/ E/---: b4
06-06 23:39:59.088 22418-22447/ E/---: c4
06-06 23:39:59.088 22418-22448/ E/---: c5
reduce()
方法:
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
作用:
与scan()操作符类似,作用是将数据以一定的逻辑聚合起来,这两个的区别在于scan()没处理一次数据将会发送一个事件给观察者,但是reduce()会将所有数据聚合在一起之后才会发送给观察者,还有一点区别就是scan的返回值是Observable,而reduce的返回值是Maybe
代码:
Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("---",integer+"");
}
});
上述代码的作用就是对数组数据进行相加处理,最终输出数据为15
count()
方法:
public final Single<Long> count()
作用:
统计要发送事件的总数
代码:
Observable.just(1,1,2,2).count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("---",aLong+"");
}
});
运行结果为:
E/---: 4
collect()
方法:
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
作用:
收集数据到一个可变的数据结构中
代码:
Observable.just("1","2","3","2")
.collect(new Callable<List<Integer>>() { //创建数据结构
@Override
public List<Integer> call() throws Exception {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, String>() {//收集器
@Override
public void accept(List<Integer> integers, String s) throws Exception {
integers.add(Integer.valueOf(s));
}
}).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e("---",integers+"");
}
});
打印结果为:
E/---: [1, 2, 3, 2]