https://github.com/ReactiveX/RxAndroid
引入依赖: compile 'io.reactivex:rxjava:1.0.14' compile 'io.reactivex:rxandroid:1.0.1'
1、概念 RxJava基本概念:Observable(被观察者),Observer(观察者),subscribe(订阅) ,被观察者和观察者通过subscribe方法实现订阅关系。RxJava的事件回调方法有OnNext(),onCompleted(),onError()。 onCompleted:事件队列完成是执行。RxJava不仅把每个事件单独处理,还会把他们看做一个队列。RxJava规定,当不会有新的onNext()发出时,需要触发onCompleted()方法作为标志。 onError:事件队列异常时执行。在事件处理过程中出现异常时onError会被触发,同时队列自动终止。不允许在有事件发出。 onCompleted和onError在一个正确运行的队列中二者是互斥的,整个队列的执行只会调用其中的一个方法。
2、基本实现(1) 创建观察者Observer RxJava中的Observer的接口实现如下
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
除了以上的实现方式之外 RxJava还内置了一个实现了
Observer
的抽象类:Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
@Override
public void onStart() {
super.onStart();
}
};
此实现类多了一个onStart()方法 可以用于一些初始化的操作。
(2) 创建被观察者 Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("111");
subscriber.onNext("222");
subscriber.onNext("333");
subscriber.onCompleted();
}
});
Observable.create()是最基本的创建Observable方式,除此之外 还有just() from()等方式。 在返回的observable中可以通过isUnsubscribed()来判断是否已经订阅 最后通过unsubscribe()来解除订阅。
(3) 通过subscribe将二者关联起来
observable.subscribe(observer);
最后通过subscribe将oberver和observable进行关联。 这样一个订阅事件就完成了。当Observable一被订阅就会触发observer的call方法。此时 将执行三次onNext()和一次onCompleted()。
3、Scheduler Scheduler是RxJava的线程调度器。主要有以下几个:
Schedulers.immediate():直接在当前线程执行,相当于不指定线程。是默认的Scheduler。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():I/O操作(读写文件 读写数据库 网络信息交互) 所使用的Scheduler。行为模式和newThread()差不多,区别在于IO()的内部实现是用在一个无数量上限的线程池,可以重用空闲的线程。因此,多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。 Schedulers.computation: 计算所使用的
Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()
中,否则 I/O 操作的等待时间会浪费 CPU。除此之外,android还有一个专门的AndroidSchedulers.mainThread(),它指定的操作将在Android主线程运行。
有了切换线程的方法后专门使用呢? RxJava提高了subscribeOn() 和 observeOn() 两个方法对线程进行控制。
subscribeOn():指定subscribe所发生的线程,即Observable.OnSubscribe被激活时所处的线程。也叫作时间产生的线程。
observeOn():指定Subscriber所运行的线程。也叫作事件消费的线程。
String[] names = {"AA", "BB", "CC", "DD", "EE", "FF"};
Observable.from(names)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("lw", "ACtion" + s);
}
});
Action1
这个方法是无参无返回值的;由于 onCompleted()
方法也是无参无返回值的,因此 Action1
可以被当成一个包装对象,将 onCompleted()
的内容打包起来将自己作为一个参数传入 subscribe()
以实现不完整定义的回调。除了Action1之外 RxJava还提供了其他的actionX系列方法(Action0...........Action9)4、变换 (1)、map:一对一变换
Observable.just("/image/pp.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
return null;
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
}
});
map将一个字符串转换成bitMap后返回了。 这里出现了一个叫做
Func1
的类。它和 Action1
非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。
Func1
和 Action
的区别在于, Func1
包装的是有返回值的方法。另外,和 ActionX
一样, FuncX
也有多个,用于不同参数个数的方法。FuncX
和ActionX
的区别在 FuncX
包装的是有返回值的方法。
(2)、flatMap:一对多变换 原理: 1. 使用传入的事件对象创建一个
Observable
对象;2. 并不发送这个 Observable
, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable
发送的事件,都被汇入同一个 Observable
,而这个 Observable
负责将这些事件统一交给Subscriber
的回调方法。
List<Student.Course> list = new ArrayList<>();
Student student = new Student();
Student.Course c = student.new Course("语文");
Student.Course c1 = student.new Course("数学");
Student.Course c2 = student.new Course("英语");
list.add(c);
list.add(c1);
list.add(c2);
List<Student.Course> list1 = new ArrayList<>();
Student student11 = new Student();
Student.Course c3 = student.new Course("物理");
Student.Course c4 = student.new Course("化学");
Student.Course c5 = student.new Course("生物");
list.add(c3);
list.add(c4);
list.add(c5);
Student student1 = new Student(10, "zhangsan", list);
Student student2 = new Student(10, "zhangsan", list1);
Student[] students = {student1, student2};
Subscriber<Student.Course> subscriber1 = new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student.Course course) {
Log.e("ssssssss", course.courseName);
}
};
Observable.from(students).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.course);
}
}).subscribe(subscriber1);
将一组学生中的课程全部打印。