RxJava入门——概念篇

时间:2021-09-29 22:39:36
gitHubd地址:https://github.com/ReactiveX/RxJava 
https://github.com/ReactiveX/RxAndroid 

引入依赖:   compile 'io.reactivex:rxjava:1.0.14'       compile 'io.reactivex:rxandroid:1.0.1'

1、概念 RxJava基本概念:Observable(被观察者),Observer(观察者),subscribe(订阅) ,被观察者和观察者通过subscribe方法实现订阅关系。RxJava的事件回调方法有OnNext(),onCompleted(),onError()。  onCompleted:事件队列完成是执行。RxJava不仅把每个事件单独处理,还会把他们看做一个队列。RxJava规定,当不会有新的onNext()发出时,需要触发onCompleted()方法作为标志。   onError:事件队列异常时执行。在事件处理过程中出现异常时onError会被触发,同时队列自动终止。不允许在有事件发出。 onCompleted和onError在一个正确运行的队列中二者是互斥的,整个队列的执行只会调用其中的一个方法。
    RxJava入门——概念篇RxJava入门——概念篇 2、基本实现(1) 创建观察者Observer RxJava中的Observer的接口实现如下
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}
};

除了以上的实现方式之外 RxJava还内置了一个实现了Observer 的抽象类:Subscriber
  Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(String s) {
}

@Override
public void onStart() {
super.onStart();
}
};

此实现类多了一个onStart()方法 可以用于一些初始化的操作。

(2) 创建被观察者 Observable
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("111");
subscriber.onNext("222");
subscriber.onNext("333");
subscriber.onCompleted();
}
});

Observable.create()是最基本的创建Observable方式,除此之外 还有just()  from()等方式。 在返回的observable中可以通过isUnsubscribed()来判断是否已经订阅  最后通过unsubscribe()来解除订阅。
(3) 通过subscribe将二者关联起来
  observable.subscribe(observer);

最后通过subscribe将oberver和observable进行关联。 这样一个订阅事件就完成了。当Observable一被订阅就会触发observer的call方法。此时 将执行三次onNext()和一次onCompleted()。
RxJava入门——概念篇

3、Scheduler     Scheduler是RxJava的线程调度器。主要有以下几个:
    Schedulers.immediate():直接在当前线程执行,相当于不指定线程。是默认的Scheduler。
    Schedulers.newThread():总是启用新线程,并在新线程执行操作。
    Schedulers.io():I/O操作(读写文件 读写数据库 网络信息交互) 所使用的Scheduler。行为模式和newThread()差不多,区别在于IO()的内部实现是用在一个无数量上限的线程池,可以重用空闲的线程。因此,多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。     Schedulers.computation: 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    除此之外,android还有一个专门的AndroidSchedulers.mainThread(),它指定的操作将在Android主线程运行。

有了切换线程的方法后专门使用呢?     RxJava提高了subscribeOn() 和 observeOn() 两个方法对线程进行控制。
      subscribeOn():指定subscribe所发生的线程,即Observable.OnSubscribe被激活时所处的线程。也叫作时间产生的线程。
       observeOn():指定Subscriber所运行的线程。也叫作事件消费的线程。
 String[] names = {"AA", "BB", "CC", "DD", "EE", "FF"};
Observable.from(names)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("lw", "ACtion" + s);
}
});
RxJava入门——概念篇

Action1 这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action1 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。除了Action1之外 RxJava还提供了其他的actionX系列方法(Action0...........Action9)

4、变换  (1)、map:一对一变换       
  Observable.just("/image/pp.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {


return null;
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {

}
});

map将一个字符串转换成bitMap后返回了。
这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。  Func1 和 Action的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和ActionX 的区别在 FuncX 包装的是有返回值的方法。 
(2)、flatMap:一对多变换 原理: 1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给Subscriber 的回调方法。         RxJava入门——概念篇
        List<Student.Course> list = new ArrayList<>();
Student student = new Student();
Student.Course c = student.new Course("语文");
Student.Course c1 = student.new Course("数学");
Student.Course c2 = student.new Course("英语");

list.add(c);
list.add(c1);
list.add(c2);

List<Student.Course> list1 = new ArrayList<>();
Student student11 = new Student();
Student.Course c3 = student.new Course("物理");
Student.Course c4 = student.new Course("化学");
Student.Course c5 = student.new Course("生物");

list.add(c3);
list.add(c4);
list.add(c5);

Student student1 = new Student(10, "zhangsan", list);
Student student2 = new Student(10, "zhangsan", list1);
Student[] students = {student1, student2};

Subscriber<Student.Course> subscriber1 = new Subscriber<Student.Course>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Student.Course course) {
Log.e("ssssssss", course.courseName);
}
};

Observable.from(students).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.course);
}
}).subscribe(subscriber1);


将一组学生中的课程全部打印。