RxJava 1.0版本入门篇之--3

时间:2022-01-23 17:45:18

all判断所有元素是否满足某个条件

 //需求:学校需要组件一个篮球队 必须年龄小于等于20岁  判断所有的学生 是否都满足条件
// 只要有一个学生不满足条件 就返回false
Observable
.from(initStudents())
//all函数是用来判断所有的队列里面的元素是否满足某个条件 如果都满足则返回true 如果有一个不满足 就返回false
.all(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getAge() <= 20;
}
})

amb抢占式发送数据

    //1.创建A被观察者
Observable<String> aOvservable = Observable.just("A同学举手了", "A同学回答了问题");
//2.创建B被观察者
Observable<String> bOvservable = Observable.just("B同学举手了", "B同学回答了问题");
//3.最后被订阅的被观察者
//amb会发送那个先发送数据的被观察者里面的所有数据 后面的被观察者发送的数据会被忽略掉
//抢占资源 只要抢占成功 后面的资源就由该成功的对象来处理
Observable<String> ambObservable = Observable.amb(aOvservable, bOvservable);

ambObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: " + s);
}
});

contains判断是否存在某个元素

Observable<Integer> observable = Observable.just(1, 2, 3, 4);
//相当于判断一个队列里面是否存在某个元素
Observable<Boolean> booleanObservable = observable.contains(3);
booleanObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.i(TAG, "call: " + aBoolean);
}
});

defaultIfEmpty默认发送某个数据

 //需求:从网络上读取一个字符串 如果找不到该字符串 则返回一个默认的值
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//总结:如果被观察者有发送数据 则观察者直接接收数据
//如果被观察者不发送数据 而直接调用onCompleted方法 系统会自动使用defaultIfEmpty里面的默认值
String result = "从网络下读取的字符串";
//subscriber.onNext(result);
subscriber.onCompleted();
}
})
.defaultIfEmpty("显示默认的值")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: " + s);
}
});

SequenceEqual判断发送的数据是否相同

    //1.创建两个被观察者对象
Observable<Integer> o1 = Observable.just(1, 2, 3, 4);//网络的数据
Observable<Integer> o2 = Observable.just(1, 2, 4, 3);//数据库中的数据
//2.使用sequenceEqual函数 传入的参数的顺序无所谓
Observable<Boolean> booleanObservable = Observable.sequenceEqual(o1, o2);
//如果发送的元素稍微有点不同 或者说顺序不一样 那就会返回false
booleanObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.i(TAG, "call: " + aBoolean);
}
});

skipWhile&takeWhile

    Observable
.just(1, 2, 3, 4, 5,6)
.skipWhile(new Func1<Integer, Boolean>() {
//skip=跳过
// 跳过发送的每一个数据 直到条件为false才开始让观察者接收发送过来的数据
@Override
public Boolean call(Integer value) {
return value!=4;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});


//takeWhile
Observable
.just(1, 2, 3, 4, 5,6)
.takeWhile(new Func1<Integer, Boolean>() {
//take=取 拿
// 接收每一个数据 直到条件为false才开始停止让观察者接收发送过来的数据
@Override
public Boolean call(Integer value) {
return value!=4;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

Average求平均数

关于数学的计算,这里RxJava提供了一个依赖包,https://github.com/ReactiveX/RxJavaMath, 其开发包提供了一个核心帮助类:MathObservable

添加依赖:

compile 'io.reactivex:rxjava-math:x.y.z'

—x.y.z为版本号 点击项目Binaries文档http://search.maven.org获取

    Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6);
Observable<Integer> integerObservable1 = MathObservable.averageInteger(integerObservable);

integerObservable1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});

Max&Min求最大最小值

    Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6);
MathObservable
//.max(integerObservable)//求数据的最大值
.min(integerObservable)//求数据的最小值
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});

Count求数据个数

    Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6);
//需求:知道到底发送了多少个数据
Observable<Integer> countObservable = integerObservable.count();
countObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

Sum计算队列内数据总和

    Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6);
MathObservable
.sumInteger(integerObservable)//求发送数据的总和
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

RxJava异常捕获的三种方式

//  1.第一种onErrorReturn
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
//有可能是你的代码出现了空指针的异常 在这里我们也可以模拟的异常
subscriber.onError(new NullPointerException("mock nullPoint Exception !"));
subscriber.onNext(3);
subscriber.onCompleted();
}
})
//onErrorReturn 他就是用来捕获异常 并且返回一个默认的值
.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
Log.i(TAG, "call: " + throwable.getLocalizedMessage());
return 666;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});

// 2.第二种onErrorResumeNext
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
//有可能是你的代码出现了空指针的异常 在这里我们也可以模拟的异常
subscriber.onError(new NullPointerException("mock nullPoint Exception !"));
//只要抛出了异常 后面的代码都不会执行的
subscriber.onNext(3);
subscriber.onCompleted();
}
})
//只要有异常 onErrorResumeNext就会被调用 该方法返回的被观察者就会重新订阅到下面的Action1并打印出对应的数据
.onErrorResumeNext(Observable.just(666))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});

//第三种onExceptionResumeNext
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
//有可能是你的代码出现了空指针的异常 在这里我们也可以模拟的异常
subscriber.onError(new NullPointerException("mock nullPoint Exception !"));
//只要抛出了异常 后面的代码都不会执行的
subscriber.onNext(3);
subscriber.onCompleted();
}
})
//onExceptionResumeNext 跟 onErrorResumeNext效果是一样 都是返回一个新的被观察者对象并订阅到观察者
.onExceptionResumeNext(Observable.just(2333))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});

Retry重试机制

    Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
//假如此刻 网络请求的读写超时 并抛出了一个异常
subscriber.onError(new NullPointerException("mock nullPoint Exception !"));
//只要抛出了异常 后面的代码都不会执行的
subscriber.onNext(3);
subscriber.onCompleted();
}
})
//.retry()//无限的尝试 直到不会发生异常为止
.retry(3)//最大的操作次数n+1 如果n+1次后还失败 则直接抛出异常
.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 666;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: " + integer);
}
});