基本结构
我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext( "1" );
subscriber.onCompleted();
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.create(onSubscriber1)
.subscribe(subscriber1);
|
首先我们来看一下Observable.create的代码
1
2
3
4
5
6
7
|
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this .onSubscribe = f;
}
|
直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。
接下来我们来看看subscribe方法。
1
2
3
4
5
6
7
8
9
10
|
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this );
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}
|
可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?
1
2
3
4
|
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext( "1" );
subscriber.onCompleted();
}
|
RxJava使用场景小结
1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
final Observable<String> memory = Observable.create( new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null ) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create( new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString( "cache" ).get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just( "network" );
//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory" ;
System.out.println( "--------------subscribe: " + s);
});
|
2.界面需要等到多个接口并发取完数据,再更新
1
2
3
4
5
6
7
8
9
|
//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
private void testMerge() {
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
}
|
3.一个接口的请求依赖另一个API请求返回的数据
举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。
这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callback hell
1
2
3
4
5
|
NetworkService.getToken( "username" , "password" )
.flatMap(s -> NetworkService.getMessage(s))
.subscribe(s -> {
System.out.println( "message: " + s);
});
|
4.界面按钮需要防止连续点击的情况
1
2
3
4
5
|
RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst( 1 , TimeUnit.SECONDS)
.subscribe(aVoid -> {
System.out.println( "click" );
});
|
5.响应式的界面
比如勾选了某个checkbox,自动更新对应的preference
1
2
3
4
5
6
7
8
|
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences( this );
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean( "checked" , true );
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());
|
6.复杂的数据变换
1
2
3
4
5
6
7
|
Observable.just( "1" , "2" , "2" , "3" , "4" , "5" )
.map(Integer::parseInt)
.filter(s -> s > 1 )
.distinct()
.take( 3 )
.reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
.subscribe(System.out::println); //9
|