美文网首页RxSwift学习
RxSwift 中所有操作符的使用示例

RxSwift 中所有操作符的使用示例

作者: ATrucouradentor | 来源:发表于2017-09-27 17:22 被阅读141次

示例代码均来自 RxSwift 项目源码

目录

创建 ObservableType 和 Subscribe 操作

一个标准的自定义可观察序列 Observable sequence

let disposeBag = DisposeBag()

let myJust = { (element: String) -> Observable<String> in
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}
    
myJust("🔴")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

generate

在某个条件不满足之前将一直发送 next 事件。

example("generate") {
    let disposeBag = DisposeBag()
    
    Observable.generate(
            initialState: 0,
            condition: { $0 < 3 },
            iterate: { $0 + 1 }
        )
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// 打印:
0 
1
2

deffered

为每一个订阅者创建一个新的可观察序列。(暂时称作:『固定式事件发送』)每一次订阅都会执行一次闭包内容。

let disposeBag = DisposeBag()
var count = 1

let deferredSequence = Observable<String>.deferred {
    print("Creating \(count)")
    count += 1
    
    return Observable.create { observer in
        print("Emitting...")
        observer.onNext("🐶")
        observer.onNext("🐱")
        observer.onNext("🐵")
        return Disposables.create()
    }
}

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    
// 打印:
Creating 1
Emitting...
🐶
🐱
🐵

Creating 2
Emitting...
🐶
🐱
🐵

Subject

extension ObservableType {
    
    /**
     Add observer with `id` and print each emitted event.
     - parameter id: an identifier for the subscription.
     */
    func addObserver(_ id: String) -> Disposable {
        return subscribe { print("Subscription:", id, "Event:", $0) }
    }
    
}

PublishSubject

一旦执行订阅(addObserver),会向所有 observer 广播新的事件。便于发送消息????

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

// 打印:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)

Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

disposeBag 销毁的时候,会自动清理资源,而不是 .disposed(by: disposeBag) 时立即清理。

ReplaySubject

设置 bufferSize 为新增的订阅者广播 bufferSize 数量的之前事件。

let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

// 打印:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)

Subscription: 2 Event: next(🐱) // 获得一个之前事件的广播
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

BehaviorSubject

广播新事件给订阅者的同时,新增的订阅者将获得最近的一个值(或初始值)的事件。

let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

subject.addObserver("3").disposed(by: disposeBag)
subject.onNext("🍐")
subject.onNext("🍊")

// 打印:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)

Subscription: 2 Event: next(🐱)// 获取最近的值事件
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

Subscription: 3 Event: next(🅱️)// 获取最近的值事件
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)

<u>PublishSubjectReplaySubjectBehaviorSubject 都不会在将要 dispose 时自动发送 Completed 事件。</u>

Variable

封装了一个 BehaviorSubject ;不会发送 Error 事件,但会在其销毁时,自动发送 Completed 事件。

let disposeBag = DisposeBag()
let variable = Variable("🔴")

variable.asObservable().addObserver("1").disposed(by: disposeBag)
variable.value = "🐶"
variable.value = "🐱"

variable.asObservable().addObserver("2").disposed(by: disposeBag)
variable.value = "🅰️"
variable.value = "🅱️"

// 打印:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)

Subscription: 2 Event: next(🐱)// 获取最近的值事件
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

Subscription: 1 Event: completed
Subscription: 2 Event: completed

要点:
调用 asObservable 是为了获取其隐藏的 BehaviorSubject 序列;Variable 不可以使用 ononNext(_:) 等操作符,而是可以直接赋值。

Combination Operators

startWith

源 Observable 发送元素之前,先发送指定的元素。

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱")
    .startWith("1️⃣")
    .startWith("2️⃣")
    .startWith("3️⃣", "🅰️", "🅱️")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    
// 打印:
3️⃣
🅰️
🅱️
2️⃣
1️⃣
🐶
🐱

startWith 可以串连使用,形成 『栈』 式结构(last-in-first-out),最后加入的将优先发送元素。

merge

将多个源 Observable 序列的元素合并为一个新的 Observable 序列,元素的发送顺序按照源 Observable 发送元素的先后展开。

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")

// 打印:
🅰️
🅱️
①
②
🆎
③

zip

将最多8个 Observable 序列合并至一个 Observable 序列,新的序列发送的元素是『由源序列对应下标发送的元素组成』。

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")

intSubject.onNext(1)
intSubject.onNext(2)

stringSubject.onNext("🆎")
intSubject.onNext(3)

// 打印:
🅰️ 1
🅱️ 2
🆎 3

combineLatest

将最多8个 Observable 序列合并至一个 Observable 序列,新的序列发送的元素的条件是:每个源序列至少已经发送了一个元素,接下来任意序列发送了一个新元素,新的序列都会发送『由所有源序列的最新元素组成』的新元素。

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("🆎")

// 打印:
🅱️ 1
🅱️ 2
🆎 2

Array.combineLatest

let disposeBag = DisposeBag()

let stringObservable = Observable.just("❤️")
let fruitObservable = Observable.from(["🍎", "🍐", "🍊"])
let animalObservable = Observable.of("🐶", "🐱", "🐭", "🐹")

Observable.combineLatest([stringObservable, fruitObservable, animalObservable]) {
        "\($0[0]) \($0[1]) \($0[2])"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)   

// 打印:
❤️ 🍎 🐶
❤️ 🍐 🐶
❤️ 🍐 🐱
❤️ 🍊 🐱
❤️ 🍊 🐭
❤️ 🍊 🐹

由于合并后传递给新序列的元素通过一个数组传递,所以源序列的元素类型必须相同。

switchLatest

当你的事件序列是一个事件序列的序列 (Observable<Observable<T>>) 的时候,可以使用switch将序列的序列平铺成一维,并且在出现新的序列的时候,自动切换到最新的那个序列上。和merge相似的是,它也是起到了将多个序列『拍平』成一条序列的作用。『降维』

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")

let variable = Variable(subject1)
    
variable.asObservable()
    .switchLatest()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🏈")
subject1.onNext("🏀")

variable.value = subject2

subject1.onNext("⚾️")// subject1 已经中断。
subject2.onNext("🍐")

// 打印:
⚽️
🏈
🏀
🍎
🍐

Transforming Operators

转换 next 事件的元素。

map

略。

flatMap and flatMapLatest

当一个元素发出之后,我们想将该元素转换为一个序列(暂时称之为 『元素序列』),这时可以用到 flatMap。

在 Swift 中,我们可以用flatMap过滤掉map之后的nil结果。在 Rx 中,flatMap可以把一个序列转换成一组序列,然后再把这一组序列『拍扁』成一个序列。
flatMapLatest 等价于 map + switchLatest。

let disposeBag = DisposeBag()

struct Player {
    var score: Variable<Int>
}

let 👦🏻 = Player(score: Variable(80))
let 👧🏼 = Player(score: Variable(90))

let player = Variable(👦🏻)

player.asObservable()
    .flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

👦🏻.score.value = 85

player.value = 👧🏼

👦🏻.score.value = 95 // Will be printed when using flatMap, but will not be printed when using flatMapLatest

👧🏼.score.value = 100

// 打印:
80
85
90
95 // 如果是flatMapLatest,这个值不会打印。
100

scan

相当于 Swift 中的 reduce。

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .scan(1) { aggregateValue, newValue in
        aggregateValue + newValue
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// 打印:
11
111
1111

Filtering and Conditional Operators </h2>

filter

public func filter(_ predicate:
    @escaping (Self.E) throws -> Bool) -> 
     RxSwift.Observable<Self.E>

distinctUntilChanged

过滤掉重复的元素。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

elementAt

仅发送指定『位置』的元素。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .elementAt(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

single

仅发送第一个(或第一个满足条件的)元素。如果序列超过或少于一个元素(或满足条件的元素),那么将会抛出一个错误。

// 会抛出错误。
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// 不会抛出错误。
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single { $0 == "🐸" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

take

从第一个元素开始截取指定数量的元素发送。

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

takeLast

截取末尾的几个元素进行发送。

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .takeLast(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

takeWhile

和 filter 不同的是,当条件不满足时将截止发送元素。

Observable.of(1, 2, 3, 4, 0, 1)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    
    // 打印:
    1
    2
    3

takeUntil

指定一个序列,当指定序列开始发送元素时,源序列将停止发送元素。

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

// 打印:
next(🐱)
next(🐰)
next(🐶)
completed

skip

跳过开始阶段指定数量的元素发送。

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

skipWhile

跳过开始阶段满足条件的元素,出现了第一不满足条件的元素时再开始发送元素。

Observable.of(1, 2, 3, 4, 5, 6)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

skipWhileWithIndex

和 skipWhile 类似,唯一的不同是闭包中多传入了一个 index。

skipUntil

参考 takeUntil。

Mathematical and Aggregate Operators

作用于整个序列的操作符。

toArray

将一个序列转换成一个数组,再将这个数组作为一个元素进行发送,最后结束。

let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
    .toArray()
    .subscribe { print($0) }
    .disposed(by: disposeBag)
    
// 打印:
next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
completed

reduce

略。只发送计算后的结果作为唯一元素。

concat

用于元素是序列的序列(『嵌套序列』)。在前一个元素序列结束之前,后一个元素序列发送的元素都会被忽略,但会往前 replay 一个元素。

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let variable = Variable(subject1)

variable.asObservable()
    .concat()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

variable.value = subject2

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

// 打印:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

Connectable Operators </h2>

Connectable 序列在 connect() 执行时(而不是 subscribe 执行时) 才会开始发送元素。新的订阅者不会从第一个开始获得发送的元素,而是获得 connect() 发送的当前的元素,实际上真正的订阅只发生一次。

先看一个没有使用 Connectable Operators 的例子:

let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

_ = interval
    .subscribe(onNext: { print("Subscription: 1, Event: \($0)") })

delay(3) {
    _ = interval
        .subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
}

// 打印:
Subscription: 1, Event: 0
Subscription: 1, Event: 1
Subscription: 1, Event: 2

Subscription: 1, Event: 3 // 第三秒
Subscription: 2, Event: 0 // 第二个订阅从 0 开始
Subscription: 1, Event: 4
Subscription: 2, Event: 1
Subscription: 1, Event: 5
Subscription: 2, Event: 2
...

publish

将普通序列转化为 Connectable 序列。

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

delay(2) { _ = intSequence.connect() }

delay(4) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

delay(6) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

// 打印:
Subscription 1:, Event: 0 // 第二秒未执行?

Subscription 1:, Event: 1 // 第四秒
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2

Subscription 1:, Event: 3 // 第六秒
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5

replay

将普通序列转化为 Connectable 序列。新的订阅者开始订阅时,会进行『回忆』以获取当前元素之前的元素。不使用 replay 时,只能收到订阅以后发送的消息。

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .replay(5)

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

delay(2) { _ = intSequence.connect() }

delay(4) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

delay(8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

// 打印:
Subscription 1:, Event: 0

Subscription 2:, Event: 0 // 先执行『回忆』的内容
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4

Subscription 3:, Event: 0 // 先执行『回忆』的内容
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5

multicast

将普通序列转化为 Connectable 序列。发送元素时,会向指定的 subject 广播( subject 会获得其发送的元素)。

let subject = PublishSubject<Int>()

_ = subject
    .subscribe(onNext: { print("Subject: \($0)") })

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .multicast(subject)

_ = intSequence
    .subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })

delay(2) { _ = intSequence.connect() }

delay(4) {
    _ = intSequence
        .subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
}

delay(6) {
    _ = intSequence
        .subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
}

// 打印:
Subject: 0
    Subscription 1:, Event: 0
Subject: 1
    Subscription 1:, Event: 1
    Subscription 2:, Event: 1
Subject: 2
    Subscription 1:, Event: 2
    Subscription 2:, Event: 2
Subject: 3
    Subscription 1:, Event: 3
    Subscription 2:, Event: 3
    Subscription 3:, Event: 3

Error Handling Operators

帮助从错误通知中恢复的操作符。

catchErrorJustReturn

通过返回一个单元素序列(该序列发送完该元素后会终止),把源序列从错误事件中恢复。

let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

// 打印:
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed

catchError

通过切换到一个指定序列,把源序列从错误事件中恢复。

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊")

// 打印:
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)

retry

遇到错误事件重新开始订阅。

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    
    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    
    return Disposables.create()
}

sequenceThatErrors
    .retry()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    
// 打印:
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭

retry(_:)

retry 的升级版,可以限定重试的次数。略。

Debugging Operators </h2>

debug

打印出 subscriptions,events,disposals 等信息。略。

RxSwift.Resources.total

打印资源数量。

其他

参考

driver

我们现在看看下面的例子。

let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
    }

results
    .map { "\($0.count)" }
    .bindTo(resultCount.rx.text)
    .addDisposableTo(disposeBag)

results
    .bindTo(resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

上面程序会有下面几个异常情况:

  • 如果上面fetchAutoCompleteItems出错了,那么他绑定的UI将不再收到任何事件消息
  • 如果上面fetchAutoCompleteItems是在后台某个线程中运行的,那么事件绑定也是发生在后台某个线程,这样更新UI的时候会造成crash
  • 有两次绑定fetchAutoCompleteItems会执行两次

当然针对上面问题我们也有解决方案,我们可以使用神器shareReplay(1)保证不会执行两次,可以使用observeOn()保证后面所有操作在主线程完成。

let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .observeOn(MainScheduler.instance)
            .catchErrorJustReturn([])           
    }
    .shareReplay(1)                             

results
    .map { "\($0.count)" }
    .bindTo(resultCount.rx.text)
    .addDisposableTo(disposeBag)

results
    .bindTo(resultTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

但是你也可以使用Driver:

let results = query.rx.text.asDriver()    //转换成一个Driver序列
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .asDriver(onErrorJustReturn: [])  //当遇见错误需要返回什么
    }    //不需要添加shareReplay(1)

results
    .map { "\($0.count)" }
    .drive(resultCount.rx.text)    //和bingTo()功能一样
    .addDisposableTo(disposeBag) 

results
    .drive(resultTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

总结:drive 方法只能在 Driver 序列中使用,Driver 有以下特点:1 Driver 序列不允许发出 error ,2 Driver 序列的监听只会在主线程中。所以 Driver 是转为UI绑定量身打造的东西。以下情况你可以使用 Driver 替换 BindTo:

  • 不能发出error
  • 在主线程中监听
  • 共享事件流

UIBindingObserver

UIBindingObserver 这个东西很有用的,创建我们自己的监听者,有时候RxCocoa(RxSwiftz中对UIKit的一个扩展库)给的扩展不够我们使用,比如一个UITextField 有个 isEnabled 属性,我想把这个 isEnabled 变为一个observer,我们可以这样做:

extension Reactive where Base: UITextField {
    var inputEnabled: UIBindingObserver<Base, Result> {
        return UIBindingObserver(UIElement: base) { textFiled, result in
            textFiled.isEnabled = result.isValid
        }
    }
}

UIBindingObserver是一个类,他的初始化方法中,有两个参数,第一个参数是一个元素本身,第一个参数是一个闭包,闭包参数是元素本身,还有他的一个属性。

public init(UIElement: UIElementType, binding: @escaping (UIElementType, Value) -> Swift.Void)

自定义了一个inputEnabled Observer里面关联的 UITextField 的 isEnabled 属性。这样就可以使用 bindTo 来为 isEnable 绑定值。

相关文章

网友评论

    本文标题:RxSwift 中所有操作符的使用示例

    本文链接:https://www.haomeiwen.com/subject/pkguextx.html