Rxjava之旅-入门篇

时间:2022-01-31 17:45:50

前言

  • Rxjava越来越火,现在是Android里面一个重要的框架,想要进阶安卓就想必要去了解一下什么是Rxjava。看了很多关于Rxjava的文章,故此留下学习的印记,让自己或者有需要的人去学习了解接触。然后一步一步去揭开Rxjava神秘的面纱~

Rxjava是什么

  • 引用官方的一句话- “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。其实总的来说就是异步操作。说起异步,我们难免会想到AsyncTask / Handler等这些。那么Rxjava和它们的一些区别又在什么地方呢?使用Rxjava的好处就在于它能在逻辑十分复杂的情况下依然能够保持简洁明了的链式代码。这个具体需要通过代码去体会它的好处,本文的目的是先入门Rxjava,因此在后续的文章中再来详细说说这个问题。

组成

  • Rxjava主要由三部分组成:Observable(被观察者)、Subscriber(观察者)、subscribe(订阅)。
  • 乍一看这就跟观察者模式有相像之处,如果还不知道观察者模式的话,可以看看这篇文章,或者去Google一下。

使用

  • 创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
 // 通过create创建observable对象,在call中调用subscriber的onnext方法
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("World");
        subscriber.onCompleted();
    }
});
  • 创建观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Rxjava: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

其实还有一种创建观察者的方式

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Rxjava: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

从本质上看这没有什么区别,因为在 RxJava 的 subscribe 过程中,Observer会被转换成Subscriber。其中更具体的区别在本文中先不说,在后续系列文章中说明。先知道一般使用Subscriber
* 最后,就是订阅事件

// 通过调用subscribe方法使观察者和订阅者产生关联,一旦订阅观察者就开始发送消息
observable.subscribe(observer);
// 或者
observable.subscribe(subscriber);
  • 输出结果
Rxjava:Hellow World Completed!

Just方法

Observable.just(1,2,3,4).subscribe(new Observer<String>() {

    @Override
    public void onCompleted() {
        Log.d(tag ,"onCompleted");
    }

    @Override
    public void onError(Throwable arg0) {

    }

    @Override
    public void onNext(String string) {
        Log.d(tag ,string+",");
    }
});
  • 输出结果
1,2,3,4,Completed!

from方法

private void Rxjava() {
        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        Observable
            .from(list)
            .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d(tag , s+",");
                  }
        });
}
  • 输出结果
1,2,3,

repeat方法

private void Rxjava() {
        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        Observable
            .from(list)
            .repeat(2)/重复执行两次
            .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d(tag , s+",");
                  }
        });
}
  • 输出结果
1,2,3,1,2,3

range方法

Observable.range(0,10).subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                log.d(tag , "onCompleted");
            }

            @Override
            public void onError(Throwable arg0) {

            }

            @Override
            public void onNext(Integer arg0) {
                log.d(tag , arg0+",");
            }
        });
  • 输出结果
0,1,2,3,4,5,6,7,8,9,onCompleted

interval方法

 private void Rxjava() {
        // 第一个参数为指定轮询时间,第二个参数为轮询时间单位(这里以秒为单位)
        Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long time) {
                Log.d(tag , time);
            }
        });
    }
  • 输出结果
10-10 11:45:22.599 27973-20494/com.kid.rxjavademo rxjava:  0
10-10 11:45:24.600 27973-20494/com.kid.rxjavademo rxjava:  1
10-10 11:45:26.600 27973-20494/com.kid.rxjavademo rxjava:  2
    ...

timer方法

private void rxjava() {
        // 指定一定时间后才发射(这里是4秒钟),将会在4秒后收到事件
        Observable.timer(4, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long timer) {
                Log.d(tag , "receiver");
            }
        });
    }

另外timer还有一个三个参数的方法timer(3,3,TimeUnit.SECONDS)意思是 延迟3秒之后,每隔3秒发射一次

总结

  • 以上那些都是Rxjava比较常用的操作符,自己打过一遍上面的demo之后也就会对rxjava有了初步的理解,接下来的内容就会更加容易理解。接下来的系列文章中,将会继续探讨Rxjava。