概述:
rxjava是用java实现ReactiveExtensions,可观测的序列来组成异步的、基于事件的程序的库。它扩展了观察者模式来支持数据/事件序列,而抽象掉的事情,如低级别的线程,同步,线程安全和并发数据结构.
这跟Handler、AsyncTask差不多干嘛还要去学这个呢?问题就在这,这也是rxjava的最大优点,rxjava能极大的简化各个模块类之间的耦合度
能在程序逻辑变得很复杂的情况下,依然保持简洁,本博客不会去具体讲Rxjava,只是将里面的一些常用函数列举。
接下来就来看实际应用:
首先引入哪些依赖就不用多讲了。
Rxjava中有很多操作符,这里列举几个常用的操作符,文档会在文章底部给出:
public static void createObsevable() {
//定义被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("hello");
subscriber.onNext("world");
subscriber.onNext(downLoadJson());
subscriber.onNext("HPC");
subscriber.onCompleted();
}
}
});
//观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
//关联被观察者
observable.subscribe(subscriber);
}
public static String downLoadJson() {
return "json data";
}
打印输出:helloworldjson dataHPC上面的public void call方法中有三个回调方法:onNext,onError,onCompleted,这三个方法对应着Subscriber中的三个回调方法,
会将onNext,onError,onCompleted中的参数传递到Subscriber中,有人在想:靠!这么复杂,干嘛还用啊!这里我要说明的是rxjava
是将整个代码的耦合度变低,将这个逻辑变的简洁,并不是说能节省代码量,更多的是以增加代码量来换取逻辑简洁。
public static void createPrint() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (!subscriber.isUnsubscribed()) {
for (int i = 1; i < 10; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
});
}
打印输出:123456789
这里的方式与上面方式的区别在于这种方式是通过.的方式来回调给观察者的。
//使用在被观察者,返回的对象一般都是数值类型
public static void from() {
Integer[] item = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Observable observable = Observable.from(item);
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
System.out.println(o.toString());
}
});
}
打印输出:123456789
from()它接收一个集合作为输入,然后每次输出一个元素给subscriber
//指定某一时刻进行数据发送
public static void interval() {
Integer[] items = {1, 2, 3, 4, 5};
Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
System.out.println(o.toString());
}
});
}
这里会每隔1秒打印出一个1个字符
//使用范围数据,指定输出数据的范围
public static void range() {
Observable observable = Observable.range(1, 40);
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
});
}
这里会从根据指定的范围,输出相应的数字
//使用过滤功能
public static void filter() {
Observable observable = Observable.just(1, 2, 3, 4, 5);
observable.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer o) {
return o < 5;
}
}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer.toString());
}
});
}
将其中小于5的数字给过滤掉,进行输出。
//延迟操作,只有创建订阅者并且订阅之后才会创建Observable
public void defer() {
Observable.defer(() -> Observable.just(1, 2, 3, 4)).subscribe(new Action1<Integer>() {
@Override
public void call(Integer s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
}
//将list数组中的数据一个个取出来
private void fromList() {
List<String> stringList = new ArrayList<>();
for (int i = 0; i < 20; i++) {
stringList.add("" + i);
}
Observable<String> observable = (Observable<String>) Observable.from(stringList).subscribe(s -> Toast.makeText(MainActivity.this, "", Toast.LENGTH_LONG).show());
}
//有限循环,将指定的内容循环发送指定的次数
private void repeat() {
Observable<Integer> observable = (Observable<Integer>) Observable.just(1, 2, 3, 4).repeat(10).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Toast.makeText(MainActivity.this, "" + integer, Toast.LENGTH_SHORT).show();
}
});
}
//可以将类型进行转换
private void flatMap() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
Observable<String> observable1 = (Observable<String>) observable.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just(integer + "");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
}
Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable
//转换为可以迭代的数据
private void flatMapIterable() {
Observable<Integer> observable = (Observable<Integer>) Observable.just(1, 2, 3, 4, 5, 6).flatMapIterable(new Func1<Integer, List<Integer>>() {
ArrayList<Integer> arrayList = new ArrayList<Integer>();
@Override
public List<Integer> call(Integer integer) {
arrayList.add(integer);
return arrayList;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Toast.makeText(MainActivity.this, "" + integer, Toast.LENGTH_SHORT).show();
}
});
}
//对值直接进行操作
private void map() {
Observable.just(1, 2, 3, 4).map(integer -> integer + "").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
}
//直接强制类型转换,和map区别是:map是通过某种方式将一种类型转换成另一种类型
//而cast是通过java的强制类型转换变成对应的类型
private void cast() {
Object o = new String();
Observable.just(o).cast(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
}
到这里就讲这么多,关于rxjava线程的切换主要就是调用ObserverOn()和SubscribeOn()两个方法,会在后面的RxAndroid中提及。
总结:
1.Observable和Subscriber可以做任何事情
Observable可以是一个数据库查询,Subscriber用来显示查询结果;
Observable可以是屏幕上的点击事件,Subscriber用来响应点击事件;
Observable可以是一个网络请求,Subscriber用来显示请求结果。(使用最多)
2.Observable和Subscriber是独立于中间的变换过程的。
在Observable和Subscriber中间可以增减任何数量的map。整个系统是高度可组合的,操作数据是一个很简单的过程。