RxJava漫谈-RxAndroid使用

时间:2022-09-10 20:47:47

RxJava在github上的地址:https://github.com/ReactiveX/RxJava

RxAndroid在github上的地址:https://github.com/ReactiveX/RxAndroid

本文主要介绍RxAndroid的使用,如果对于RxJava还不熟悉的可以先看一下RxJava的介绍文章。

Android的程序是用Java书写的,Android也有一些自己的线程模型,例如AsyncTask和Handler等。RxJava正是结合了前面的这几项,在此基础上推出了RxAndroid。下面介绍使用。

首先,我们在项目中引入RxAndroid,主要是在gradle脚本中引入下面两句话即可。

dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
testCompile 'junit:junit:4.12'
compile 'com.android.support:appcompat-v7:23.1.1'
compile 'com.android.support:design:23.1.1'
//引入RxAndroid----begin
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'
//引入RxAndroid----end
}

这样就可以在Android代码中使用RxAndroid了,下面的例子展示了一段使用RxAndroid书写的代码:

Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);

这一段代码是在RxAndroid的官网上写的一段示例的代码。这里可以看出来,与RxJava相比较,其主要增加了如下的几个java文件:

    AndroidSchedulers
BuildConfig
HandlerScheduler
MainThreadSubscription
RxAndroidPlugins
RxAndroidSchedulersHook

下面分别对这几个文件的源码看一下:

1.AndroidSchedulers类的源码

public final class AndroidSchedulers {
private AndroidSchedulers() {
throw new AssertionError("No instances");
} private static class MainThreadSchedulerHolder {
static final Scheduler MAIN_THREAD_SCHEDULER =
new HandlerScheduler(new Handler(Looper.getMainLooper()));
} public static Scheduler mainThread() {
Scheduler scheduler =
RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
}
}

这个类主要提供了MainThread调度器,在RxAndroid中需要在主线程中处理的食物都需要用到这个类的mainThread。

2.BuildConfig类的源码:

public final class BuildConfig {
public static final boolean DEBUG = false;
public static final String APPLICATION_ID = "rx.android";
public static final String BUILD_TYPE = "release";
public static final String FLAVOR = "";
public static final int VERSION_CODE = -1;
public static final String VERSION_NAME = "";
}

主要是一些常量配置。

3.HandlerScheduler类的源码:

public final class HandlerScheduler extends Scheduler {

    public static HandlerScheduler from(Handler handler) { //从一个Handler中创建一个Scheduler
if (handler == null) throw new NullPointerException("handler == null");
return new HandlerScheduler(handler);
} private final Handler handler; HandlerScheduler(Handler handler) {
this.handler = handler;
} @Override
public Worker createWorker() {//覆盖Scheduler的createWorker函数,创建基于Handler的Worker
return new HandlerWorker(handler);
} static class HandlerWorker extends Worker { private final Handler handler; private final CompositeSubscription compositeSubscription = new CompositeSubscription(); HandlerWorker(Handler handler) {
this.handler = handler;
} @Override
public void unsubscribe() {
compositeSubscription.unsubscribe();
} @Override
public boolean isUnsubscribed() {
return compositeSubscription.isUnsubscribed();
} @Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {//覆盖Worker的调度函数schedule
if (compositeSubscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
} action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action); final ScheduledAction scheduledAction = new ScheduledAction(action);
scheduledAction.addParent(compositeSubscription);
compositeSubscription.add(scheduledAction); handler.postDelayed(scheduledAction, unit.toMillis(delayTime));//使用Handler处理这个调度动作ScheduleAction scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
handler.removeCallbacks(scheduledAction);//这句话保证当调度动作被取消的时候,能够及时把这个action从Handler中移除
}
})); return scheduledAction;
} @Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}
}
HandlerScheduler 类就是使用Handler作为处理核心的Scheduler类。

4.MainThreadSubscription类的源码:

public abstract class MainThreadSubscription implements Subscription {

  public static void verifyMainThread() { //静态方法,判断当前线程是否是主线程
if (Looper.myLooper() != Looper.getMainLooper()) {
throw new IllegalStateException(
"Expected to be called on the main thread but was " + Thread.currentThread().getName());
}
} private final AtomicBoolean unsubscribed = new AtomicBoolean(); @Override public final boolean isUnsubscribed() {
return unsubscribed.get();
} @Override public final void unsubscribe() {//主线程的取消订阅
if (unsubscribed.compareAndSet(false, true)) {
if (Looper.myLooper() == Looper.getMainLooper()) {//如果是主线程直接进行
onUnsubscribe();
} else {
AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {//如果不是主线程,就创建创建一个Action放到主线程中去执行
@Override public void call() {
onUnsubscribe();
}
});
}
}
} protected abstract void onUnsubscribe();
}
MainThreadSubscription类主要在意的就是unsubscribe的执行线程,这里采取一切方式保证其在主线程中执行。

5.RxAndroidPlugins类的源码:

public final class RxAndroidPlugins {//这个类的主要作用就是维护了一个RxAndroidSchedulersHook
private static final RxAndroidPlugins INSTANCE = new RxAndroidPlugins(); public static RxAndroidPlugins getInstance() {
return INSTANCE;
} private final AtomicReference<RxAndroidSchedulersHook> schedulersHook =
new AtomicReference<RxAndroidSchedulersHook>(); RxAndroidPlugins() {
} @Beta
public void reset() {
schedulersHook.set(null);
} public RxAndroidSchedulersHook getSchedulersHook() {
if (schedulersHook.get() == null) {
schedulersHook.compareAndSet(null, RxAndroidSchedulersHook.getDefaultInstance());//如果原来是null,就设置一个RxAndroidSchedulersHook
// We don't return from here but call get() again in case of thread-race so the winner will
// always get returned.
}
return schedulersHook.get();
} public void registerSchedulersHook(RxAndroidSchedulersHook impl) {
if (!schedulersHook.compareAndSet(null, impl)) {//如果原来的RxAndroidSchedulerHook是空,则直接持有,否则抛出异常
throw new IllegalStateException(
"Another strategy was already registered: " + schedulersHook.get());
}
}
}

可以看出RxAndroidSchedulerHook必须在使用前注册,一旦使用就不能再注册了。

6.RxAndroidSchedulersHook类的源码:

public class RxAndroidSchedulersHook {
private static final RxAndroidSchedulersHook DEFAULT_INSTANCE = new RxAndroidSchedulersHook(); public static RxAndroidSchedulersHook getDefaultInstance() {
return DEFAULT_INSTANCE;
} public Scheduler getMainThreadScheduler() {
return null;
} public Action0 onSchedule(Action0 action) {
return action;
}
}

这是RxAndroid提供的一个默认的RxAndroidSchedulerHook类,程序员也可以自己定义一个这样的类注册到RxAndroidPlugins中,但是必须在使用RxAndroidPlugins之前注册。

自定义的RxAndroidSchedulerHook类可以覆盖onSchedule函数,在这里进行一些处理,例如日志记录等。

以上只是说明了RxAndroid与RxJava中不一样的地方,并没有尝试说明RxJava是什么,在阅读本文之前,读者应该先弄明白这个问题。

现在再来看之前的两个例子就很明白了:

public class ReactiveFragment extends Fragment {//在UI线程中的例子
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
}
new Thread(new Runnable() {//在其他线程中的例子
@Override
public void run() {
final Handler handler = new Handler(); //绑定到这个线程的Handler
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(HandlerScheduler.from(handler))
.subscribe(/* an Observer */) // perform work, ...
}
}, "custom-thread-1").start();

RxJava漫谈-RxAndroid使用的更多相关文章

  1. &lbrack;Android&rsqb;基于RxJava、RxAndroid的EventBus实现

    以下内容为原创,欢迎转载,转载请注明 来自天天博客:http://www.cnblogs.com/tiantianbyconan/p/4578699.html  Github:https://gith ...

  2. RxJava 和 RxAndroid 五(线程调度)

    对rxJava不了解的同学可以先看 RxJava 和 RxAndroid 一 (基础)RxJava 和 RxAndroid 二(操作符的使用)RxJava 和 RxAndroid 三(生命周期控制和内 ...

  3. RxJava 和 RxAndroid 四(RxBinding的使用)

    对Rxjava不熟悉的同学可以先看我之前写的几篇文章 RxJava 和 RxAndroid 一 (基础) RxJava 和 RxAndroid 二(操作符的使用) RxJava 和 RxAndroid ...

  4. RxJava 和 RxAndroid 三(生命周期控制和内存优化)

    rxjava rxandroid 赵彦军 前言:对Rxjava.Rxandroid不了解的同学可以先看看 RxJava 和 RxAndroid RxJava 和 RxAndroid 二(操作符的使用) ...

  5. RxJava 和 RxAndroid 二(操作符的使用)

    前言:对Rx不了解的朋友可以先看我的第一篇博文 RxJava 和 RxAndroid 一 (基础),是对Rxjava的基本介绍 1.merge操作符,合并观察对象 List<String> ...

  6. RxJava 和 RxAndroid 一 &lpar;基础&rpar;

    1.RxJava 项目地址 https://github.com/ReactiveX/RxJava 2.RxAndroid 项目地址    https://github.com/ReactiveX/R ...

  7. RxJava和RxAndroid

    现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...

  8. RxJava 和 RxAndroid (生命周期控制和内存优化)

    RxJava使我们很方便的使用链式编程,代码看起来既简洁又优雅.但是RxJava使用起来也是有副作用的,使用越来越多的订阅,内存开销也会变得很大,稍不留神就会出现内存溢出的情况,这篇文章就是介绍Rxj ...

  9. RxJava RxAndroid【简介】

    资源 RxJava:https://github.com/ReactiveX/RxJava RxAndroid :https://github.com/ReactiveX/RxAndroid 官网:h ...

随机推荐

  1. SQL&ast;Loader之CASE6

    CASE6 1. SQL脚本 [oracle@node3 ulcase]$ cat ulcase6.sql set termout off rem host write sys$output &quo ...

  2. Entity Framework Code First 常用方法集成

    using System; using System.Collections.Generic; using System.Data.Entity; using System.Linq; using S ...

  3. dup和dup2函数

    下面两个函数都可用来复制一个现存的文件描述符: #include<unistd.h> int dup(int filedes); int dup2(int filedes,int file ...

  4. Beej网络socket编程指南

    bind()函数 一旦你有一个套接字,你可能要将套接字和机器上的一定的端口关联 起来.(如果你想用listen()来侦听一定端口的数据,这是必要一步--MUD 告 诉你说用命令 "telne ...

  5. ccrendertexture

    int bgHeight=150; CCSprite *sp=CCSprite::create("HelloWorld.png"); sp->setAnchorPoint(c ...

  6. Android开发手记&lpar;32&rpar; 使用摄像头拍照

    在Android中,使用摄像头拍照一般有两种方法, 一种是调用系统自带的Camera,另一种是自己写一个摄像的界面. 我们要添加如下权限: <uses-permission android:na ...

  7. 关于数据库报Packet for query is too large &lpar;1986748 &gt&semi; 1048576&rpar;&lpar;mysql写入数据过大&rpar;的解决办法

    方法2 (很妥协,很纠结的办法) 进入mysql server 在mysql 命令行中运行 set global max_allowed_packet = 2*1024*1024*10 然后关闭掉这此 ...

  8. unittest各个组件之间的关系

    各个组件的含义: TestCase:测试用例,测试用例里面会有很多测试方法,是单元测试中最小维度的测试行为. TestSuite:测试套件,是测试用例的集合. TestFixure:测试固件,测试准备 ...

  9. git 的 cat-file 的命令用法

    命令选项 git cat-file 的命令显示版本库对象的内容.类型.及大小信息. -t  Instead of the content, show the object type identifie ...

  10. 资源推荐:特意挑选了11个可以称得上&OpenCurlyDoubleQuote;神器”的Windows工具下载

    特意挑选了11个可以称得上“神器”的Windows工具,安装包获取方式在文末. 以下神器包含:OCR文字识别.百度云超速下载工具.本地文件搜索工具.软件卸载器.本地视频播放器.图片去水印神器.百度文库 ...