rxjava2.0使用教程(二)

时间:2021-12-02 17:47:45


前面已经提到过一部分操作符,下面我们再看看其他操作符

distinct 发被观察者列当中之前没有发射过的数据,也就是去除重复的数据

  Observable.just(1, 3, 4, 2, 1, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " accept: " + Thread.currentThread().getName());
}
});
}
07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 1  accept:  main07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 3  accept:  main07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 4  accept:  main07-23 13:22:24.731 9083-9083/com.example E/RXActivity: 2  accept:  main

merge merge和concat类似,也是用来连接两个被订阅者,但是它不保证两个被订阅发射数据的顺序。

 Observable.merge(Observable.just(11, 22, 44, 33), Observable.just(50, 60, 90))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " onNext: " + Thread.currentThread().getName());
}
});

07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 11  onNext:  main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 22 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 44 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 33 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 50 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 60 onNext: main
07-23 13:27:01.831 9083-9083/com.example E/RXActivity: 90 onNext: main


replay  使得即使在未订阅时,被订阅者已经发射了数据,订阅者也可以收到被订阅者在订阅之前最多n个数据。

 PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(1);
connectableObservable.connect();

connectableObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer value) {
Log.e(TAG, value + " onNext: " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});
source.onNext(11);
source.onNext(12);
source.onNext(13);
source.onNext(14);
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " onNext: two " + Thread.currentThread().getName());
}
});
 
07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 11  onNext:  main07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 12  onNext:  main07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 13  onNext:  main07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 14  onNext:  main07-23 13:29:48.691 9083-9083/com.example E/RXActivity: 14  onNext:  two  main


reduce  所有数 之和

 Flowable.just(1, 2, 3, 4)// 所有数 之和
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e(TAG, integer + " apply: " + Thread.currentThread().getName());
return integer2 + integer;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {

Log.e(TAG, integer + " subscribe: " + Thread.currentThread().getName());
}
});

07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 1  apply:  RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 3 apply: RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-29492/com.example E/RXActivity: 6 apply: RxCachedThreadScheduler-2
07-23 13:49:29.011 12858-12858/com.example E/RXActivity: 10 subscribe: main



sacn操作符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者: call 回掉第一个参数是上次的结算结果,第二个参数是当此的源observable的输入值

Observable.just(1, 2, 3, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e(TAG, integer + " " + integer2 + " apply: " + Thread.currentThread().getName());
return integer + integer2;
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer value) {
Log.e(TAG, value + " onNext: " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
})

07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 1  2  apply:  RxCachedThreadScheduler-1
07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 3 3 apply: RxCachedThreadScheduler-1
07-23 13:35:36.671 12858-14231/com.example E/RXActivity: 6 5 apply: RxCachedThreadScheduler-1
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 1 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 3 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 6 onNext: main
07-23 13:35:36.681 12858-12858/com.example E/RXActivity: 11 onNext: main
skip 剔除订阅的个数

  Observable.just(1, 2, 3, 5, 6, 11).skip(2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " test: " + Thread.currentThread().getName());
}
});

07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 3  test:  main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 5 test: main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 6 test: main
07-23 13:37:14.071 12858-12858/com.example E/RXActivity: 11 test: main

filter 过滤
Observable.just(1, 2, 4, 5, 6, 11).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
Log.e(TAG, integer + " test: " + Thread.currentThread().getName());

return integer % 2 == 0;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " accept: " + Thread.currentThread().getName());
}
});

07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 2  accept:  main
07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 4 accept: main
07-23 13:38:24.781 12858-12858/com.example E/RXActivity: 6 accept: main


CompositeDisposable  调度控制后台任务队列,可以清空队列,也就没有观察者回调了,如离开页面,清空队列


CompositeDisposable mCompositeDisposable = new CompositeDisposable();
 mCompositeDisposable.add(Observable.create(new ObservableOnSubscribe<Integer>() {                    @Override                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {                        e.onNext(10);                        Log.e(TAG, "subscribe: " + Thread.currentThread().getName());                    }                }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {                    @Override                    public void accept(Integer integer) throws Exception {                        Log.e(TAG, integer + "  subscribe: " + Thread.currentThread().getName());                    }                })        );
 @Override    protected void onDestroy() {        super.onDestroy();        mCompositeDisposable.clear();    }

 
07-23 13:55:49.611 12858-4439/com.example E/RXActivity: subscribe: RxCachedThreadScheduler-4
07-23 13:55:49.611 12858-12858/com.example E/RXActivity: 10 subscribe: main

zip操作符其实就是通过Observable.zip()方法把多个Observable组合成新的Observable,这个新的Observable对应的数据流由call方法决定:

 private Observable<List<User>> one() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getOne());
e.onComplete();
}
}
});
}
    private Observable<List<User>> two() {        return Observable.create(new ObservableOnSubscribe<List<User>>() {            @Override            public void subscribe(ObservableEmitter<List<User>> e) throws Exception {                if (!e.isDisposed()) {                    e.onNext(Utils.getTwo());                    e.onComplete();                }            }        });    }

 

 
Observable.zip(one(), two(), new BiFunction<List<User>, List<User>, List<User>>() {

@Override
public List<User> apply(List<User> o, List<User> o2) throws Exception {
o.addAll(o2);
return o ;
}
}).subscribeOn(Schedulers.io()).forEach(new Consumer<List<User>>() {
@Override
public void accept(List<User> users) throws Exception {
Log.e(TAG, "onNext: " +users);
}
});


Map一般用于对原始的参数进行加工处理,返回值还是基本的类型,可以在subscribe中使用(适用)的类型。


 Observable.just(R.drawable.ic_)//fromArray
.map(new Function<Integer, Drawable>() {
@Override
public Drawable apply(Integer integer) throws Exception {
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable md) throws Exception {
findViewById(R.id.bt1).setBackground(md);
}
});


}


flatMap一般用于输出一个Observable,而其随后的subscribe中的参数也跟Observable中的参数一样,注意不是Observable,一般用于对原始数据返回一个Observable,这个Observable中数据类型可以是原来的,也可以是其他的


  Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 10; i++) {
list.add("改变下" + integer);
}
return Observable.fromIterable(list);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});、
07-23 14:09:26.771 10870-10870/com.example E/RXActivity: 改变下107-23 14:09:26.771 10870-10870/com.example E/RXActivity: 改变下2

 concatMap  操作符功能与flatMap操作符一致,不过,它解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们



如果您还想了解更多,可以添加公众号:

rxjava2.0使用教程(二)