重温RxJava(使用Kotlin编写)

时间:2024-11-17 16:22:30

前言

使用RxJava挺久的了,但是这后面接触的少,所以有一些淡忘了,于是今天再一次总结了一下,刚好最近都在用Kotlin,所以以下代码是使用Kotlin编写的。

RxJava可以说是这样的:

  • 异步:可以很方便地切换线程
  • 简洁:在复杂的逻辑中保持代码简洁

基本使用

1.创建一个Observer
  • onNext:观察数据
  • onError:事件队列失败调用的方法
  • onCompleted:事件队列完成的方法
    onError和onCompleted只能有一个被调用

Subscriber:是Observer的一个子类,使用它观察的时候,最开始会调用onStart方法,其他方法不变

2.创建Observable:
val observable = Observable.create(<String> {
    //这里的it为observer
    it.onNext("1")
    it.onNext("2")
    it.onNext("3")
    it.onCompleted()
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
3.开始监听
observable.subscribe(observer)
  • 1

当调用subscribe方法后,如果观察者是Observer,那么会将它封装到一个Subscriber中。然后最后会调用OnSubscribe的call方法。

方法
//相当于多次调用Subscriber对象的onNext()方法
val observable = ("1", "2", "...")
  • 1
  • 2
方法
val A= arrayOf("1","2","3")
val observable = (A)
  • 1
  • 2

效果跟just相同

val nextAction = Action1<String> {
    //call方法
    Log.d(TAG, "next:$it")
}

val errorAction = Action1<Throwable> {
    //call方法
    Log.d(TAG, "error:${}")
}

val completeAction = Action0 {
    //call方法
    Log.d(TAG, "complete:")
}

observable.subscribe(nextAction)//当onNext方法
observable.subscribe(nextAction, errorAction)//当onNext、onError方法
observable.subscribe(nextAction, errorAction, completeAction)//当onNext、onError、onComplete方法
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

变换

方法(适合一对一的转换)
//将图片的路径转换为bitmap
("")
        .map(object : Func1<String, Bitmap> {
            override fun call(t: String?): Bitmap {
                return getBitmap(t!!)
            }
        }).subscribe(object : Action1<Bitmap> {
            override fun call(t: Bitmap?) {
                //显示bitmap
            }
        })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
(适合一对多的转换)
val subModels1 = arrayOf("11", "12")
val subModels2 = arrayOf("21", "22")
val models = arrayOf(Model(subModels1), Model(subModels2))

(models)
        .flatMap(object : Func1<Model, Observable<String>> {
            override fun call(t: Model?): Observable<String> {
                //逐个数据发射
                return (t?.subModels)
            }
        }).subscribe {
    (TAG, "next:$it")
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

创建Observable,但并不发送Observable对象,而是激活然后将激活的Observable的数据汇集到同一个Observable进行发送。

大致的原理:创建一个新的Observable(代理),用于接收原Observable,然后发送给新的Subscriber(代理),新的Subscriber会将结果发送给目标Subscriber(我们创建的)。

线程控制

  • (): 默认,直接在当前线程中执行。
  • (): 启动新线程
  • (): 启动IO线程
  • (): 密集计算使用的线程
  • (): 在Android的主线程中使用
Observable.just("url1", "url2", "url3")
        .map {
            getBitmap(it)
        }.subscribeOn(Schedulers.io())//io线程转换
        .observeOn(AndroidSchedulers.mainThread())//在主线程中显示bitmap
        .subscribe {
            //显示bitmap
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • subscribeOn():只能指定一次,它的切换发生在OnSubscribe中,后面指定无效。
  • observeOn():可以指定多次,指定后在它后面的操作都是在该指定的线程中执行的。控制在它后面的线程。

Retrofit结合使用

1.封装Observer
abstract class BaseObserver<T> : Subscriber<T>() {
    override fun onError(e: Throwable?) {
        onFail(e)
        onEnd()
    }

    override fun onNext(t: T) {
        onSuccess(t)
    }

    override fun onCompleted() {
        onEnd()
    }

    /**
     * 结束回调
     */
    abstract fun onEnd()

    /**
     * 请求成功回调
     */
    abstract fun onSuccess(data: T)

    /**
     * 请求失败回调
     */
    abstract fun onFail(error: Throwable?)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
2.定义接口
interface Api {
    /**
     * 假设根据id获取用户信息
     */
    @GET("path")
    fun getUser(@Query("id") id: Int): Observable<User>
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
3.创建接口实例,并发送请求
//创建retrofit对象时,需要添加:addCallAdapterFactory(()),可让api方
//法的返回类型转换为Observable<T>
fun getUser(id: Int) {
    (id).subscribeOn(())
            .observeOn(())
            .subscribe(object : BaseObserver<User>() {
                override fun onStart() {
                    ()
                    //toDo:显示加载中
                }

                override fun onEnd() {
                    //toDo:做相关的操作,比如让加载提示消失
                }

                override fun onSuccess(data: User) {
                    //toDo:更新UI
                }

                override fun onFail(error: Throwable?) {
                    //toDo:提示获取失败
                }
            })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

以上便是自己对RxJava的二次学习,没有很深,还有一些没有写出来,但在平常中以上的知识点可以用到挺多场景中。