一、变换操作符:buffer、map、compactMap等
1.buffer
buffer方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
let subject = PublishSubject<String>()
//每缓存3个元素则组合起来一起发出。
//如果1秒钟内不够3个也会发出(有几个发几个,一个都没有发空数组 [])
subject
.buffer(timeSpan: 1, count: 3,
scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
2.window
● window 操作符和 buffer 十分相似。不过 buffer 是周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。
● 同时 buffer要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
let subject = PublishSubject<String>()
//每3个元素作为一个子Observable发出。
subject
.window(timeSpan: 1, count: 3,
scheduler: MainScheduler.instance)
.subscribe(onNext: { [weak self] in
print("subscribe: \($0)")
$0.asObservable()
.subscribe(onNext: { print($0) })
.disposed(by: self!.disposeBag)
})
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
3.map
通过传入一个函数闭包把原来的 Observable 序列转变为一个新的 Observable 序列。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果
//10
//20
//30
4.flatMap
flatMap 操作符会对源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。即又将其 “拍扁”(降维)成一个 Observable 序列。这个操作符是非常有用的。比如当 Observable 的元素本生拥有其他的 Observable 时,我们可以将所有子 Observables 的元素发送出来。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
//运行结果
//A
//B
//1
//2
//C
注意:flatMap并不保证事件的顺序,需要保证顺序则需要使用 concatMap
5.concatMap
concatMap 与 flatMap 的唯一区别是:当前一个 Observable 元素发送完毕后,后一个Observable 才可以开始发出元素。
6.scan
先给一个初始化的数,然后不断的拿前一个结果和最新的值进行处理操作。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5)
.scan(0) { acum, elem in
acum + elem
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果:1 3 6 10 15
7.groupBy
将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。该操作符会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来。
let disposeBag = DisposeBag()
//将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
.groupBy(keySelector: { (element) -> String in
return element % 2 == 0 ? "偶数" : "基数"
})
.subscribe { (event) in
switch event {
case .next(let group):
group.asObservable().subscribe({ (event) in
print("key:\(group.key) event:\(event)")
})
.disposed(by: disposeBag)
default:
print("")
}
}
.disposed(by: disposeBag)
二、过滤操作符:filter、take、skip等
1.filter
用来过滤掉某些不符合要求的事件
let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 3, 40 ,9)
.filter {
$0 > 10
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果: 30 22,60,40
2.distinctUntilChanged
过滤掉连续重复的事件
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 1, 1, 4)
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果:1 2 3 1 4
3.single
● 限制只发送一次事件,或者满足条件的第一个事件。
● 如果存在有多个事件或者没有事件都会发出一个 error 事件。
● 如果只有一个事件,则不会发出 error事件
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.single{ $0 == 2 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果: 2
Observable.of("A", "B", "C", "D")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果
//A
//Unhandled error happened:Sequence contains more than one element.
4.elementAt
只处理在指定位置的事件.
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.elementAt(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果: 3
5.ignoreElements
可以忽略掉所有的元素,只发出 error或completed 事件。如果我们并不关心 Observable 的任何元素,只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.ignoreElements()
.subscribe{
print($0)
}
.disposed(by: disposeBag)
6.take
该方法实现仅发送 Observable 序列中的前 n 个事件,在满足数量之后会自动 .completed。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果: 1 2
7.takeLast
仅发送 Observable序列中的后 n 个事件。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.takeLast(1)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果: 4
8.skip
跳过源 Observable 序列发出的前 n 个事件。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果:3 4
9.sample
● Sample 除了订阅源Observable 外,还可以监视另外一个 Observable, 即 notifier 。
● 每当收到 notifier 事件,就会从源序列取一个最新的事件并发送。而如果两次 notifier 事件之间没有源序列的事件,则不发送值。
let disposeBag = DisposeBag()
let source = PublishSubject<Int>()
let notifier = PublishSubject<String>()
source
.sample(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext(1)
//让源序列接收接收消息
notifier.onNext("A")
source.onNext(2)
//让源序列接收接收消息
notifier.onNext("B")
notifier.onNext("C")
source.onNext(3)
source.onNext(4)
//让源序列接收接收消息
notifier.onNext("D")
source.onNext(5)
//让源序列接收接收消息
notifier.onCompleted()
//运行结果: 1 2 4 5
10.debounce
● 可以用来过滤掉高频产生的元素,它只会发出这种元素:该元素产生后,一段时间内没有新元素产生。换句话说,队列中的元素如果和下一个元素的间隔小于了指定的时间间隔,那么这个元素将被过滤掉。
● debounce 常用在用户输入的时候,不需要每个字母敲进去都发送一个事件,而是稍等一下取最后一个事件
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
//定义好每个事件里的值以及发送的时间
let times = [
[ "value": 1, "time": 0.1 ],
[ "value": 2, "time": 1.1 ],
[ "value": 3, "time": 1.2 ],
[ "value": 4, "time": 1.2 ],
[ "value": 5, "time": 1.4 ],
[ "value": 6, "time": 2.1 ]
]
//生成对应的 Observable 序列并订阅
Observable.from(times)
.flatMap { item in
return Observable.of(Int(item["value"]!))
.delaySubscription(Double(item["time"]!),
scheduler: MainScheduler.instance)
}
.debounce(0.5,
scheduler: MainScheduler.instance) //只发出与下一个间隔超过0.5秒的元素
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
//运行结果:1 5 6
三、条件和布尔操作符:amb、takeWhile、skipWhile等
1.amb
当传入多个 Observables 到 amb 操作符时,取第一个发出元素或产生事件的 Observable,然后只发出它的元素。并忽略掉其他的 Observables。
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
subject1
.amb(subject2)//只取第一个amb的Observable的事件
.amb(subject3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject2.onNext(1)
subject1.onNext(20)
subject2.onNext(2)
subject1.onNext(40)
subject3.onNext(0)
subject2.onNext(3)
subject1.onNext(60)
subject3.onNext(0)
subject3.onNext(0)
//运行结果:1 2 3
2.takeWhile
依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。
let disposeBag = DisposeBag()
Observable.of(2, 3, 4, 5, 6)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//运行结果:2 3
3.takeUntil
● 除了订阅源 Observable 外,通过 takeUntil 方法我们还可以监视另外一个 Observable, 即 notifier。
● 如果 notifier 发出值或 complete 通知,那么源 Observable 便自动完成,停止发送事件
let disposeBag = DisposeBag()
let source = PublishSubject<String>()
let notifier = PublishSubject<String>()
source
.takeUntil(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext("a")
source.onNext("b")
source.onNext("c")
source.onNext("d")
//停止接收消息
notifier.onNext("z")
source.onNext("e")
source.onNext("f")
source.onNext("g")
//运行结果:a b c d
4.skipWhile
● 用于跳过前面所有满足条件的事件。
● 一旦遇到不满足条件的事件,之后就不会再跳过了
let disposeBag = DisposeBag()
Observable.of(2, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
//运行结果:4 5 6
5.skipUntil
skipUntil 除了订阅源 Observable 外,还可以监视另外一个 Observable, 即 notifier。与 takeUntil 相反的是:源 Observable 序列事件默认会一直跳过,直到 notifier 发出值或 complete 通知。
let disposeBag = DisposeBag()
let source = PublishSubject<Int