RxJava 与RxAndroid 的线程控制

时间:2022-11-06 17:45:04

通过之前的学习,并没发现他们说的能取代AsyncTask的优势,接下来将介绍他如何实现线程控制

在RxJava的默认规则中,事件的发出与消费都是在同意线程中,也就是是说默认观察者和被观察者事件的处理与传递都是在一个线程中,这似乎不和观察者本身的意愿就是异步机制,这将会牵扯出宁一个类Scheduler

在不指定线程的情况下,RxJava遵循的是线程不变原则,即:在哪个线程中调用subscirbe(),就在那个线程生产事件,也就在那个线程消费事件,如果需要切换线程,就需要Scheduler 调度器

相当于线程控制器,RxJava通过它来指定每一段代码运行什么样的线程,RxJava已经内置了几个Schedulers,他们适合大多数场景,都可以从Schedulers类的静态方法获取


Schedulers.immediate()
//Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
意思是都在当前线程执行

     rx.Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("sss");
LogUtils.d("------->call线程:" + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.immediate())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
LogUtils.d("------->onNext线程:" + Thread.currentThread().getName());
}
});
结果:

12-26 03:16:58.295 1857-1857/com.rxandroid.test1 D/----->: ------->onNext线程:main
12-26 03:16:58.295 1857-1857/com.rxandroid.test1 D/----->: ------->call线程:main


关于RxJava的线程分类;请参考下面这个例子:

@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.button1:
SchedulersThread(Schedulers.immediate()); // Schedulers.immediate();当前线程
break;
case R.id.button2:
SchedulersThread(Schedulers.newThread());//Schedulers.newThread() 新线程
break;
case R.id.button3:
SchedulersThread(Schedulers.io());//Schedulers.io() I/0 操作(读写文件,读写数据库。网络信息交互等)所使用的线程,行为模式与newThread(0差不多
/**
* 区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数io()比newThread()更有效率,不要把计算工作放在io()中,可以避免创建不必要的线程
*
*/
break;
case R.id.button4:
SchedulersThread(Schedulers.computation());
/**
* 计算所使用的线程,这个计算的是cpu密集计算,即不会被I/0等操作限制性能的操作
* 例如图形计算,这个scheduler使用固定的线程池,大小为cpu核数,不要把I/0放在computationx中,
* 否则Io操作的等待时间会浪费CPU
*/
break;
case R.id.button5:
SchedulersThread(AndroidSchedulers.mainThread());
/**
* 操作在android的UI线程中
*/
break;
}
}

private void SchedulersThread(Scheduler scheduler) {
rx.Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("sss");
LogUtils.d("------->call线程:" + Thread.currentThread().getName());
}
}).subscribeOn(scheduler)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
LogUtils.d("------->onNext线程:" + Thread.currentThread().getName());
}
});
}
打印结果依次:

12-26 03:57:34.811 11395-11395/com.rxandroid.test1 D/----->: ------->onNext线程:main
12-26 03:57:34.811 11395-11395/com.rxandroid.test1 D/----->: ------->call线程:main


12-26 03:57:37.180 11395-11565/com.rxandroid.test1 D/----->: ------->onNext线程:RxNewThreadScheduler-1
12-26 03:57:37.180 11395-11565/com.rxandroid.test1 D/----->: ------->call线程:RxNewThreadScheduler-1


12-26 03:57:39.810 11395-11582/com.rxandroid.test1 D/----->: ------->onNext线程:RxCachedThreadScheduler-1
12-26 03:57:39.812 11395-11582/com.rxandroid.test1 D/----->: ------->call线程:RxCachedThreadScheduler-1


12-26 03:57:42.112 11395-11601/com.rxandroid.test1 D/----->: ------->onNext线程:RxComputationThreadPool-1
12-26 03:57:42.113 11395-11601/com.rxandroid.test1 D/----->: ------->call线程:RxComputationThreadPool-1


12-26 03:57:44.373 11395-11395/com.rxandroid.test1 D/----->: ------->onNext线程:main
12-26 03:57:44.374 11395-11395/com.rxandroid.test1 D/----->: ------->call线程:main


了解了这几个Scheduler,我们就可以使用subscribeOn()和observeOb()两个方法来对线程精选控制了

subscribeOn(): 是指subscribe()所发生的线程,即Observable.OnSubscibe被激活时所处的线程,或者事件产生的的线程

observeOn() 指定Subscriber所运行的线程,或者叫做事件消费的线程


实例:常见模型 后台处理 前台展示

public void ObserverMainThreadSubscribeIo() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("sss");
LogUtils.d("------->call线程:" + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
LogUtils.d("------->onNext线程:" + Thread.currentThread().getName());
}
});
}

打印结果:

12-26 04:10:22.691 17917-17997/com.rxandroid.test1 D/----->: ------->call线程:RxCachedThreadScheduler-1
12-26 04:10:22.712 17917-17917/com.rxandroid.test1 D/----->: ------->onNext线程:main



比较好的一个例子:加载图片

 /**
* 加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。
*/
private void loadIamge() {
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.drawable.ic_launcher);
subscriber.onNext(drawable);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Drawable drawable) {
imgView.setImageDrawable(drawable);
}
});
}