RxJava2.0学习笔记2 2018年7月3日 周二

时间:2023-01-07 00:04:27

摘记:

  1.map -- 转换  

   有些服务端的接口设计,会在返回的数据外层包裹一些额外信息,这些信息对于调试很有用,但本地显示是用不到的。使用 map() 可以把外层的格式剥掉,只留下本地会用到的核心格式。代码大致形式:

api.getData()
.map(response -> response.data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

当然,map() 也可以用于基于其他各种需求的格式转换。

  2.flatMap(token)-- concatMap(有序)  

  出于安全性、性能等方面的考虑,多数服务器会有一些接口需要传入 token 才能正确返回结果,而 token 是需要从另一个接口获取的,这就需要使用两步连续的请求才能获取数据(①token -> ②目标数据)。使用 flatMap() 可以用较为清晰的代码实现这种连续请求,避免 Callback 嵌套的结构。代码大致形式:

api.getToken()
.flatMap(token -> api.getData(token))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

  3.zip -- 压合/拉链  

  zip内部用的队列实现。有的时候,app 中会需要同时访问不同接口,然后将结果糅合后转为统一的格式后输出(例如将第三方广告 API 的广告夹杂进自家平台返回的数据 List 中)。这种并行的异步处理比较麻烦,不过用了 zip() 之后就会简单得多。代码大致形式:

Observable.zip(api.getData(),adApi.getAds(), zipFunc())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

注意:

  zip是有发送顺序的,默认情况下两个Observable如果在同一线程,先发送的水管一再发送的水管二。 因为我们两根水管都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序。通常我们使用时,需要指定这两个Obervable为Schedulers.io() 。

  zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 笔者的例子中我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?

  

Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io()); Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});

  4.retryWhen -- token高级   

  有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token 并继续访问之前由于 token 失效而失败的请求。如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。代码大致形式:

api.getData(token)
.retryWhen(observable ->
observable.flatMap( ->
api.getToken()
.doOnNext(->updateToken())))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

  5.BehaviorSubject -- 缓存  

  RxJava 中有一个较少被人用到的类叫做 Subject,它是一种『既是 Observable,又是 Observer』的东西,因此可以被用作中间件来做数据传递。例如,可以用它的子类 BehaviorSubject 来制作缓存。代码大致形式:

api.getData()
.subscribe(behaviorSubject); // 网络数据会被缓存 behaviorSubject.subscribe(observer); // 之前的缓存将直接送达 observer

  6.Disposable -- 一次性用品    

  它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不应该再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可。

那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable,

每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中,

在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管。

  7.BackPressure -- 背压概念 

  所谓的Backpressure其实就是为了控制流量, 水缸存储的能力毕竟有限, 因此我们还得从源头去解决问题, 既然你发那么快, 数据量那么大, 那我就想办法不让你发那么快呗.

    ①通过filter操作符对上游发来的数据进行过滤之后在发送给下游,通过减少进入水缸的事件数量缓解上下游流速不均衡的问题;

    ②sample操作符 -- 取样   eg:每隔指定2秒就从上游中取出一个事件发送给下游:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.sample(2, TimeUnit.SECONDS) //sample对上游事件每隔2s放到水缸中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});

     虽然上游仍然一直在不停的发事件, 但是我们只是每隔一定时间取一个放进水缸里, 并没有全部放进水缸里,

以上两种方式都是通过减少放进水缸的事件数量来间接解决流速不均衡的问题,但是会丢失大部分的事件

  ③控制上游的发送速度,Thread.sleep(2000),减缓事件发送进水缸的速度 ;

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); //每次发送完事件延时2秒
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});

  8.Flowable--响应式拉取

    采用响应式拉取的方式来更好的解决上下游流速不均衡的问题

    上游 Observable 和下游 Observer   :Disposable.dispose()方法可以切断水管;

    上游 Flowable  和下游 Subscriber :Subscription.cancel()也可以切断水管,不同之处在于Subscription增加了一个void request(long n)方法。

    ①同步订阅:上下游默认处在同一线程情况下,如果下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待, 如果这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常(MissingBackpressureException)来提醒我们。解决方式:下游直接调用request(Long.MAX_VALUE), 或者根据上游发送事件的数量来request也可以。

    ②异步订阅:

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
          .subscribeOn(Schedulers.io())      //异步订阅
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() { @Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
               // s.request(Long.MAX_VALUE);  //此时去掉了request这句代码
} @Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
} @Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
} @Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

结果:

04-24 09:58:29.318 11464-11464/? D/TAG: onSubscribe
04-24 09:58:29.327 11464-11530/? D/TAG: emit 1
04-24 09:58:29.328 11464-11530/? D/TAG: emit 2
04-24 09:58:29.328 11464-11530/? D/TAG: emit 3
04-24 09:58:29.329 11464-11530/? D/TAG: emit complete

  然后我们看运行结果,为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢? 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游。验证

   Flowabel中无限发送+BUFFER -- 可能有朋友也注意到了, 之前使用Observable测试的时候内存增长非常迅速, 几秒钟就OOM, 但这里增长速度却比较缓慢, 可以翻回去看之前的文章中的GIF图进行对比, 这也看出FLowable相比Observable, 在性能方面有些不足, 毕竟FLowable内部为了实现响应式拉取做了更多的操作, 性能有所丢失也是在所难免, 因此单单只是说因为FLowable是新兴产物就盲目的使用也是不对的, 也要具体分场景。

   FLowable中,通过控制数量解决上游发送事件太快方式是BackpressureStrategy.DROPBackpressureStrategy.LATEST。(Oberverable--7 ①、②)

   Drop就是直接把存不下的事件丢弃,Latest就是只保留最新的事件。

面对有些FLowable并不是我自己创建的, 该怎么办呢? 比如RxJava中的interval操作符, 这个操作符并不是我们自己创建的, 来看下面这个例子吧():

Flowable.interval(1, TimeUnit.MICROSECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
} @Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000); //延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
} @Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
} @Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

interval操作符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 我们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么 →  MissingBackpressureException

虽然不是我们自己创建的, 但是RxJava给我们提供了其他的方法:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

在上面的代码中做修改:

Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上背压策略
.observeOn(AndroidSchedulers.mainThread())
......

教程九  https://www.jianshu.com/p/36e0f7f43a51

  从上上篇文章中我们知道并不是这样的,上游仍然是一开始就发送了所有的事件,也就是说小日本并没有等叶问打死一个才拿出一个,而是一开始就拿出了所有的鬼子,这些鬼子从一开始就在这儿排队等着被打死。

  那么上游从哪里得知下游的处理能力呢?我们来看看上游最重要的部分,肯定就是FlowableEmitter了啊,我们就是通过它来发送事件的啊,来看看它的源码:

public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c); /**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested(); boolean isCancelled();
FlowableEmitter<T> serialize();
}

FlowableEmitter是个接口,继承Emitter,Emitter里面就是我们的onNext(),onComplete()和onError()三个方法。我们看到FlowableEmitter中有这么一个方法:

long requested();

方法注释的意思就是当前外部请求的数量。

下游调用request(n) 告诉上游它的处理能力,上游每发送一个next事件之后,requested就减一,注意是next事件,complete和error事件不会消耗requested,当减到0时,则代表下游没有处理能力了,这个时候你如果继续发送事件,会发生什么后果呢?当然是MissingBackpressureException。

  总结一下同步的情况:当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,继续发送事件便会抛异常了。

  异步情况:当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。
  现在我们就能理解为什么没有调用request,上游中的值是128了,因为下游在一开始就在内部调用了request(128)去设置了上游中的值,因此即使下游没有调用request(),上游也能发送128个事件,这也可以解释之前我们为什么说Flowable中默认的水缸大小是128,其实就是这里设置的。

当下游消费掉第96个事件之后,上游又开始发事件了,而且可以看到当前上游的requested的值是96(打印出来的95是已经发送了一个事件减一之后的值),最终发出了第223个事件之后又进入了等待区,而223-127 正好等于 96。这是不是说明当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值啊!

  在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待。

 public static void main(String[] args) {
practice1();
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} public static void practice1() {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
try {
System.out.println("First requedted: " + emitter.requested());
System.out.println("First isCancelled: " + emitter.isCancelled());
String filePath = "C:\\fastwork\\Work\\RfStudy\\test.txt";
FileReader reader = new FileReader(filePath);
BufferedReader br = new BufferedReader(reader); String str; while ((str = br.readLine()) != null && !emitter.isCancelled()) {
while (emitter.requested() == 0) {
if (emitter.isCancelled()) {
break;
}
}
emitter.onNext(str);
} br.close();
reader.close();
System.out.println("emit requested = " + emitter.requested()); emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() { @Override
public void onSubscribe(Subscription s) {
mSubscription = s;
System.out.println("onSubscribe...");
s.request(1);//告诉上游,【我】要从你那个128里取值。。。
} @Override
public void onNext(String string) {
System.out.println(string);
try {
Thread.sleep(2000);
mSubscription.request(1);//每隔2s从水缸里取出来一条readline的话,回传到onNext里,在做输出
} catch (InterruptedException e) {
e.printStackTrace();
}
} @Override
public void onError(Throwable t) {
} @Override
public void onComplete() {
System.out.println("onComplete...");
}
});
}

运行结果:

RxJava2.0学习笔记2 2018年7月3日 周二

RxJava2.0学习笔记2 2018年7月3日 周二

   9.Flowable一些操作【转】

fromIterable

RxJava 2.0 提供了fromIterable方法,可以接收一个 Iterable 容器作为输入,每次发射一个元素。

eg:Flowable.fromIterable(list) .subscribe(num -> System.out.println(num));

  再通过flatMap将一个Flowable转换成另一个Flowable:

List<Integer> list = new ArrayList<>();
list.add(10);
list.add(1);
list.add(5); Flowable.just(list)
.flatMap(new Function<List<Integer>, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(List<Integer> integers) throws Exception {
return Flowable.fromIterable(integers);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

和 map 不同之处在于 flatMap 返回的是一个 Flowable 对象。这正是我们想要的,我们可以把从List发射出来的一个一个的元素发射出去。

filter

filter 是用于过滤数据的,返回false表示拦截此数据。

如果我们想要订阅者只能收到大于5的数据,那么你可以这样做:

Flowable.fromArray(1, 20, 5, 0, -1, 8)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue() > 5;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

take

take 用于指定订阅者最多收到多少数据。

如果我们只想要2个数据:

Flowable.fromArray(1, 2, 3, 4)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

doOnNext

doOnNext 允许我们在每次输出一个元素之前做一些额外的事情。

如果我们想在订阅者接收到数据前干点事情,比如记录日志:

Flowable.just(1, 2, 3)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("保存:" + integer);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

===========================================================================================》以下内容记录日期2018年3月29日 星期四 

  一、【入门首选】简书九篇博客 给初学者的RxJava2.0教程系列

  Season_zlchttps://www.jianshu.com/u/c50b715ccaeb

  RxJava2.0学习笔记2 2018年7月3日 周二

  二、【推荐看完入门首选后,看这篇】Rxjava2的学习与总结 - 泡在网上的日子

    http://www.jcodecraeer.com/a/chengxusheji/java/2017/0731/8315.html

  三、【视频】RxJava与RxAndroid基础入门-

   慕课网 - RxJava与RxAndroid基础入门

RxJava2.0学习笔记2 2018年7月3日 周二

   视频学习笔记 - http://www.cnblogs.com/jooy/p/8674816.html

  四、RxJava最友好的系列文章

   https://juejin.im/collection/582c23fc1e35c9488c282dde

  五、这可能是最好的RxJava 2.x 入门教程系列

    https://blog.csdn.net/nanchen_lsl/article/details/73527851