Combining Observables(组合操作符)
本节介绍如何把多个数据源的数据组合为一个数据源的操作函数。
repeat
repeat 顾名思义,可以重复的发射自己。 repeat 不会缓存之前的数据,当再次发射数据的时候,会从新就算数据。
对一个Observable重复发射
repeat和repeatWhen操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。
public final Observable<T> repeat()
public final Observable<T> repeat(long count)
Observable.just(1,2)
// .repeat()//无限循环
.repeat(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer+"");
}
});
结果:
1
2
1
2
repeatWhen
repeatWhen 可以指定一个条件,当该条件满足的时候才重复发射数据流。条件为一个 Observable,当源Observable结束的时候,会等待 条件 Observable 来发射数据通知源 Observable 重复发射数据流。如果条件 Observable结束了,则不会触发源 Observable 重复发射数据。
有时候需要知道一个重复发射的数据量是何时结束的。repeatWhen 提供了一种特别的 Observable 在数据流结束的时候发射一个Void。可以使用这个 Observable来当做一种信号。
public final Observable<T> repeatWhen(
Func1<? super Observable<? extends java.lang.Void>,? extends Observable<?>> notificationHandler)
repeatWhen 的参数是一个函数,该函数的参数为 Observable 返回另外一个 Observable。这两个Observable 发射的数据类型是无关紧要的。输入的 Observable 用了表示重复结束的信号,返回的 Observable 用来表示重新开始的信号。
下一个示例使用 repeatWhen 来自己实现一个 repeat(n) :
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.take(2)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.take(2);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
1
0
1
上面的示例中,当重复发射完成后,ob 就立刻发射信号告诉源 Observable 重新发射。
下面的示例中,创建一个每隔两秒就重复一次的无限循环数据流:
Observable<Long> values2 = Observable.interval(100, TimeUnit.MILLISECONDS);
values2
.take(3)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return Observable.interval(2, TimeUnit.SECONDS);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
1
2
(隔两秒)
0
1
2
...
注意上面的返回的条件 Observable 每隔两秒就发射一次信号。在后面会介绍更多关于时间控制的技巧。
另外一个需要注意的就是 ob.subscribe() 语句,看起来是多余的其实是必不可少的。这样会强制创建 ob 对象,当前 repeatWhen 的实现需要 ob 被订阅,否则是不会触发重复发射数据的。
startWith
startWith 的参数为一个数据流,然后先发射该数据再发射 源 Observable 中的数据。
public final Observable<T> startWith(java.lang.Iterable<T> values)
public final Observable<T> startWith(Observable<T> values)
public final Observable<T> startWith(T t1)
public final Observable<T> startWith(T t1, T t2)
public final Observable<T> startWith(T t1, T t2, T t3)
// up to 10 values
Observable<Integer> values = Observable.range(0, 3);
values.startWith(-1,-2)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
-1
-2
0
1
2
startWith 是使用 参数为 just 的 concat 函数的简写。
Observable.concat(
Observable.just(-1,-2,-3),
values)
// 和下面的是一样的效果
values.startWith(-1,-2,-3)
merge
merge 把多个 Observable 合并为一个,合并后的 Observable 在每个源Observable发射数据的时候就发射同样的数据。所以多个源 Observable 的数据最终是混合是一起的:
public static final <T> Observable<T> merge(
java.lang.Iterable<? extends Observable<? extends T>> sequences)
public static final <T> Observable<T> merge(
java.lang.Iterable<? extends Observable<? extends T>> sequences,
int maxConcurrent)
public static final <T> Observable<T> merge(
Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> merge(
Observable<? extends Observable<? extends T>> source,
int maxConcurrent)
public static final <T> Observable<T> merge(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> merge(
Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
...
示例:
Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS) .map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "First";
}
}),
Observable.interval(150, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "Second";
}
}))
.take(10)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
Second
First
Second
Second
First
Second
Second
First
Second
First
concat 和 merge 的区别是:
- merge 不会等到前面一个 Observable 结束才会发射下一个 Observable 的数据
- merge 订阅到所有的 Observable 上,如果有任何一个 Observable 发射了数据,则 就把该数据发射出来。
mergeWith
同样 还有一个 mergeWith 函数用了串联调用。
Observable.interval(250, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "First";
}
})
.mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "Second";
}
}))
.take(10)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
和上面merge 输出的结果一样。
mergeDelayError
合并几个不同的Observable,merge 中如果任意一个源 Observable 出现错误了,则 merge 后的 Observable 也就出错并结束发射。使用mergeDelayError 可以推迟发生的错误,继续发射其他 Observable 发射的数据。
public static final <T> Observable<T> mergeDelayError(
Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> mergeDelayError(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> mergeDelayError(
Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
...
Observable<Object> failAt200 = Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 =
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(4);
Observable.mergeDelayError(failAt200, completeAt400)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
结果:
0
0
1
1
2
3
Failed
上面的示例中,开始两个 Observable 都发射一样的数据,当发射第二个数据 1 后,第一个 Observable抛出一个异常退出,而合并后的数据流继续发射直到所有的 Observable 完成或者也出现异常。
如果合并多个 Observable,则合并后的 Observable 只有当所有源 Observable 结束后才结束,如果有多个源 Observable 出现了异常,则合并后的 Observable 会用一个 CompositeException 来结束。
Observable<Object> failAt200 = Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
Observable.error(new Exception("Failed")));
Observable<Object> failAt300 = Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(4);
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
结果:
0
0
0
1
1
1
2
2
3
2 exceptions occurred.
switchOnNext
- switchOnNext 的参数为一个返回 Observable 对象的 Observable。
- 也就是说,这个参数为一个Observable, 但是这个 Observable 所发射的数据类型是 Observable 。
switchOnNext 返回的Observable 发射数据的规则如下:
- 在参数 Observable 返回的 Observable 中,把最先发射数据的 Observable 中的数据拿来转发,如果之后又有新的Observable 开始发射数据了,则就切换到新的 Observable 丢弃前一个。
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(final Long aLong1) {
return Observable.interval(30, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong2) {
return aLong1;
}
});
}
}))
.take(9)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
0
0
1
1
1
2
2
2
注意上面示例中 switchOnNext 的参数 每隔 100毫秒返回一个 Observable 。这个返回的 Observable 会每隔 30 毫秒发射一个数字,这个数字被映射为 100毫秒发射一个数据的 Observable返回的数据。所以在第一个100毫秒的时候,switchOnNext 的参数返回的第一个 Observable 可以发射3个数据 0,然后到第100毫秒的时候,switchOnNext 的参数返回的第二个 Observable 开发发射数据1, 所以前一个发射数据 0 的 Observable 就被丢弃了, switchOnNext 切换到新的发射数据的 Observable。
switchMap
就像 flatMap 内部使用 merge 来组合发射的数据;以及 concatMap 使用 concat 来组合数据,而switchMap 内部使用 switchOnNext 来打散组合数据。
public final <R> Observable<R> switchMap(Func1<? super T,? extends Observable<? extends R>> func)
源 Observable 所发射的每一个数据都被 func 函数给转换为一个新的 Observable 了。每次只要 源Observable发射一个数据,func 函数都把该数据转换为一个 Observable 然后 switchMap 返回的 Observable就使用这个新的 Observable 来发射数据。
前面的示例也可以用 switchMap 实现:
Observable.interval(100, TimeUnit.MILLISECONDS)
.switchMap(new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(final Long aLong1) {
return Observable.interval(30, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong2) {
return aLong1;
}
});
}
})
.take(9)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
0
0
1
1
1
2
2
2
Pairing sequences
下面几个操作符用来把多个源 Observable 发射的数据组合成一个数据。
zip
zip 是函数式编程中的一个基本概念,参数为多个源 Observable, 返回的结果是把这些源 Observable发射的数据按照顺序给组合起来。
下面的示例中,有两个源 Observable 发射数据的速度是不一样的。
Observable.zip(
Observable.interval(100, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Left emits " + aLong);
}
}),
Observable.interval(150, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Right emits " + aLong);
}
}),
new Func2<Long, Long, Object>() {
@Override
public Object call(Long aLong, Long aLong2) {
return aLong + " - " + aLong2;
}
})
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
Left emits 0
Right emits 0
0 - 0
Left emits 1
Right emits 1
Left emits 2
1 - 1
Left emits 3
Right emits 2
2 - 2
Left emits 4
Left emits 5
Right emits 3
3 - 3
Left emits 6
Right emits 4
4 - 4
Left emits 7
Right emits 5
Left emits 8
5 - 5
从上面示例中可以看到,zip 是按照顺序来组合数据的。
zip 有很多重载函数可以接受多个 Observable :
public static final <R> Observable<R> zip(
java.lang.Iterable<? extends Observable<?>> ws,
FuncN<? extends R> zipFunction)
public static final <R> Observable<R> zip(
Observable<? extends Observable<?>> ws,
FuncN<? extends R> zipFunction)
public static final <T1,T2,R> Observable<R> zip(
Observable<? extends T1> o1,
Observable<? extends T2> o2,
Func2<? super T1,? super T2,? extends R> zipFunction)
public static final <T1,T2,T3,R> Observable<R> zip(
Observable<? extends T1> o1,
Observable<? extends T2> o2,
Observable<? extends T3> o3,
Func3<? super T1,? super T2,? super T3,? extends R> zipFunction)
...
如果有多个 源 Observable,则 zip 会等待最慢的一个 Observable 发射完数据才开始组合这次发射的所有数据。
Observable.zip(
Observable.interval(100, TimeUnit.MILLISECONDS),
Observable.interval(150, TimeUnit.MILLISECONDS),
Observable.interval(050, TimeUnit.MILLISECONDS),
new Func3<Long, Long, Long, Object>() {
@Override
public Object call(Long aLong, Long aLong2, Long aLong3) {
return aLong + " - " + aLong2 + " - " + aLong3;
}
})
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0 - 0 - 0
1 - 1 - 1
2 - 2 - 2
3 - 3 - 3
4 - 4 - 4
5 - 5 - 5
- zip 的任意一个源 Observable 结束标示着 zip 的结束。
- 其他源 Observable 后续发射的数据被忽略了。
下面的例子组合三个 Observable,然后统计下 zip 返回的 Observable 发射了多少个数据:
Observable.zip(
Observable.range(0, 5),
Observable.range(0, 3),
Observable.range(0, 8),
new Func3<Integer, Integer, Integer, Object>() {
@Override
public Object call(Integer integer, Integer integer2, Integer integer3) {
return integer + " - " + integer2 + " - " + integer3;
}
})
.count()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
3
所以:
- zip 返回的Observable发射的速度和最慢的那个 Observable 一样
- 发射的数据和发射最少数据的 那个 Observable 一样。
zipWidth 还可以使用一个 iterable 为参数:
Observable.range(0, 5)
.zipWith(Arrays.asList(0, 2, 4, 6, 8), new Func2<Integer, Integer, Object>() {
@Override
public Object call(Integer integer, Integer integer2) {
return integer + " - " + integer2;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0 - 0
1 - 2
2 - 4
3 - 6
4 - 8
zipWith
zip 还有一个 zipWith 操作函数:
Observable.interval(100, TimeUnit.MILLISECONDS).zipWith(
Observable.interval(150, TimeUnit.MILLISECONDS),
new Func2<Long, Long, Object>() {
@Override
public Object call(Long aLong, Long aLong2) {
return aLong + " - " + aLong2;
}
})
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0 - 0
1 - 1
2 - 2
3 - 3
4 - 4
5 - 5
combineLatest
前面的 zip 使用源 Observable 发射的顺序为组合的标记,而 combineLatest 使用的是时间。只要 combineLatest 的任何一个源 Observable 发射数据,则就使用该数据和其他Observable最后一次发射的数据去组合。
Observable.combineLatest(
Observable.interval(100, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Left emits " + aLong);
}
}),
Observable.interval(150, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Right emits " + aLong);
}
})
, new Func2<Long, Long, Object>() {
@Override
public Object call(Long aLong, Long aLong2) {
return aLong + " - " + aLong2;
}
}
)
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
Left emits
Right emits
0 - 0
Left emits
1 - 0
Left emits
2 - 0
Right emits
2 - 1
Left emits
3 - 1
Right emits
3 - 2
combineLatest 一开始等待所有的源 Observable 发射第一个数据,然后只要有任意一个 Observable发射数据,就用这个数据和其他 Observable 最后一次发射的数据组合。
combineLatest 同样有多个重载函数可以组合多个源 Observable。 combineLatest使用场景是只要有一个条件变化了就需要重新计算当前的数据或者状态。比如在用markdown写博客的时候,编辑器里面有个控制按钮为把单词的每个字母转换为大写字母的开关,输入框旁边有个预览界面。每当你在编辑框中输入内容或者改变大小写状态的时候,combineLatest 就用输入框和大小写状态的最新的值来重新渲染预览界面。
join
join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并,流程图如下:
join方法的用法如下:
observableA.join(observableB,
observableA产生结果生命周期控制函数,
observableB产生结果生命周期控制函数,
observableA产生的结果与observableB产生的结果的合并规则)
上代码:
//产生0,2,4,6,8数列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 2;
}
}).take(5);
//产生0,3,6,9,12数列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 3;
}
}).take(5);
observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable持续600毫秒有效
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable持续600毫秒有效
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong + aLong2;
}
})
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong + "");
}
});
结果:
0
2
5
7
10
12
15
17
20
groupJoin
groupJoin操作符非常类似于join操作符,区别在于join操作符中第四个参数的传入函数不一致,其流程图如下:
//产生100,110,120,130,140数列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
log(System.currentTimeMillis()+"--1--");
return aLong*10+100;
}
}).take(5);
//产生1,3,5,7,9数列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
log(System.currentTimeMillis()+"--2--");
return aLong*2+1;
}
}).take(5);
observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable持续1600毫秒有效
return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable持续600毫秒有效
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Observable<Long>, Observable<Long>>() {
@Override
public Observable<Long> call(final Long aLong, Observable<Long> observable) {
return observable.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong2) {
return aLong + aLong2;
}
});
}
})
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> longObservable) {
longObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong +"");
}
});
}
});
结果:
1462255209637--1--
1462255210138--2--
101
1462255210639--1--
111
1462255211137--2--
103
113
1462255211638--1--
123
1462255212137--2--
115
125
1462255212639--1--
135
1462255213138--2--
127
137
1462255213639--1--
147
1462255214140--2--
149
139
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第三部分:驯服数据流之 组合数据流 - 云在千峰
Android RxJava使用介绍(四) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET