有用的Kotlin版Rx代码片段1

时间:2022-01-01 17:00:33

只是几个随机的,可能有用的Rx片段(kotlin版)。

计时器

是的,只是一个简单的计时器,运行5秒。

Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
        .take(5)
        .subscribeBy (
                onNext = {  },
                onComplete = {  }
        )

网络调用重试

如果发生故障,重试网络调用或任何长时间运行的进程(本例为三次),并且每次都增加延迟时间。 (说明

Observable.fromPublisher<String> {
    it.onNext("Doing a network call!")
    Thread.sleep(1000)      // 长时间运行
    it.onError(Exception()) // 抛出错误异常
}.retryWhen { errors ->
    errors.zipWith(Observable.range(1, 3)   
            .concatMap { retryCount ->
                Observable.timer(retryCount.toLong() * 10, TimeUnit.SECONDS)
            }
    )
}.blockingSubscribeBy(
        onNext = { println(it) },
        onError = { println(it) },
        onComplete = { println("Complete") }
)

链接请求

链接请求或长时间运行的进程。

Observable.fromPublisher<String> {
    Thread.sleep(1000)           // 长时间运行
    it.onNext("First Response!") // 响应
}.flatMap { response ->
    Observable.fromPublisher<String> {
        println(response)        // 处理首次响应
        Thread.sleep(1000)       // 长时间运行
        it.onNext("Final Response!")
    }
}.subscribeBy(
        onNext = {
            println(it)          // 处理最后的响应
        }
)

轮询请求

轮询请求,直到某个条件成立。

  var count = 0
  Observable.fromPublisher<Int> {  
      count += 1       
      it.onNext(count)
      it.onComplete()               // pretend this is an api request
  }.repeatWhen {
      it.delay(3, TimeUnit.SECONDS) // poll after 3 seconds delay
  }.takeUntil {
      it == 3                       // condition to stop polling
  }.blockingSubscribeBy (
          onNext = { println(it) },             // 1 - (delay) - 2 - (delay) - 3
          onComplete = { println("Complete") }  // called when the condition is fulfilled 
  )

获取首字母缩写

从名字列表中获取首字母缩写。

Observable.fromArray("Some Name", "Some Other Name")
        .map { it.split(' ') }
        .flatMap { names ->
            Observable.fromIterable(names)
                    .filter { it.isNotEmpty() }
                    .takeLast(2)
                    .reduce("", { acc: String, element: String ->
                        "$acc${element[0]}"
                    })
                    .map { it.toUpperCase() }
                    .filter { it.isNotEmpty() }
                    .toObservable()
        }
        .subscribeBy(
                onNext = {
                    println(it) //SN, ON
                }
        )

获取RecyclerView可见项(RxAndroid)

用户完成滚动后,稍微延迟一下,从RecyclerView(使用RxBinding)获取所有可见项。

RxRecyclerView.scrollEvents(recyclerView)
        .debounce(1, TimeUnit.SECONDS)
        .map {
            (layoutManager.findFirstVisibleItemPosition()..layoutManager.findLastVisibleItemPosition()).map { index ->
               items[index]
            } 
        }.flatMap {
            Observable.fromIterable(it) // list to single item
        }.subscribeBy(onNext = {
                // Item1, Item2... 
        })

一些参考:Dan Lew’s post on Retry vs RepeatA post on long pollingRxBindingRxKotlin

翻译来源:https://android.jlelse.eu/useful-rx-snippets-1-1f390ce727a7