前言
现在rx系列似乎是火的不行了,一打开群就是各种rxJava、rxAndroid。
最近正恶补各种新技术来充实自己,所以前些天写完两篇介绍JuheNews项目的文章之后,马上又开始加入了学习rxJava的阵营当中。
欢迎来到rxJava
刚开始看rxJava的系列文章的时候,一万头*在心里来回奔腾:这tm跟屎一样的东西写的都是个啥?现在开始找到了一点感觉了。
网上很多大神都把rxJava看成设计模式中的观察者模式,刚开始接触它的时候,它的链式调用我反而感觉有点像建造者模式(比如AlertDialog),不过有区别的是建造者模式中方法调用的顺序并不影响最后的建造结果,而在rxJava的链式调用中,各种Operators的顺序会影响后面的结果。这一点我们可以在后面验证。
在我看来,rxJava核心的部分有3个:
Observables 被观察者,数据源。
Operators 操作符,对Observables的各种加工的操作。
Subscribers 观察者,负责对整个流程的把控,包括接收Observables被Operators加工后的结果、处理加工时的异常、监听处理加工完成时的事件等。
所以rxJava风格的代码一般按下面四个步骤来写:
1、创建数据源Observables
2、对Observables添加各种你想要的加工操作(这一步可以没有,视具体业务而定)
3、创建Subscribers
4、绑定Observables和Subscribers(不绑定怎么接收Observables传过来的数据呢?)
初学时可能看的一头雾水,但是可以不用过于纠结里面的细节,先熟悉这种写法就行。举个栗子,我们现在要用rxJava的风格来输出一句hello rx
,该怎么写呢?
Hello RxJava
要使用rxJava我们需要在项目中添加依赖库,即在build.gradle文件中添加如下代码:
compile 'io.reactivex:rxjava:1.1.5'
配置完成后,我们先来看一段最简单的rxJava风格的代码。
private void hello() {
//创建数据源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//添加数据
subscriber.onNext("hello rx");
//数据添加完成
subscriber.onCompleted();
}
});
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//接受数据源添加过来的数据
v(s);//这只是一句打印log的普通操作,我自己写的方法
}
};
//绑定数据源和观察者
observable.subscribe(subscriber);
}
上面创建数据源时用的Observable.create()
静态方法,传入一个Observable.OnSubscribe
对象,并且重写了它的call
方法。如果你不理解,暂时把它当成这样一个模板即可,总之,这里只是为了创建一个observable对象,不必深究细节。
需要注意的是call()
方法的参数subscriber
是哪里来的呢?看代码第33行,就是绑定Observable和Subscriber时传进来的,所以代码第8行调用完成之后,就会跳到29行。
是不是看的一头雾水,没关系,rxJava的使用模板就在这里了,不管rxJava有多牛逼多强大,再多复杂的操作都是按这种套路写的。
Simple Hello
如果你觉得上面的代码有点多,看看下面的写法:
private void simpleHello() {
//创建Observable和绑定subscribe过程写在一句话里面了
Observable.just("hello rx").subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
v(s);
}
});
}
和上面那个Observable.create()
方法一样,Observable.just()
也是Observable的一个静态方法,并且它返回一个Observable对象。上面的写法就等同于下面:
private void simpleHello() {
//创建数据源
Observable<String> observable = Observable.just("hello rx");
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//接受数据源添加过来的数据
v(s);//这只是一句打印log的普通操作,我自己写的方法
}
};
//绑定数据源和观察者
observable.subscribe(subscriber);
}
还是觉得有点多,不过这里只是为了举例说明rxJava的用法。
operate
操作符是rxJava最核心、最牛逼的功能之一,说这句话没有人会打我吧。。
我理解的操作符,其实是rxJava中封装好的一系列方法,然后开发者可以很简单方便的api调用来实现以往那些需要复杂逻辑的操作。
比如要过滤一些数据,可以通过filter
操作符,其实就是调用filter()
方法;
比如要遍历一个集合,可以通过from
操作符,其实就是调用from()
方法;
比如要将数据源从一种类型转换成另一种类型,可以通过map
操作符,其实就是调用map()
方法;
还有很多···
举个栗子,我们要实现一个功能:传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来。
private void operate() {
//创建监听者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer value) {
v(value + "");
}
};
//创建被监听者、数据源
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
});
//绑定数据源和观察者
observable.subscribe(subscriber);
}
为了避免思维混乱,上面的代码暂时没有加入过滤、乘以2的操作,如果你现在能看懂这些代码,说明现在你对rxJava已经了解一二了。
对于过滤奇数的操作,需要调用filter
操作符,如下:
observable.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
上面call()
方法中返回false的数据将会被丢弃,所以我们只需写integer % 2 == 0
即可把所有奇数过滤掉了。
对于乘以2的操作,需要调用map
操作符来完成,如下:
observable.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer * 2;
}
})
call()
方法的返回值类型可以自定义,如果你想把过滤后的偶数除以2,返回double类型,则代码如下:
observable.map(new Func1<Integer, Double>() {
@Override
public Double call(Integer integer) {
return integer / 2;
}
})
结合上面两步,实现传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来
的这个功能最后的代码如下:
private void operate() {
//创建监听者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer value) {
v(value + "");
}
};
//创建被监听者、数据源
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
});
//操作数据源
observable.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer * 2;
}
})
//必须将这些操作及时绑定到监听者,否则这些操作将丢失
.subscribe(subscriber);
}
一开始的时候,我们说过rxJava中的监听者模式跟建造者模式不一样,其中有一点就是rxJava中操作符的顺序会影响最后的结果,而建造者模式中不会。
如果上面传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来
的这个需求中,改成传入一些整数,将这些数乘以2,然后过滤掉其中的奇数,最后打印出来
,两种需求最后打印的结果是一样吗?(你们觉得呢?)
传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来的结果如下:
传入一些整数,将这些数乘以2,然后过滤掉其中的奇数,最后打印出来的结果如下:
结果一目了然,大家可以把代码中的filter
和map
操作符的顺序调换一下看看是不是上面的结果。
跟map
操作符类似的还有flatMap
,不过flatMap
返回的是个 Observable 对象。
假设有个Student
对象,现在要打印学生的所有课程Course
的名字。它们的类结构如下:
public static class Student {
String name;
List<Course> courseList;
public static Student create() {
Student student = new Student();
student.name = "";
student.courseList = new ArrayList<>(1);
for (int i = 0; i < 10; i++) {
student.courseList.add(new Course(i + ""));
}
return student;
}
public static class Course {
String name;//课程名
double score;//学分
public Course(String n) {
name = n;
score = 0;
}
}
}
如果按照上面的介绍,你是不是已经想到手动取出所有课程,然后用from和map取出课程名?
no no no ,那样太low了,不符合rxJava的风格。看看flatMap
是怎么大展身手的吧:
private void flatMap() {
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
Student student = Student.create();
subscriber.onNext(student);
}
}).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.courseList);
}
}).subscribe(new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student.Course course) {
v(course.name);
}
});
}
吓尿了没?flatMap
操作符的具体原理可以去看扔物线大大的那篇文章。
Schedulers
还有两个特别的操作符需要单独拿出来,那就是subscribeOn
和observeOn
。
先来看看扔物线大大给 Android 开发者的 RxJava 详解文章中的描述:
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
说起来有点绕,具体什么意思呢?那就是切换线程。
还拿之前的hello rxJava说事,只是比之前加了切换线程的操作,如下:
private void schedulers() {
//打印线程名
v("schedulers(): " + Thread.currentThread().getName());
//创建数据源
Observable<String> observable = Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//打印线程名
v("create Observable: " + Thread.currentThread().getName());
//添加数据
subscriber.onNext("hello rx");
//数据添加完成
subscriber.onCompleted();
}
})
//** 在work线程订阅事件
.subscribeOn(Schedulers.io())
//** 在main线程观察操作
.observeOn(AndroidSchedulers.mainThread());
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
//打印线程名
v("subscriber onCompleted():" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//打印线程名
v("subscriber onNext():" + Thread.currentThread().getName());
}
};
//绑定数据源和观察者
observable.subscribe(subscriber);
}
(注意:要使用AndroidSchedulers.mainThread(),必须在build.gradle中添加rxAndroid的依赖库哦:compile 'io.reactivex:rxandroid:1.2.0'
)
下面是控制台的打印结果:
可以看到:
由于指定了
subscribeOn(Schedulers.io())
,所以Observable.create()
创建Observable对象时,Observable的call()
方法在rx的IO线程执行;由于指定了
observeOn(AndroidSchedulers.mainThread())
,所以subscriber接收数据以及回调完成事件时,均在主线程执行。
我们把两个线程换一下,再来看打印结果。代码如下:
//** 在main线程订阅事件
.subscribeOn(AndroidSchedulers.mainThread())
//** 在work线程观察操作
.observeOn(Schedulers.io());
日志的第一句schedulers(): main
是由于整段代码都是在Activity发起的,当然是main线程,所以不用管,只看后面3句。
可以看到,它们的线程变成了我们指定的线程。rxJava中切换线程两句话就搞定,妈妈再也不用担心我阻塞UI了~
结合我们的实际开发经验,如果我们要在sd卡中Pictures目录中取出最多10张.png格式的图片,然后设置到ImageView中,代码就可以这么写:
private void from() {
//得到所有图片的路径
String[] dirArr = getExternalFilesDir(Environment.DIRECTORY_PICTURES).list();
Observable
//from操作符,遍历dirArr数组
.from(dirArr)
//filter得到上面传下来的单个元素,然后过滤非.png格式的图片
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.endsWith(".png");
}
})
//经过上面两步得到合适的图片之后,将图片路径转成bitmap
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//外部方法:根据图片路径获取bitmap
return readBitmapFromPath(s);
}
})
//最多取10张图片
.take(10)
//防止阻塞ui,指定 subscribe() 发生在 IO 线程
.subscribeOn(Schedulers.io())
//最后指定 Subscriber 的回调发生在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Bitmap bitmap) {
//主线程
setBitmap2Imageview(bitmap);
}
});
}
private void setBitmap2Imageview(Bitmap bitmap) {
//根据实际业务修改逻辑
}
private Bitmap readBitmapFromPath(String path) {
//根据实际业务修改逻辑
return Bitmap.createBitmap(10, 10, Bitmap.Config.ALPHA_8);
}
除了Schedulers.io()、AndroidSchedulers.mainThread(),还有其他几个调度器,如下:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread():Android专有,它指定的操作将在 Android 主线程运行。
链式调用的这种行云流水的感觉有没有吸引到你呢?逻辑清晰,代码简洁,最重要的是很多事情已经不需要你自己去考虑了:省心。
unsubscribing
之前用AsyncTask或者Handler时,处理不好就会碰到内存泄露的问题,比如Activity被结束之后网络请求返回结果,程序就crash掉了。
为了避免发生上述问题,在RxJava中你可以在Activity的onStop或者其他合适的时机来执行取消订阅的操作,即unsubscribing()
。
RxJava会停止整个调用链。如果你使用了一串很复杂的操作符,调用
unsubscribing()
将会在他当前执行的地方终止。不需要做任何额外的工作!
同时还提供了isUnsubscribed()
用来检测当前是否取消订阅,如果已经取消则返回true
。
error
如果上面的这些还吸引不了你,我想你胃口有点大了,那就再给你来点点心吧。
不知道平时开发中大家是怎么处理crash的,很多时候我们都是到处try-catch,但是现在你已经不需要担心这些问题了,rxJava调用的过程中,所有的异常都会跑到onError()
的回调当中。
不信?
private void error() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("");
}
}).filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.charAt(2) > 'a';
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
v(e.toString());
}
@Override
public void onNext(String s) {
s.substring(3);
}
});
}
代码第11行和第28行中,如果放在平时是不是已经数组越界然后程序就crash了,但是在rxJava中会直接进入onError中,程序依然可以正常运行。
大大降低了我们程序crash的概率啊有木有!!再也不会动不动就被产品经理骂的狗血淋头了有木有!!!T^T
当然并不是教大家投机取巧,只是感慨rxJava中这种优雅的处理crash的方式让人觉得很舒服,难道你不心动吗?
最后
不知不觉就啰嗦了这么多,祝大家的进阶之路一切顺利。