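We have already covered some operators; let's look at a few more.
distinct: emits only those items from the source Observable that have not been emitted before; in other words, it removes duplicates.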
Observable.just(1, 3, 4, 2, 1, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " accept: " + Thread.currentThread().getName());
}
});
07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 1 accept: main
07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 3 accept: main
07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 4 accept: main
07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 2 accept: main
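To make the de-duplication rule concrete, here is a minimal plain-Java sketch of the same idea. It is not how RxJava implements distinct; the class name DistinctSketch and its method are made up for illustration, and it works on a finished list rather than a stream of emissions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only (not part of RxJava).
final class DistinctSketch {
    // Keeps the first occurrence of each value and preserves the original order.
    static List<Integer> distinct(List<Integer> source) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (Integer item : source) {
            if (seen.add(item)) { // add() returns false when the value was already seen
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 3, 4, 2, 1, 3))); // [1, 3, 4, 2]
    }
}
```

The result matches the log above: the second 1 and the second 3 are dropped.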
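merge: similar to concat, it combines two Observables into one, but it does not guarantee the relative order of the items from the two sources. In the example below both sources emit synchronously through just(), so the items happen to arrive in source order.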
Observable.merge(Observable.just(11, 22, 44, 33), Observable.just(50, 60, 90))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " onNext: " + Thread.currentThread().getName());
}
});
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 11 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 22 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 44 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 33 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 50 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 60 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 90 onNext: main
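The log above looks ordered only because both sources emit synchronously. The difference from concat shows up when sources emit at different times. The following is a rough plain-Java model of that difference, not RxJava code: the Timed class and the emission times are assumptions made up for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model, for illustration only (not part of RxJava).
final class MergeVsConcatSketch {
    // A value tagged with the assumed time at which its source would emit it.
    static final class Timed {
        final int timeMs;
        final int value;
        Timed(int timeMs, int value) { this.timeMs = timeMs; this.value = value; }
    }

    // concat: everything from the first source, then everything from the second.
    static List<Integer> concat(List<Timed> a, List<Timed> b) {
        List<Integer> out = new ArrayList<>();
        for (Timed t : a) out.add(t.value);
        for (Timed t : b) out.add(t.value);
        return out;
    }

    // merge: items are forwarded in emission order, whichever source they come from.
    static List<Integer> merge(List<Timed> a, List<Timed> b) {
        List<Timed> all = new ArrayList<>(a);
        all.addAll(b);
        all.sort(Comparator.comparingInt((Timed t) -> t.timeMs)); // List.sort is stable
        List<Integer> out = new ArrayList<>();
        for (Timed t : all) out.add(t.value);
        return out;
    }
}
```

With source a emitting 11, 22, 44 at 0, 20, 40 ms and source b emitting 50, 60 at 10, 30 ms, merge in this model yields 11, 50, 22, 60, 44, while concat yields 11, 22, 44, 50, 60.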
replay: lets a subscriber that subscribes late still receive up to n of the most recent items that the Observable emitted before it subscribed.
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(1);
connectableObservable.connect();
connectableObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(TAG, value + " onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
source.onNext(11);
source.onNext(12);
source.onNext(13);
source.onNext(14);
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " onNext: two " + Thread.currentThread().getName());
}
});
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 11 onNext: main
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 12 onNext: main
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 13 onNext: main
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 14 onNext: main
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 14 onNext: two main
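The second subscriber receives only 14 because replay(1) caches a single item. A bounded cache like that can be sketched in plain Java as follows. This is only a simplified, single-threaded model; the ReplaySketch class is hypothetical and ignores completion, errors and disposal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a bounded replay cache (not part of RxJava).
final class ReplaySketch {
    private final int bufferSize;
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();

    ReplaySketch(int bufferSize) { this.bufferSize = bufferSize; }

    // Caches the value (evicting the oldest when full) and forwards it to current subscribers.
    void onNext(int value) {
        if (bufferSize > 0) {
            if (buffer.size() == bufferSize) buffer.removeFirst();
            buffer.addLast(value);
        }
        for (Consumer<Integer> s : subscribers) s.accept(value);
    }

    // A late subscriber first receives the cached values, then all later ones.
    void subscribe(Consumer<Integer> subscriber) {
        for (Integer cached : buffer) subscriber.accept(cached);
        subscribers.add(subscriber);
    }
}
```

Replaying the scenario above (subscribe, emit 11 to 14, subscribe again) with a buffer size of 1 gives the first subscriber all four values and the second subscriber only 14.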
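reduce: combines all emitted items into a single value using an accumulator function; here it computes the sum of all numbers.
Flowable.just(1, 2, 3, 4) // sum of all numbers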
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e(TAG, integer + " apply: " + Thread.currentThread().getName());
return integer2 + integer;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " subscribe: " + Thread.currentThread().getName());
}
});
07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 1 apply: RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 3 apply: RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 6 apply: RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-12858/com.example E/RXActivity: 10 subscribe: main
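The accumulation seen in the log (1, then 3, then 6, then the final 10) is an ordinary left fold. A minimal plain-Java sketch of it follows. ReduceSketch is a hypothetical name, and returning Optional is my own choice here to model an empty source (in RxJava 2 the seedless reduce returns a Maybe).

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical helper, for illustration only (not part of RxJava).
final class ReduceSketch {
    // Folds the list from left to right; the first item serves as the initial accumulator.
    static Optional<Integer> reduce(List<Integer> source, BinaryOperator<Integer> accumulator) {
        Integer acc = null;
        for (Integer item : source) {
            acc = (acc == null) ? item : accumulator.apply(acc, item);
        }
        return Optional.ofNullable(acc); // empty when the source had no items
    }
}
```

For example, ReduceSketch.reduce(Arrays.asList(1, 2, 3, 4), Integer::sum) holds 10, and an empty list gives an empty Optional.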
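scan: walks through the items emitted by the source Observable, applies a custom accumulator function to them, and emits every intermediate result to the subscriber. The first argument of the apply callback is the previous accumulated result; the second is the current item from the source Observable.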
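Before the RxJava example, the accumulation rule can be sketched in plain Java. This is only an illustration on a finished list, with a hypothetical class name ScanSketch; unlike reduce, every intermediate value is kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper, for illustration only (not part of RxJava).
final class ScanSketch {
    // Returns the running results: the first item as is, then each accumulated value.
    static List<Integer> scan(List<Integer> source, BinaryOperator<Integer> accumulator) {
        List<Integer> out = new ArrayList<>();
        Integer acc = null;
        for (Integer item : source) {
            acc = (acc == null) ? item : accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }
}
```

Applied to 1, 2, 3, 5 with addition, the sketch produces 1, 3, 6, 11, one output per input.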
Observable.just(1, 2, 3, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e(TAG, integer + " " + integer2 + " apply: " + Thread.currentThread().getName());
return integer + integer2;
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(TAG, value + " onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 1 2 apply: RxCachedThreadScheduler-1
07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 3 3 apply: RxCachedThreadScheduler-1
07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 6 5 apply: RxCachedThreadScheduler-1
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 1 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 3 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 6 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 11 onNext: main
skip: skips the first n items emitted by the source Observable (here, the first two).
Observable.just(1, 2, 3, 5, 6, 11).skip(2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " test: " + Thread.currentThread().getName());
}
});
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 3 test: main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 5 test: main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 6 test: main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 11 test: main
filter: emits only the items that pass the predicate; here, only the even numbers.
Observable.just(1, 2, 4, 5, 6, 11).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
Log.e(TAG, integer + " test: " + Thread.currentThread().getName());
return integer % 2 == 0;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " accept: " + Thread.currentThread().getName());
}
});
07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 2 accept: main
07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 4 accept: main
07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 6 accept: main
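Both skip and filter have close counterparts in java.util.stream, which makes their behavior easy to try without Android or RxJava. The sketch below uses those stream methods as an analogy only: streams are synchronous and pull-based, and the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper built on java.util.stream (an analogy, not RxJava).
final class SkipFilterSketch {
    // Drops the first n items and keeps the rest in order.
    static List<Integer> skipFirst(List<Integer> source, long n) {
        return source.stream().skip(n).collect(Collectors.toList());
    }

    // Keeps only the even numbers, like the predicate in the filter example.
    static List<Integer> keepEven(List<Integer> source) {
        return source.stream().filter(i -> i % 2 == 0).collect(Collectors.toList());
    }
}
```

With the inputs used in this article, skipFirst on 1, 2, 3, 5, 6, 11 with n = 2 gives 3, 5, 6, 11, and keepEven on 1, 2, 4, 5, 6, 11 gives 2, 4, 6, matching the two logs.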
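CompositeDisposable: a container that holds the Disposables of background tasks. Clearing it disposes everything it holds, so no further observer callbacks are delivered; a typical use is to clear it when leaving the page.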
CompositeDisposable mCompositeDisposable = new CompositeDisposable();

mCompositeDisposable.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(10);
        Log.e(TAG, "subscribe: " + Thread.currentThread().getName());
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, integer + " subscribe: " + Thread.currentThread().getName());
            }
        }));

@Override
protected void onDestroy() {
    super.onDestroy();
    mCompositeDisposable.clear();
}
07-23 13:55:49.611 12858-4439/com.example E/RXActivity: subscribe: RxCachedThreadScheduler-4
07-23 13:55:49.611 12858-12858/com.example E/RXActivity: 10 subscribe: main
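The "collect handles, cancel them all at once" pattern behind CompositeDisposable can be sketched in a few lines of plain Java. This is a simplified model under my own assumptions, not the library's implementation: DisposableBagSketch and its Cancellable interface are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a disposable container (not part of RxJava).
final class DisposableBagSketch {
    // Stand-in for a Disposable: something that can stop a running task.
    interface Cancellable {
        void cancel();
    }

    private final List<Cancellable> tasks = new ArrayList<>();

    synchronized void add(Cancellable task) {
        tasks.add(task);
    }

    // Cancels everything added so far and empties the bag; the bag can be reused afterwards.
    void clear() {
        List<Cancellable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(tasks);
            tasks.clear();
        }
        for (Cancellable c : snapshot) {
            c.cancel(); // called outside the lock so a callback may safely call add()
        }
    }

    synchronized int size() {
        return tasks.size();
    }
}
```

In the real class, clear() likewise leaves the container reusable, whereas dispose() marks it as disposed so that anything added later is disposed immediately.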
zip: Observable.zip() combines multiple Observables into a new Observable whose emitted items are produced by the combiner function (the apply method in the example below):
private Observable<List<User>> one() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getOne());
e.onComplete();
}
}
});
}
private Observable<List<User>> two() {
    return Observable.create(new ObservableOnSubscribe<List<User>>() {
        @Override
        public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext(Utils.getTwo());
                e.onComplete();
            }
        }
    });
}
Observable.zip(one(), two(), new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> o, List<User> o2) throws Exception {
o.addAll(o2);
return o ;
}
}).subscribeOn(Schedulers.io()).forEach(new Consumer<List<User>>() {
@Override
public void accept(List<User> users) throws Exception {
Log.e(TAG, "onNext: " +users);
}
});
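The pairing rule of zip (the i-th item of one source is combined with the i-th item of the other) can be sketched in plain Java on two lists. ZipSketch is a hypothetical name, and lists stand in for the two Observables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper, for illustration only (not part of RxJava).
final class ZipSketch {
    // Combines items pairwise by index; stops as soon as the shorter list runs out.
    static <A, B, R> List<R> zip(List<A> a, List<B> b, BiFunction<A, B, R> combiner) {
        int n = Math.min(a.size(), b.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(a.get(i), b.get(i)));
        }
        return out;
    }
}
```

Zipping 1, 2, 3 with "a", "b" through (x, y) -> y + x yields "a1", "b2"; the unmatched 3 is dropped. In the article's example each source emits exactly one list, so the combiner runs once and produces one merged list.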
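map: typically used to transform each original item. The return value is a plain (non-Observable) value of whatever type the subscriber needs, and it can be used directly in subscribe.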
Observable.just(R.drawable.ic_)//fromArray
.map(new Function<Integer, Drawable>() {
@Override
public Drawable apply(Integer integer) throws Exception {
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable md) throws Exception {
findViewById(R.id.bt1).setBackground(md);
}
});
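flatMap: maps each original item to an Observable. The subscriber that follows receives the items inside those Observables (note: not the Observables themselves). The item type of the returned Observable can be the same as the original type or a different one.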
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 10; i++) {
list.add("改变下" + integer);
}
return Observable.fromIterable(list);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
07-23 14:09:26.771 10870-10870/com.example E/RXActivity: 改变下1
07-23 14:09:26.771 10870-10870/com.example E/RXActivity: 改变下2
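concatMap: works like flatMap, but avoids flatMap()'s interleaving problem. It flattens the inner Observables by concatenating their emissions one source after another instead of merging them.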
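The ordering guarantee of concatMap can be sketched in plain Java: each item is expanded into an inner list, and the inner lists are appended strictly in source order. The class name ConcatMapSketch is hypothetical, and lists stand in for the inner Observables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, for illustration only (not part of RxJava).
final class ConcatMapSketch {
    // Expands every item into a list and appends the lists one after another.
    static <T, R> List<R> concatMap(List<T> source, Function<T, List<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            out.addAll(mapper.apply(item)); // the whole inner list goes in before the next item is mapped
        }
        return out;
    }
}
```

Mapping 1, 2, 3 to the pairs ("a" + i, "b" + i) always gives a1, b1, a2, b2, a3, b3. With flatMap the same inner items could arrive interleaved when the inner Observables emit asynchronously.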