RxJava 变换、 组合 、 合并操作符

时间:2021-09-18 17:49:43
/**
* @author houde
* 时间:2018/1/23
* DesRxJava 变换操作符
*/

public class RxOperateActivity extends AppCompatActivity {
private final String TAG = "RxOperateActivity";
Observable<Integer> observable1 = Observable.just(1,2,3,4);
Observable<String> observable2 = Observable.just("A","B","C");
private Observer<String> stringObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(String s) {
Log.e(TAG,s);
}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"Complete事件作出响应");
}
};
private Observer<Integer> intObserver = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(Integer integer) {
Log.e(TAG,"事件 = " + integer);

}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"Complete事件作出响应");
}
} ;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_image);
//转换操作符
/**
* 作用
* 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件
* :将被观察者发送的事件转换为任意的类型事件。
* 应用场景
* 数据类型转换
* 具体使用
* 下面以将 使用Map()将事件的参数从整型变换成字符串类型为例子说明
*/
map();
/**
* 作用:
* 将被观察者发送的事件序列进行拆分&单独转换,再合并成一个新的事件序列,最后再进行发送
*
* 原理
* 1.为事件序列中每个事件都创建一个 Observable 对象;
* 2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
* 3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
* 4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer
* 应用场景
* 无序的将被观察者发送的整个事件序列进行变换
*/
flatMap();
/**
* 作用:类似FlatMap()操作符
* FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序
* 应用场景
* 有序的将被观察者发送的整个事件序列进行变换
*/
concatMap();
/**
* 作用
* 定期从 被观察者(Observable)需要发送的事件中获取一定数量的事件&放到缓存区中,
* 最终发送
*
* 应用场景
* 缓存被观察者发送的事件
*/
buffer();
//组合和并操作符
/**
* 作用
* 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
* 二者区别:
* 组合被观察者的数量,即concat()组合被观察者数量≤4个,
* concatArray()则可>4
*/
concat();
concatArray();
/**
* 作用
* 组合多个被观察者一起发送数据,合并后 按时间线并行执行
*
* 二者区别:
* 组合被观察者的数量,即merge()组合被观察者数量≤4个,
* mergeArray()则可>4
*
* 区别上述concat()操作符:
* 同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
*/
merge();
mergeArray();
/**
* 背景:
* 使用mergeconcat操作符时,
* 冲突:
* 若其中一个被观察者发出onError事件,则会终止其他被观察者继续发送事件
* 解决方案:
* 若希望onError事件推迟到其他被观察者发送完事件之后再触发
* 即需要使用对应的mergeDelayError()concatDelayError操作符
*
*/
concatDelayError();
mergeDelayError();
//事件的合并
/**
* 作用
* 合并多个被观察者(Observable)发送的事件,
* 生成一个新的事件序列(即组合过后的事件序列),并最终发送
* 特别注意:
* 事件组合方式 = 严格按照原先事件序列 进行对位合并
* 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
* 特别注意:
* 尽管被观察者2的事件D没有事件与其合并,但还是会继续发送
* 若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,
* 则被观察者2的事件D也不会发送,测试结果如下
* 定义:
* 属于Rxjava中的组合
* 作用:
* 1.合并多个被观察者(Observable)发送的事件
* 2.生成一个新的事件序列(即合并之后的序列),并最终发送
* 原理:
* 1.事件组合方式 = 严格按照原先事件序列进行对位合并
* 2.最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
*
* 应用场景:
* 1.当展示的信息需要从多个地方获取(即 信息 = 信息1 + 信息2& 统一结合后再展示
* 2.如:合并网络请求的发送 & 统一展示结果
*/
zip();
/**
* 作用
* 当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables
* 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该
* 函数的结果发送数据
* Zip()的区别:
* Zip() = 按个数合并,
* 11合并;CombineLatest() = 按时间合并,即在同一个时间点上合并
*
* combineLatestDelayError()
* 作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
*/
combineLatest();
/**
* 作用
* 把被观察者需要发送的事件聚合成1个事件 & 发送
*
* 聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
*/
reduce();
/**
*作用
* 将被观察者Observable发送的数据事件收集到一个数据结构里
*/
collect();
//发送事件前追加发送事件
/**
* 作用
* 在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者
*/
startWith();
startWithArray();
//统计发送事件数量
/**
* 作用
* 统计被观察者发送事件的数量
*/
count();

}

private void count() {
observable1.count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG,"发送事件的次数 = " + aLong);
}
});
}

private void startWithArray() {
Observable.just(4,5,6,7)
.startWith(0)
.startWithArray(1,2,3)
.subscribe(intObserver);
}

private void startWith() {
Observable.just(1,2,3,4)
.startWith(0)
.subscribe(intObserver);
}

private void collect() {
observable1.collect(
// 1. 创建数据结构(容器),用于收集被观察者发送的数据
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
// 参数说明:list = 容器,integer = 后者数据
list.add(integer);
// 对发送的数据进行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> list) throws Exception {
Log.e(TAG, "本次发送的数据是: " + list);
}
});
}

private void reduce() {
observable1.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer s1, Integer s2) throws Exception {
Log.e(TAG, "本次计算的数据是: "+s1 +" "+ s2);
return s1 * s2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "最终计算的结果是: " + integer);
}
});
}

private void combineLatest() {
Log.e(TAG,"-------------------combineLatest-------------------");

Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
}).subscribe(stringObserver);

}

private void zip() {
Log.e(TAG,"-------------------zip-------------------");


Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(stringObserver);
}

private void mergeDelayError() {
Log.e(TAG,"-------------------mergeDelayError-------------------");
Observable.mergeArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 发送Error事件,因为使用了concatDelayError,所以第2Observable将会发送事件,等发送完毕后,再发送错误事件
emitter.onError(new NullPointerException("这里发送了一个onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);
}

private void concatDelayError() {
Log.e(TAG,"-------------------concatDelayError-------------------");
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 发送Error事件,因为无使用concatDelayError,所以第2Observable将不会发送事件
emitter.onError(new NullPointerException("这里发送了一个onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);

}

private void mergeArray(){
Log.e(TAG,"-------------------mergeArray-------------------");
Observable.mergeArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(7,8,9),
Observable.just(10,11,12),
Observable.just(13,14,15),
Observable.just(16,17,18)
).subscribe(intObserver);
}
private void merge(){
Log.e(TAG,"-------------------merge-------------------");

Observable.merge(Observable.just(1,2,3,4),
Observable.just(5,6),
Observable.just(7,8,9),
Observable.just(10,11,12,13)
)
.subscribe(intObserver);
}
private void concatArray(){
Log.e(TAG,"-------------------concatArray-------------------");
Observable.concatArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10),
Observable.just(11,12,13),
Observable.just(14,15,16)
).subscribe(intObserver);
}
private void concat(){
// concat():组合多个被观察者(≤4个)一起发送数据
// 注:串行执行
Log.e(TAG,"-------------------concat-------------------");

Observable.concat(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10)
).subscribe(intObserver);
}

private void buffer() {
Log.e(TAG,"-------------------buffer-------------------");

Observable.just(1,2,3,4,5,6,7,8)
// 设置缓存区大小 & 步长
// 缓存区大小 = 每次从被观察者中获取的事件数量
// 步长 = 每次获取新事件的数量
.buffer(3,1)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"开始采用subscribe连接");
}

@Override
public void onNext(List<Integer> ints) {
Log.e(TAG,"缓存区里的事件个数" + ints.size());
for(int i = 0 ,size = ints.size(); i < size;i++){
Log.e(TAG,"事件 = " + i);
}
}

@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"Complete事件作出响应");
}
});
}

private void concatMap() {
Log.e(TAG,"-------------------concatMap-------------------");
Observable.just(4,3,2,1)
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
})
.subscribe(stringObserver);
}

private void flatMap() {
Log.e(TAG,"-------------------flatMap-------------------");
Observable.just(1,2,3,4)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
}).subscribe(stringObserver);
}

@NonNull
private List<String> getEvents(Integer integer) {
List<String> event = new ArrayList<>(3);
for(int i = 0 ; i < 3 ; i++){
event.add("我是事件 " + integer + "拆分后的子事件" + i);
}
return event;
}

private void map() {
Log.e(TAG,"-------------------map-------------------");
Observable.just(1,2,3,4).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer;
}
}).subscribe(stringObserver);
}
}