RxJava
官方GitHub地址:https://github.com/ReactiveX/RxJava
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
译:RxJava是Reactive Extensions的Java VM实现:用于通过使用observable序列来组合异步和基于事件的程序的库。
与RxJava1.x对比
RxJava 2.0 已经按照Reactive-Streams specification规范完全的重写了。2.0 已经独立于RxJava 1.x而存在。
因为Reactive-Streams有一个不同的构架,它改变了一些以往RxJava的用法。这份文档尝试去总结所发生的变化,并描述如何把1.x的代码改写成符合2.x规则的代码。
RxJava2.0与RxJava1.0的区别,请看:http://blog.csdn.net/qq_35064774/article/details/53045298
RxJava的四个组成部分:
观察者(observer),被观察者(Observable),订阅(subscribe),响应事件。
RxJava的强大的核心:
- 异步:两个比较核心的方法subscribeOn和observeOn这两个方法都传入一个Scheduler对象,subscribeOn指定发射事件的线程,observeOn指定消费事件的线程。
- 操作符:提供了一系列的转换操作符,如map(),flatMap(),filter(),merge(),concat(),lift(),compose()等操作符可以将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
- 链式调用:链式调用最大的好处就是逻辑清晰,代码简洁。在应对较复杂的逻辑的时候,也能展现出清晰的思路!正如文章中戏称:迷之缩进!
添加依赖
dependencies {
...
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
...
}
Data类
下面代码中使用的Data数据类。
public class Data {
private int id;
private String name;
public Data(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
1.最基本的使用
//创建观察者或者订阅者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//Disposable是1.x的Subscription改名的,因为Reactive-Streams规范用这个名称,为了避免重复
//这个回调方法是在2.0之后新添加的
//可以使用d.dispose()方法来取消订阅
}
@Override
public void onNext(String value) {
Log.d("onNext", value);
}
@Override
public void onError(Throwable e) {
Log.d("onError", e.getMessage());
}
@Override
public void onComplete() {
Log.d("onComplete", "complete");
}
};
//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("Hello World!");
}
});
observable.subscribe(observer);
由于1.x中Observable不能合理的背压,导致了无法意料的 MissingBackpressureException,所以在2.x中,添加了Flowable来支持背压,而把Observable设计成非背压的。
还有一点需要注意的就是,在上边注释中也有,onSubscribe(Disposable d)这个回调方法是在2.x中添加的,Dispose参数是由1.x中的Subscription改名的,为了避免名称冲突!
2.RxJava2.x中的基本用法
所以在2.x中,最好这么写:
Subscriber<Data> subscriber = new Subscriber<Data>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe: ");
s.request(1);
}
@Override
public void onNext(Data data) {
Log.d(TAG, "onNext: first:" + data.getId() + data.getName());
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
Flowable.create(new FlowableOnSubscribe<Data>() {
@Override
public void subscribe(FlowableEmitter<Data> e) throws Exception {
Data data = new Data(1, "+Java");
Data data1 = new Data(2, "+Android");
e.onNext(data);
e.onNext(data1);
e.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(subscriber);
3.Consumer消费者
如果我们只关注onNext()方法的内容,而忽略其他方法,可以使用Consumer代替Subscriber:
Data data = new Data(1, "+Java");
Data data1 = new Data(2, "+Android");
List<Data> list = new ArrayList<>();
list.add(data);
list.add(data1);
Flowable<List<Data>> flowable = Flowable.just(list);
Consumer consunmer = new Consumer<List<Data>>() {
@Override
public void accept(List<Data> datas) throws Exception {
for (Data data : datas) {
Log.d(TAG, "onNext: second:" + data.getId() + data.getName());
}
}
};
flowable.subscribe(consunmer);
4.更简洁的写法
用just发射指定内容,可以是字符串,数组等各种对象类型。
Flowable.just("Hello,I am China!")
//替代1.x中的action1,接收一个参数,如果是两个参数action2使用BiCustomer,而且删除了action3-9
//多个参数用Custom<Object[]>
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("consumer", s);
}
});
1.map:转换操作
如果我们想修改Data数据中id为2时的name,如下:
Data data = new Data(1, "+Java");
Data data1 = new Data(2, "+Android");
List<Data> list = new ArrayList<>();
list.add(data);
list.add(data1);
Flowable.just(list).map(new Function<List<Data>, List<Data>>() {
@Override
public List<Data> apply(List<Data> datas) throws Exception {
for (Data data : datas) {
if (data.getId() == 2) {
data.setName(data.getName() + "---DeMon");
}
}
return datas;
}
}).subscribe(new Consumer<List<Data>>() {
@Override
public void accept(List<Data> datas) throws Exception {
for (Data data : datas) {
Log.d(TAG, "onNext: three:" + data.getId() + data.getName());
}
}
});
2.flatMap:批量转换
如果我们需要批量修改(一般来说是容器的数据)数据,需要使用fromIterable()发射数据,flatMap()进行批量修改:
Data data = new Data(1, "+Java");
Data data1 = new Data(2, "+Android");
List<Data> list = new ArrayList<>();
list.add(data);
list.add(data1);
Flowable.fromIterable(list).flatMap(new Function<Data, Publisher<?>>() {
@Override
public Publisher<?> apply(Data data) throws Exception {
data.setName(data.getName() + "---DeMon");
return Flowable.fromArray(data);
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Data data = (Data) o;
Log.d(TAG, "onNext: four:" + data.getId() + data.getName());
}
});
3.filter:过滤器(筛选器)
range()方法,第一个参数为开始值,第二个参数为数量;
filter()方法用于对数据进行过滤;
take(n)方法用于取前n个值。
Flowable.range(1, 100)//从5开始的10个数(5——14)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;//过滤为偶数,得到偶数
}
})
.take(3)//只要前3个数据
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
4.Schedulers:线程控制
我们可以通过subscribeOn/observeOn来制定观察者和被观察者说锤子啊的线程,然后使用chedulers进行线程控制。
这一点在Android的UI线程用尤为重要。
Observable.range(1, 9)
.subscribeOn(Schedulers.io())// IO 线程,由 subscribeOn() 指定
.observeOn(Schedulers.newThread())//新线程,由 observeOn() 指定
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return integer * 10;
}
})
.observeOn(Schedulers.io()) // IO 线程,由 observeOn() 指定
.map(new Function<Object, Object>() {
@Override
public Object apply(Object o) throws Exception {
int a = (int) o;
return a / 2;
}
}).observeOn(Schedulers.newThread())//新线程,由 observeOn() 指定
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: " + o);
}
});
RxAndroid
Android上的响应式扩展,在RxJava基础上添加了Android线程调度。
官方GitHub:https://github.com/ReactiveX/RxAndroid
针对RxJava的 Android特定绑定2。
该模块将RxJava的最小类添加到Android应用程序中编写无效组件,方便无忧。更具体地说,它提供了Scheduler在主线程上或任何给定的时间表Looper。
曾几何时,RxAndroid确实提供了很多的实用的方法,但后来社区上很多人对这库的结构有意见,然后作者便进行了重构,现在只保留了AndroidSchedulers, 现在基本RxAndroid只有一个功能,那就是AndroidSchedulers.mainThread 方法用于指定主线程。
所以就在根据官方文档简单介绍一下RxAndroid的使用。
添加依赖
因为rxandroid发行很少,建议你也依赖于rxjava最新版本的bug修复和新功能。
dependencies {
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
}
Observing on the main thread
在Android上处理异步任务时最常见的操作之一是在主线程上观察任务的结果或结果。使用香草Android,这通常会用一个完成 AsyncTask。使用RxJava,您将声明Observable在主线程上被观察:
Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//text.setText(s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
这将执行Observable一个新的线程,并通过onNext主线程发出结果。
Observing on arbitrary loopers
以前的示例只是一个更通用的概念的专门化:将异步通信绑定到Android消息循环,或者Looper。为了观察一个Observable任意的 Looper,Scheduler通过调用创建一个关联AndroidSchedulers.from:
Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
.observeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe(/* an Observer */);
这将在新线程上执行Observable,并通过onNext任何正在运行的线程发出结果backgroundLooper。
官方Demo
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
private final CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
findViewById(R.id.button).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
onRunSchedulerExampleButtonClicked();
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
void onRunSchedulerExampleButtonClicked() {
disposables.add(sampleObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
Log.d(TAG, "onComplete()");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError()", e);
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext(" + string + ")");
}
}));
}
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(5000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}
}
结语
本文简单介绍RxJava/RxAndroid 2.0的一些基本概念和简单使用,接下来还会继续学习RxJava在Android中的使用,以及经典的 RxAndroid(RxJava)和Retrofit的结合。
参考:
http://blog.csdn.net/aiynmimi/article/details/53382567
http://gank.io/post/560e15be2dca930e00da1083#toc_19