RxJava----操作符:条件和布尔操作符

时间:2021-08-02 17:50:47

Conditional and Boolean Operators(条件和布尔操作符)

all

  • all 函数用来判断 observable 中发射的所有数据是否都满足一个条件。
public final Observable<java.lang.Boolean> all(Func1<? super T,java.lang.Boolean> predicate)

满足条件

        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onNext(10);
subscriber.onNext(20);
subscriber.onNext(2);
subscriber.onCompleted();
}
});

Subscription evenNumbers = values
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean+"");
}
});

结果:

true
Complete!
  • all 函数返回的是一个发射一个 布尔值的 Observable,而不是直接返回一个 布尔值。
  • 原因在于我们并不知道源 Observable 何时才结束数据流的发射,只有当源 Observable 发射结束的时候, all 函数才知道结果是否都满足条件。
  • 只要遇到一个不满足条件的数据,all 函数就立刻返回 false。
  • 只有当源 Observable 结束发射并且所发射的所有数据都满足条件的时候才会产生 true。
  • 在 observable 内返回结果可以方便的实现非阻塞操作。 在下个示例中可以看到 all 在遇到不满足的数据的时候就立刻结束了。

不满足条件

        Observable<Long> values = Observable.interval(150, TimeUnit.MILLISECONDS).take(5);
Subscription subscription = values
.all(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong<3;
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("First:Complete!");
}
@Override
public void onError(Throwable e) {
log("First:"+e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log("First:"+aBoolean);
}
});
Subscription subscription2 = values
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Second:Complete!");
}
@Override
public void onError(Throwable e) {
log("Second:"+e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log("Second:"+aLong);
}
});

结果:

Second:0
Second:1
Second:2
First:false
First:Complete!
Second:3
Second:4
Second:Complete!

数据错误

  • 如果源 Observable 出现了错误,则 all 操作就没有意义了,all 会直接发射一个 error 然后结束。
        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onNext(2);
subscriber.onError(new Exception("Oops"));
}
});
Subscription subscription = values
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean+"");
}
});

结果:

Oops

不满足条件且数据错误

将上面的subscriber.onNext(0);改为subscriber.onNext(1);
结果:

false
Complete!
  • 也就是说数据不满足条件,则直接completed()。

ofType

ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤,其流程图如下:
RxJava----操作符:条件和布尔操作符

Observable.just(0, "1", 2, "3").ofType(String.class)
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
log(object.toString() + ":" + object.getClass());
}
});

结果:

1:class java.lang.String
3:class java.lang.String

single

  • single 只会发射源 Observable 中的一个数据,如果使用重载的带过滤条件的函数,则发射符合该过滤条件的那个数据。
  • 和 first 、last 不一样的地方是,single 会检查数据流中是否只包含一个所需要的的数据,如果有多个则会抛出一个错误信息。
  • 所以 single 用来检查数据流中是否有且仅有一个符合条件的数据。所以 single 只有在源 Observable 完成后才能返回。

RxJava----操作符:条件和布尔操作符

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(10) // 获取前 10 个数据 的 Observable
.single(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong == 5L;
}
}) // 有且仅有一个 数据为 5L
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Single1:Complete!");
}
@Override
public void onError(Throwable e) {
log("Single1:" + e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log("Single1:" + aLong);
}
});
values
.single(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong == 5L;
}
}) // 由于源 Observable 为无限的,所以这个不会打印任何东西
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Single2:Complete!");
}
@Override
public void onError(Throwable e) {
log("Single2:" + e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log("Single2:" + aLong);
}
});

结果:

Single1: 5
Single1: Complete!
和前面的类似,使用 singleOrDefault 可以返回一个默认值。

exists

  • 如果源 exists 发射的数据中有一个满足条件,则 exists 就返回 true。 exists 和 all 一样也是返回一个 Observable 而不是直接返回 布尔值。

RxJava----操作符:条件和布尔操作符

        Observable<Integer> values = Observable.range(0, 2);
Subscription subscription = values
.exists(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean + "");
}
});

结果:

false
Complete!
上面示例中只发射了 0 和1 两个数据,而这两个数据都不满足大于2的条件,所以返回结果为 false。
如果我们多发射几个数据,则就会满足条件了。
Observable<Integer> values = Observable.range(0, 4);

结果:

true
Complete!

isEmpty

  • 顾名思义,判断一个 Observable 是否是空的,也就是没有发射任何数据就结束了。

RxJava----操作符:条件和布尔操作符

        Observable<Long> values = Observable.timer(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values
.isEmpty()
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean + "");
}
});

结果:

false
Complete!
只要源 Observable 发射了一个数据,isEmpty 就立刻返回 false, 只有当源 Observable 完成了并且没有发射数据,isEmpty 才返回 true。

contains

  • contains 使用 Object.equals 函数来判断源 Observable 是否发射了相同的数据。
    只要遇到相同的数据,则 contains 就立刻返回。

RxJava----操作符:条件和布尔操作符

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.contains(4L)
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean + "");
}
});

结果:

true
Complete!
  • 注意上面使用的是 contains(4L), 而不是 contains(4), 由于 values 是 Observable 类型的, 所以需要使用 Long 类型而不能是 Integer 类型。
  • 如果使用 contains(4) 则什么都不会打印出来, 由于 values 是一个无限的数据流,所以 contains 一直在等待一个相同的数据发射出来,但是在 values 里面是没有一样的数据的,导致 contains 一直等待下去。

defaultIfEmpty

  • 如果你不想单独处理没有发射任何数据的情况(需要用 isEmpty 函数来检查是否为空),则可以使用 defaultIfEmpty 函数来强制一个空的 Observable 发射一个默认数据。

RxJava----操作符:条件和布尔操作符

        Observable<Integer> values = Observable.empty();
Subscription subscription = values
.defaultIfEmpty(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});

结果:

2
Complete!
  • 只有当 onCompleted 事件发生了,并且 Observable 没有发射任何数据的时候,才会使用默认值;
  • 否则不会使用默认值。 如果发生了错误,则还会有错误的结果。
        Observable<Integer> values = Observable.error(new Exception("Oops"));
Subscription subscription = values
.defaultIfEmpty(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});

结果:

Oops

sequenceEqual

  • sequenceEqual是用来比较两个 Observable 发射的数据是否是一样的,同样位置的数据是一样的。
  • 要求两个 Observable 发射的数据个数是一样的并且每个位置上的数据也是一样的。
  • 该函数内部用 Object.equals 来比较数据,当然你也可以自己指定一个比较函数。
        Observable<String> strings = Observable.just("1", "2", "3");
Observable<Integer> ints = Observable.just(1, 2, 3);
Observable.sequenceEqual(strings, ints, new Func2<Serializable, Serializable, Boolean>() {
@Override
public Boolean call(Serializable serializable, Serializable serializable2) {
return serializable.equals(serializable2.toString());
}
})//Observable.sequenceEqual(strings, ints)
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean + "");
}
});

结果:

true
Complete!
  • 上面使用了自定义的比较函数,所以结果是一样的。
  • 如果不指定自定义的 比较函数的话, 使用默认的 Object.equals 来比较,则返回的结果就是 False 了。
  • 如果一个源 Observable 出现了 错误,则 比较结果的 Observable 也会出现 错误并结束。
        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new Exception("Oops"));
}
});

Observable.sequenceEqual(values, values)
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Boolean aBoolean) {
log(aBoolean + "");
}
});

结果:

Oops

项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 检查数据 - 云在千峰