Rxjava 的过滤操作符

时间:2022-01-17 23:11:24
public class RxFilterActivity extends AppCompatActivity {
private final static String TAG = RxFilterActivity.class.getSimpleName();
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

// 根据指定条件过滤事件 有 filter() ofType() skip()
// skipLast() distinct() distinctUntilChanged()
/**
* 作用:过滤特定的事件,根据书写的判断条件过滤
*/
filter();
/**
* 作用:过滤特定的数据类型
*/
ofType();
/**
* 跳过某个事件
*/
skipAndSkipLast();
/**
* 过滤事件中重复的事件/连续重复的事件 distinct / distinctUntilChanged
*/
distinct();
//根据指定事件数量过滤事件 有 take() takeLast()
/**
* 通过设置指定事件的数量,只发送特定数量的事件
* 指定观察者最多能接收到的事件数量
*/
take();
/**
* 指定观察者只能接收到被观察者发送的最后几个事件
*/
takeLast();
//根据指定时间过滤事件 有throttleFirst() throttleLast() sample()
//throttleWithTimeout debounce()
/**
* 在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
*/
throttleFirst();
throttleLast();
/**
* 在某段时间内,只发送该段时间内最新(最后)1次事件
* simple() throttleLast 相似,此处就不做详细介绍了
*/

/**
* 发送数据事件时,若两次发送事件的间隔 < 指定时间,就会丢弃前一次的数据,直到指定
* 时间内都没有新数据发射时才会发送后一次的数据
*/
throttleWithTimeout();
debounce();
//根据指定事件位置过滤事件 有firstElement() lastElement() elementAt() elementAtOrError()
/**
* 仅选取第1个元素 / 最后一个元素
*/
firstElement();
lastElement();
/**
* 指定接收某个元素(通过 索引值 确定)
* 注:允许越界,即获取的位置索引 > 发送事件序列长度
*/
elementAt();
/**
* elementAt()的基础上,
* 当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常
*/
elementAtOrError();
}

private void elementAtOrError() {
Log.e(TAG,"-----------------------elementAtOrError-----------------------");

Observable.just(1,2,3,4,5)
.elementAtOrError(6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"这是个玩笑,不会走到这的");
}
});
}

private void elementAt() {
Log.e(TAG,"-----------------------elementAt-----------------------");

Observable.create(getSource())
.elementAt(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"事件通过的value = " + integer);
}
});
Observable.create(getSource())
.elementAt(12,-10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"事件通过的value = " + integer);
}
});
}

private void lastElement() {
Log.e(TAG,"-----------------------lastElement-----------------------");

Observable.just(1,2,3,4,5)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG," lastElement 通过的事件value = " + integer);
}
});

}

private void firstElement() {
Log.e(TAG,"-----------------------firstElement-----------------------");
Observable.just(1,2,3,4,5,6)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG," firstElement 通过的事件value = " + integer);
}
});
}

private void debounce() {
Log.e(TAG,"-----------------------debounce-----------------------");

Observable.create(getSource())
.debounce(2,TimeUnit.SECONDS,Schedulers.io())
.subscribe(getObserver());
}

private void throttleWithTimeout() {
Log.e(TAG,"-----------------------throttleWithTimeout-----------------------");
Observable.create(getSource())
//1秒中采用数据
.throttleWithTimeout(1,TimeUnit.SECONDS,Schedulers.io())
.subscribe(getObserver());
}

private void throttleLast() {
Log.e(TAG,"-----------------------throttleLast-----------------------");
Observable.create(getSource()).observeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.subscribe(getObserver());
}

@NonNull
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"订阅成功,准备接受事件");
}

@Override
public void onNext(Integer integer) {
Log.e(TAG,"接受到的事件 value = " + integer);
}

@Override
public void onError(Throwable e) {
Log.e(TAG,"事件报错"+e.getMessage());
}

@Override
public void onComplete() {
Log.e(TAG,"事件发送完成");
}
};
}

@NonNull
private ObservableOnSubscribe<Integer> getSource() {
return new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(300);
e.onNext(2);
Thread.sleep(300);
e.onNext(3);
Thread.sleep(300);
e.onNext(4);
Thread.sleep(300);
e.onNext(5);
Thread.sleep(300);
e.onNext(6);
Thread.sleep(300);
e.onNext(7);
Thread.sleep(300);
e.onNext(8);
Thread.sleep(300);
e.onNext(9);
Thread.sleep(300);
e.onComplete();
}
};
}

private void throttleFirst() {
Log.e(TAG,"-----------------------throttleFirst-----------------------");

Observable.create(getSource()).observeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(getObserver());
}

private void takeLast() {
Log.e(TAG,"-----------------------takeLast-----------------------");
Observable.just(1,2,3,4,5,6,7,8)
.takeLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过takeLast过滤事件,接收被观察者最后发送的几个事件,通过value = " + integer);
}
});
}

private void take() {
Log.e(TAG,"-----------------------take-----------------------");

Observable.just(1,2,3,4,5,6,7,8)
.take(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过take过滤发送事件的数量,通过value = " + integer);
}
});
}

private void distinct() {
Log.e(TAG,"-----------------------distinct-----------------------");

Observable.just(1,2,3,1,2,3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过distinct过滤掉数据源中重复的数据 通过的value = " + integer);
}
});
Log.e(TAG,"-----------------------distinctUntilChanged-----------------------");

Observable.just(1,2,3,3,3,1,2,3)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过distinctUntilChanged过滤掉数据源中连续重复的数据,通过的value = " + integer);
}
});
}

private void skipAndSkipLast() {
Log.e(TAG,"-----------------------skipAndSkipLast-----------------------");

Observable.just(1,2,3,4,5,6)
//跳过正序的第一项
.skip(1)
//跳过正序的最后两项
.skipLast(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过skipskipLast跳过事件,通过的为value = " + integer);
}
});
}

private void ofType() {
Log.e(TAG,"-----------------------ofType-----------------------");

Observable.just(1,"abc",2d,3f,4L,'a',2)
.ofType(Integer.class)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
Log.e(TAG,"通过ofType过滤事件,通过的 value = " + s);
}
});
}

private void filter() {
Log.e(TAG,"-----------------------filter-----------------------");

Observable.just(1,2,3,4,5)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 3;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"通过Filter过滤事件,通过的 value = " + integer);
}
});
}
}