RxSwift系列(二)操作符

时间:2024-10-06 07:52:19

一、变换操作符:buffer、map、compactMap等

1.buffer

buffer方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。

import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
 
        let subject = PublishSubject<String>()
 
        //每缓存3个元素则组合起来一起发出。
        //如果1秒钟内不够3个也会发出(有几个发几个,一个都没有发空数组 [])
        subject
            .buffer(timeSpan: 1, count: 3, 
                    scheduler: MainScheduler.instance)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
 
        subject.onNext("a")
        subject.onNext("b")
        subject.onNext("c")
         
        subject.onNext("1")
        subject.onNext("2")
        subject.onNext("3")
    }
}

2.window

● window 操作符和 buffer 十分相似。不过 buffer 是周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。
● 同时 buffer要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。

import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
         
        let subject = PublishSubject<String>()
         
        //每3个元素作为一个子Observable发出。
        subject
            .window(timeSpan: 1, count: 3, 
                    scheduler: MainScheduler.instance)
            .subscribe(onNext: { [weak self]  in
                print("subscribe: \($0)")
                $0.asObservable()
                    .subscribe(onNext: { print($0) })
                    .disposed(by: self!.disposeBag)
            })
            .disposed(by: disposeBag)
         
        subject.onNext("a")
        subject.onNext("b")
        subject.onNext("c")
         
        subject.onNext("1")
        subject.onNext("2")
        subject.onNext("3")
    }
}

3.map

通过传入一个函数闭包把原来的 Observable 序列转变为一个新的 Observable 序列。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3)
    .map { $0 * 10}
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果
//10
//20
//30

4.flatMap

flatMap 操作符会对源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。即又将其 “拍扁”(降维)成一个 Observable 序列。这个操作符是非常有用的。比如当 Observable 的元素本生拥有其他的 Observable 时,我们可以将所有子 Observables 的元素发送出来。

let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
 
let variable = Variable(subject1)
 
variable.asObservable()
    .flatMap { $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")

//运行结果
//A
//B
//1
//2
//C

注意:flatMap并不保证事件的顺序,需要保证顺序则需要使用 concatMap

5.concatMap

concatMap 与 flatMap 的唯一区别是:当前一个 Observable 元素发送完毕后,后一个Observable 才可以开始发出元素。

6.scan

先给一个初始化的数,然后不断的拿前一个结果和最新的值进行处理操作。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4, 5)
    .scan(0) { acum, elem in
        acum + elem
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果:1 3 6 10 15

7.groupBy

将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。该操作符会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来。

let disposeBag = DisposeBag()
 
//将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
    .groupBy(keySelector: { (element) -> String in
        return element % 2 == 0 ? "偶数" : "基数"
    })
    .subscribe { (event) in
        switch event {
        case .next(let group):
            group.asObservable().subscribe({ (event) in
                print("key:\(group.key)    event:\(event)")
            })
            .disposed(by: disposeBag)
        default:
            print("")
        }
    }
.disposed(by: disposeBag)

二、过滤操作符:filter、take、skip等

1.filter

用来过滤掉某些不符合要求的事件

let disposeBag = DisposeBag()
 
Observable.of(2, 30, 22, 5, 60, 3, 40 ,9)
    .filter {
        $0 > 10
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//运行结果: 30 22,60,40

2.distinctUntilChanged

过滤掉连续重复的事件

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 1, 1, 4)
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果:1 2 3 1 4

3.single

● 限制只发送一次事件,或者满足条件的第一个事件。
● 如果存在有多个事件或者没有事件都会发出一个 error 事件。
● 如果只有一个事件,则不会发出 error事件

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .single{ $0 == 2 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果: 2
 
Observable.of("A", "B", "C", "D")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果 
//A
//Unhandled error happened:Sequence contains more than one element.

4.elementAt

只处理在指定位置的事件.

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .elementAt(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果: 3

5.ignoreElements

可以忽略掉所有的元素,只发出 error或completed 事件。如果我们并不关心 Observable 的任何元素,只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .ignoreElements()
    .subscribe{
        print($0)
    }
    .disposed(by: disposeBag)

6.take

该方法实现仅发送 Observable 序列中的前 n 个事件,在满足数量之后会自动 .completed。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .take(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果: 1 2

7.takeLast

仅发送 Observable序列中的后 n 个事件。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .takeLast(1)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果: 4

8.skip

跳过源 Observable 序列发出的前 n 个事件。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果:3 4

9.sample

● Sample 除了订阅源Observable 外,还可以监视另外一个 Observable, 即 notifier 。
● 每当收到 notifier 事件,就会从源序列取一个最新的事件并发送。而如果两次 notifier 事件之间没有源序列的事件,则不发送值。

let disposeBag = DisposeBag()
 
let source = PublishSubject<Int>()
let notifier = PublishSubject<String>()
 
source
    .sample(notifier)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
source.onNext(1)
 
//让源序列接收接收消息
notifier.onNext("A")
 
source.onNext(2)
 
//让源序列接收接收消息
notifier.onNext("B")
notifier.onNext("C")
 
source.onNext(3)
source.onNext(4)
 
//让源序列接收接收消息
notifier.onNext("D")
 
source.onNext(5)
 
//让源序列接收接收消息
notifier.onCompleted()

//运行结果: 1 2 4 5

10.debounce

● 可以用来过滤掉高频产生的元素,它只会发出这种元素:该元素产生后,一段时间内没有新元素产生。换句话说,队列中的元素如果和下一个元素的间隔小于了指定的时间间隔,那么这个元素将被过滤掉。
● debounce 常用在用户输入的时候,不需要每个字母敲进去都发送一个事件,而是稍等一下取最后一个事件

import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
 
        //定义好每个事件里的值以及发送的时间
        let times = [
            [ "value": 1, "time": 0.1 ],
            [ "value": 2, "time": 1.1 ],
            [ "value": 3, "time": 1.2 ],
            [ "value": 4, "time": 1.2 ],
            [ "value": 5, "time": 1.4 ],
            [ "value": 6, "time": 2.1 ]
        ]
         
        //生成对应的 Observable 序列并订阅
        Observable.from(times)
            .flatMap { item in
                return Observable.of(Int(item["value"]!))
                    .delaySubscription(Double(item["time"]!),
                                       scheduler: MainScheduler.instance)
            }
            .debounce(0.5, 
                      scheduler: MainScheduler.instance) //只发出与下一个间隔超过0.5秒的元素
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
}

//运行结果:1 5 6

三、条件和布尔操作符:amb、takeWhile、skipWhile等

1.amb

当传入多个 Observables 到 amb 操作符时,取第一个发出元素或产生事件的 Observable,然后只发出它的元素。并忽略掉其他的 Observables。

let disposeBag = DisposeBag()
 
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
 
subject1
    .amb(subject2)//只取第一个amb的Observable的事件
    .amb(subject3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject2.onNext(1)
subject1.onNext(20)
subject2.onNext(2)
subject1.onNext(40)
subject3.onNext(0)
subject2.onNext(3)
subject1.onNext(60)
subject3.onNext(0)
subject3.onNext(0)

//运行结果:1 2 3

2.takeWhile

依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。

let disposeBag = DisposeBag()
 
Observable.of(2, 3, 4, 5, 6)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//运行结果:2 3 

3.takeUntil

● 除了订阅源 Observable 外,通过 takeUntil 方法我们还可以监视另外一个 Observable, 即 notifier。
● 如果 notifier 发出值或 complete 通知,那么源 Observable 便自动完成,停止发送事件

let disposeBag = DisposeBag()
 
let source = PublishSubject<String>()
let notifier = PublishSubject<String>()
 
source
    .takeUntil(notifier)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
source.onNext("a")
source.onNext("b")
source.onNext("c")
source.onNext("d")
 
//停止接收消息
notifier.onNext("z")
 
source.onNext("e")
source.onNext("f")
source.onNext("g")

//运行结果:a b c d

4.skipWhile

● 用于跳过前面所有满足条件的事件。
● 一旦遇到不满足条件的事件,之后就不会再跳过了

let disposeBag = DisposeBag()
 
Observable.of(2, 3, 4, 5, 6)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    }
}
//运行结果:4 5 6

5.skipUntil

skipUntil 除了订阅源 Observable 外,还可以监视另外一个 Observable, 即 notifier。与 takeUntil 相反的是:源 Observable 序列事件默认会一直跳过,直到 notifier 发出值或 complete 通知。

let disposeBag = DisposeBag()
 
let source = PublishSubject<Int