RxSwift核心之操作符Operator

作者: __Mr_Xie__ | 来源:发表于2019-10-18 10:37 被阅读0次

目录

  • 简介
  • 操作符列表
  • 如何选择操作符?

简介

操作符(Operator)可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。
使用操作符是非常容易的。你可以直接调用实例方法,或者静态方法。

操作符列表

  • amb

当你传入多个 Observablesamb 操作符时,它将取其中一个 Observable:第一个产生事件的那个 Observable,可以是一个 nexterror 或者 completed 事件。 amb 将忽略掉其他的 Observables,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let a = Observable<String>.just("A")
        let b = Observable<String>.just("B")
        let c = Observable<String>.just("C")
        
        Observable<String>.amb([a, b, c]).subscribe { (e) in
            switch e {
            case .next(let element):
                print(element)
            case .error(let error):
                print(error)
            case .completed:
                print("completed")
            }
        }.disposed(by: disposeBag)
    }

打印结果:

A
completed
  • buffer

buffer 方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。

public func buffer(timeSpan: RxSwift.RxTimeInterval, count: Int, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<[Self.Element]>

该方法简单来说就是缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let publicSubject = PublishSubject<String>()
        
        publicSubject.buffer(timeSpan: DispatchTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { (e) in
            switch e {
            case .next(let element):
                print(element)
            case .error(let error):
                print(error)
            case .completed:
                print("completed")
            }
        }.disposed(by: disposeBag)
        
        publicSubject.onNext("A")
        publicSubject.onNext("B")
        publicSubject.onNext("C")
        
        publicSubject.onNext("D")
        publicSubject.onNext("E")
        publicSubject.onNext("F")
        
        publicSubject.onNext("G")
    }

打印结果:

["A", "B", "C"]
["D", "E", "F"]
  • catchError

catchError 会在要发生 onError 事件时将其拦截,并使用备用的序列替代它,再次执行操作,下面给出两段示例代码,如下:

eg1:

enum MyError: Error {
    case errorA
    case errorB
    var errorType: String {
        switch self {
        case .errorA:
            return "I am error A"
        case .errorB:
            return "I am error B"
        }
    }
}

func test() {
        let disposeBag = DisposeBag()
        
        let errorObservable = Observable<Int>.error(MyError.errorA)
        let intObservable = Observable<Int>.just(666)

        errorObservable.catchError({ error in
            print("catch:", error)
            return intObservable
        }).subscribe { event in
            switch event {
            case .next(let element):
                print("element:", element)
            case .error(let error):
                print("error:", error)
            case .completed:
                print("completed")
        }}.disposed(by: disposeBag)
    }

打印结果:

catch: errorA
element: 666
completed

eg2:

enum MyError: Error {
    case errorA
    case errorB
    var errorType: String {
        switch self {
        case .errorA:
            return "I am error A"
        case .errorB:
            return "I am error B"
        }
    }
}

func test() {
        let disposeBag = DisposeBag()

        let sequenceThatFails = PublishSubject<String>()
        let recoverySequence = PublishSubject<String>()
        
        sequenceThatFails.catchError { (error) -> Observable<String> in
            print(error)
            return recoverySequence
        }.subscribe { (e) in
            print(e.element ?? "")
        }.disposed(by: disposeBag)

        sequenceThatFails.onNext("A")
        sequenceThatFails.onNext("B")
        sequenceThatFails.onNext("C")
        
        sequenceThatFails.onError(MyError.errorA)

        recoverySequence.onNext("D")
    }

打印结果:

A
B
C
errorA
D
  • combineLatest

combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经发出过元素),示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let pb1 = PublishSubject<String>()
        let pb2 = PublishSubject<String>()
        
        Observable<String>.combineLatest(pb1, pb2) { (element1, element2) -> String in
            return "\(element1) --- \(element2)"
        }.subscribe { (e) in
            switch e {
                case .next(let element):
                    print("结果:", element)
                case .error(let error):
                    print("error:", error)
                case .completed:
                    print("completed")
            }
        }.disposed(by: disposeBag)
        
        pb1.onNext("1")
        pb2.onNext("A")
        pb1.onNext("2")
        pb1.onNext("3")
        pb2.onNext("B")
    }

打印结果:

结果: 1 --- A
结果: 2 --- A
结果: 3 --- A
结果: 3 --- B

注:如果有一个源 Observable 没有发出,就会继承上次的值,具体见demo 打印结果。

  • concat

concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。

concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let pb1 = PublishSubject<String>()
        let pb2 = PublishSubject<String>()
        
        pb1.concat(pb2).subscribe { (e) in
            switch e {
                case .next(let element):
                    print(element)
                case .error(let error):
                    print("error:", error)
                case .completed:
                    print("completed")
            }
        }.disposed(by: disposeBag)

        pb1.onNext("A")
        pb1.onNext("B")
        
        pb2.onNext("C")
        
        pb1.onCompleted()
        
        pb2.onNext("D")
    }

打印结果:

A
B
D

注:pb1 发出了AB 后,pb2 发出了C,但是这个时候 pb1 调用了 completed 事件,所以 pb2 发出的 C 就会被忽略了,所以只有在 pb1 发出 completed 事件后,pb2 的事件才不会被忽略。

  • concatMap

concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。这个操作符平时很少用到,这里就用官网的例子吧,示例代码如下:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let variable = Variable(subject1)

variable.asObservable()
        .concatMap { $0 }
        .subscribe { print($0) }
        .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

variable.value = subject2

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

打印结果:

next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
  • connect 和 publish

publish 要和 connect 一起使用才能有效果。当一个被观察者被 publish 后,它被订阅也是不能收到消息的,只有它被 connect 操作符连接后,才能收到数据,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let observable = Observable<String>.just("A").publish()
        
        observable.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
        
        // 等2s后开始连接
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            print("开始连接")
            observable.connect().disposed(by: disposeBag)
        }
    }

打印结果:

开始连接
A
  • create

create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(nexterrorcompleted)的产生过程。

通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let observable = Observable<String>.create { (observer) -> Disposable in
            
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        observable.subscribe(onNext: { (e) in
            print(e)
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

A
B
C
onCompleted
  • debounce

在指定的时间内,只接受最新的数据,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let publishSubject = PublishSubject<String>()
        
        publishSubject.debounce(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance).subscribe { (e) in
            print(e)
        }.disposed(by: disposeBag)
        
        publishSubject.onNext("A")
        publishSubject.onNext("B")
        publishSubject.onNext("C")
        publishSubject.onNext("D")
    }

预期打印结果:

next(D)

注:这个 disposeBag 必须是全局的,要不 debounce 没有效果。

  • debug

打印所有的订阅,事件以及销毁信息,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        let observable = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onCompleted()
            return Disposables.create()
        }
        
        observable.debug("打印字符").subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

2019-10-10 18:01:54.294: 打印字符 -> subscribed
2019-10-10 18:01:54.296: 打印字符 -> Event next(A)
A
2019-10-10 18:01:54.297: 打印字符 -> Event next(B)
B
2019-10-10 18:01:54.297: 打印字符 -> Event completed
2019-10-10 18:01:54.297: 打印字符 -> isDisposed
  • deferred

直到订阅发生的时候,才会创建序列,起到一个延迟创建的作用,看似订阅的都是同一个 Observable,其实每次都是订阅了一个新的 Observable,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let observable = Observable<Int>.deferred { () -> Observable<Int> in
            let ob = Observable<Int>.create { (observer) -> Disposable in
                observer.onNext(1)
                observer.onNext(2)
                observer.onCompleted()
                
                return Disposables.create()
            }
            return ob
        }
        
        observable.subscribe(onNext: { (e) in
            print(e)
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

1
2
onCompleted
  • delay

delay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来,示例代码如下:

let disposeBag = DisposeBag()

func test() {
        // 元素延迟2秒才发出
        Observable.of(1, 5, 9).delay(RxTimeInterval.seconds(2), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

1
5
9

注:这个 disposeBag 必须是全局的,要不 delay 没有效果。

  • delaySubscription

delaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作,示例代码如下:

let disposeBag = DisposeBag()

func test() {
        Observable<String>.of("A", "B", "C").delaySubscription(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

A
B
C

注:这个 disposeBag 必须是全局的,要不 delaySubscription 没有效果。

  • dematerialize 和 materialize

materialize是将序列产生的事件,转换成元素。通常,一个有限的 Observable 将产生零个或者多个 onNext 事件,然后产生一个 onCompleted 或者 onError 事件。materialize 操作符将 Observable 产生的这些事件全部转换成元素,然后发送出来。

dematerialize dematerialize 操作符将 materialize 转换后的元素还原。

materialize 使用相关示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable<String>.of("A", "B", "C").materialize().subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

next(A)
next(B)
next(C)
completed

dematerialize 使用相关示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable<String>.of("A", "B", "C").materialize().dematerialize().subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

A
B
C
  • distinctUntilChanged

distinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        Observable.of("A", "B", "C", "C", "C", "D", "C").distinctUntilChanged().subscribe(onNext: { (e) in
            print(e)}
        ).disposed(by: disposeBag)
    }

打印结果:

A
B
C
D
C
  • do

Observable 的某些事件产生时,你可以使用 do 操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable 原本的回调分离,示例代码如下:

func test() {
        Observable<[Int]>.of([1, 2, 3]).do(onNext: { element in
            print("do onNext:" ,element)
        }, onError: { error in
            print("do error:", error)
        }, onCompleted: {
            print("do completed")
        }, onSubscribe: {
            print("do subscribe")
        }, onSubscribed: {
            print("do subscribed")
        }, onDispose: {
            print("do dispose")
        })
        .subscribe(onNext: { (element) in
            print("onNext:", element)
        }, onError: { (error) in
            print("error:", error)
        }, onCompleted: {
            print("completed")
        }) {
            print("dispose")
        }.disposed(by: disposeBag)
    }

打印结果:

do subscribe
do subscribed
do onNext: [1, 2, 3]
onNext: [1, 2, 3]
do completed
completed
dispose
do dispose
  • elementAt

elementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        Observable.of("A", "B", "C", "D", "E", "F").elementAt(3).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

D
  • empty

empty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        let emptyObservable = Observable<Int>.empty()
        emptyObservable.subscribe(onNext: { (element) in
            print("onNext: \(element)")
        }, onError: { (error) in
            print("onError: \(error)")
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

onCompleted
  • error

error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件,示例代码如下:

enum MyError: Error {
    case errorA
    case errorB
    var errorType: String {
        switch self {
        case .errorA:
            return "I am error A"
        case .errorB:
            return "I am error B"
        }
    }
}

func test() {
        let disposeBag = DisposeBag()

        let emptyObservable = Observable<Int>.error(MyError.errorA)
        emptyObservable.subscribe(onNext: { (element) in
            print("onNext: \(element)")
        }, onError: { (error) in
            print("onError: \(error)")
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

onError: errorA
  • filter

filter 操作符将通过你提供的判定方法过滤一个 Observable,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable.of(2, 30, 22, 5, 60, 1).filter({ (e) -> Bool in
            return (e > 10)
        }).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

30
22
60
  • flatMap

flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable<Int>.of(1, 2, 3, 4).flatMap { (e) -> Observable<String> in
            return Observable<String>.just(">> \(e)")
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

>> 1
>> 2
>> 3
>> 4
  • flatMapLatest 和 flatMapFirst

flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let first = BehaviorSubject(value: "A")
        let second = BehaviorSubject(value: "1")
        
        let behaviorRelay = BehaviorRelay<BehaviorSubject>(value: first)
        
        behaviorRelay.asObservable().flatMapLatest { (subject) -> BehaviorSubject<String> in
            return subject
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
        
        first.onNext("B")
        
        behaviorRelay.accept(second)
        
        second.onNext("2")
        
        first.onNext("C")
    }

打印结果:

A
B
1
2

注:flatMapFirst 只会接收最初的 value 事件,感觉没什么用,不过也给出一段示例代码吧。

func test() {
        let disposeBag = DisposeBag()
        
        let first = BehaviorSubject(value: "A")
        let second = BehaviorSubject(value: "1")
        
        let behaviorRelay = BehaviorRelay<BehaviorSubject>(value: first)
        
        behaviorRelay.asObservable().flatMapFirst { (subject) -> BehaviorSubject<String> in
            return subject
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
        
        first.onNext("B")
        
        behaviorRelay.accept(second)
        
        second.onNext("2")
        
        first.onNext("C")
    }

打印结果:

A
B
C
  • from

from 可以将其他类型或者数据结构转换为 Observable,示例代码如下:

1、将一个数组转换为 Observable

func test() {
        let disposeBag = DisposeBag()
        
        let numbers = Observable.from([0, 1, 2])
        
        numbers.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

0
1
2

它相当于:

func test() {
        let disposeBag = DisposeBag()

        let numbers = Observable<Int>.create { observer in
            observer.onNext(0)
            observer.onNext(1)
            observer.onNext(2)
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        numbers.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

2、将一个可选值转换为 Observable

func test() {
        let disposeBag = DisposeBag()
        
        let optional: Int? = 1
        
        let value = Observable.from(optional: optional)
        
        value.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

1

它相当于:

func test() {
        let disposeBag = DisposeBag()
        
        let optional: Int? = 1
        
        let value = Observable<Int>.create { observer in
            if let element = optional {
                observer.onNext(element)
            }
            observer.onCompleted()
            return Disposables.create()
        }
        
        value.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }
  • groupBy

groupBy 操作符将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。

它会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        // 将奇数偶数分成两组
        Observable<Int>.of(0, 1, 2, 3, 4, 5).groupBy(keySelector: { (element) -> String in
                return (element % 2 == 0 ? "偶数" : "基数")
        }).subscribe { (event) in
            switch event {
                case .next(let group):
                    group.asObservable().subscribe({ (event) in
                        print("key:\(group.key)    event:\(event)")
                    })
                    .disposed(by: disposeBag)
                default:
                    print("")
            }
        }.disposed(by: disposeBag)
    }

打印结果:

key:偶数    event:next(0)
key:基数    event:next(1)
key:偶数    event:next(2)
key:基数    event:next(3)
key:偶数    event:next(4)
key:基数    event:next(5)
key:偶数    event:completed
key:基数    event:completed
  • ignoreElements

ignoreElements 忽略掉所有的元素,只发出 errorcompleted 事件,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable<Int>.of(1, 2, 3, 4, 5, 6).ignoreElements().subscribe { (e) in
            print(e)
        }.disposed(by: disposeBag)
    }

打印结果:

completed
  • interval

interval 操作符将创建一个 Observable,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素,示例代码如下:

let disposeBag = DisposeBag()

func test() {
        let intervalObservable = Observable<Int>.interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance)
        intervalObservable.subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    }

打印结果:

next(0)
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
....

注:这个 disposeBag 必须是全局的,要不 interval 没有效果。

  • just

创建一个 Observable,只包含一个元素,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let justObservable = Observable.just("100")
        
        justObservable.subscribe({ (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

next(100)
completed
  • map

map 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let observable = Observable<Int>.of(1, 32, 66, 88, 100)
        
        observable.map { (e) -> String in
            return "== \(e) =="
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

== 1 ==
== 32 ==
== 66 ==
== 88 ==
== 100 ==
  • merge

通过使用 merge 操作符可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
                 
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        Observable.of(subject1, subject2).merge().subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
         
        subject1.onNext(20)
        subject1.onNext(40)
        subject2.onNext(1)
        subject1.onNext(80)
        subject1.onNext(100)
    }

打印结果:

20
40
1
80
100
  • never

never 操作符将创建一个 Observable,这个 Observable 不会产生任何事件。

创建一个不会产生任何事件的 Observable

let id = Observable<Int>.never()

它相当于:

let id = Observable<Int>.create { observer in
    return Disposables.create()
}
  • observeOn 和 subscribeOn
    ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 observeOn 操作符,来指示 Observable 在哪个 Scheduler 发出通知。

注意⚠️:一旦产生了 onError 事件, observeOn 操作符将立即转发。他不会等待 onError 之前的事件全部被收到。这意味着 onError 事件可能会跳过一些元素提前发送出去。

subscribeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出执行。

默认情况下,Observable 创建,应用操作符以及发出通知都会在 Subscribe 方法调用的 Scheduler 执行。subscribeOn 操作符将改变这种行为,它会指定一个不同的 Scheduler 来让 Observable 执行,observeOn 操作符将指定一个不同的 Scheduler 来让 Observable 通知观察者。
以下是 subscribeOnobserveOn 简单例子,示例代码如下:
eg1:

func test() {
        Observable<String>.create { (observer) -> Disposable in
            observer.onNext("A")
            sleep(1)
            observer.onNext("B")

            return Disposables.create()
        }.subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.background)).subscribe(onNext: { (e) in
            print("\(e) -- 是否在主线程:\(Thread.isMainThread)")
        }).disposed(by: disposeBag)
    }

打印结果:

A -- 是否在主线程:false
B -- 是否在主线程:false

eg2: 结合上面示例代码和以下的示例代码,自己去体会 subscribeOnobserveOn 用法。

func test() {
        Observable<String>.create { (observer) -> Disposable in
            observer.onNext("A")
            sleep(1)
            observer.onNext("B")

            return Disposables.create()
        }.subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.background)).observeOn(MainScheduler.instance).subscribe(onNext: { (e) in
            print("\(e) -- 是否在主线程:\(Thread.isMainThread)")
        }).disposed(by: disposeBag)
    }

打印结果:

A -- 是否在主线程:true
B -- 是否在主线程:true
  • reduce

reduce 操作符将对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果,以下是方法及注释:

/**
  - parameter seed: 累加器的初始值
  - parameter accumulator: 累加器闭包
  - returns: 结合累加器的初始值,返回所有元素的累加值
*/
public func reduce<A>(_ seed: A, accumulator: @escaping (A, Self.Element) throws -> A) -> RxSwift.Observable<A>

示例代码如下:

func test() {
        Observable<Int>.from([1, 2, 3]).reduce(0) { (seed, e) -> Int in
            return (e + seed)
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

6
  • refCount

refCount 操作符将自动连接和断开可被连接的 Observable。它将可被连接的 Observable 转换为普通 Observable。当第一个观察者对它订阅时,那么底层的 Observable 将被连接。当最后一个观察者离开时,那么底层的 Observable 将被断开连接,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        let observable = Observable<String>.just("A").publish()
        
        observable.refCount().subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

A

注:请结合操作符 connectpublish 来理解示例代码。

  • repeatElement

repeatElement 操作符将创建一个 Observable,这个 Observable 将无止尽的发出同一个元素,示例代码如下:

let id = Observable.repeatElement(0)

它相当于:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    ... // 无数次
    return Disposables.create()
}
  • replay

replay 操作符将 Observable 转换为可被连接的 Observable,并且这个可被连接的 Observable 将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。

func test() {
        let intSequence = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
            .replay(5)
        
        print("====== subscribe 1 ======")
        _ = intSequence.subscribe(onNext: { (e) in
            print("Subscription 1:, Event: \(e)")
        })

        DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
            print("====== connect ======")
            _ = intSequence.connect()
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
            print("====== subscribe 2 ======")
            _ = intSequence.subscribe(onNext: { (e) in
                print("Subscription 2:, Event: \(e)")
            })
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 10) {
            print("====== subscribe 3 ======")
            _ = intSequence.subscribe(onNext: { (e) in
                print("Subscription 3:, Event: \(e)")
            })
        }
    }

打印结果:

====== subscribe 1 ======
====== connect ======
Subscription 1:, Event: 0
Subscription 1:, Event: 1
====== subscribe 2 ======
Subscription 2:, Event: 0
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
====== subscribe 3 ======
Subscription 3:, Event: 0
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
Subscription 1:, Event: 7
Subscription 2:, Event: 7
Subscription 3:, Event: 7
....
  • retry

retry 操作符将不会将 error 事件,传递给观察者,然而,它会从新订阅源 Observable,给这个 Observable 一个重试的机会,让它有机会不产生 error 事件。retry 总是对观察者发出 next 事件,即便源序列产生了一个 error 事件,所以这样可能会产生重复的元素,示例代码如下:

enum MyError: Error {
    case errorA
    case errorB
    var errorType: String {
        switch self {
        case .errorA:
            return "I am error A"
        case .errorB:
            return "I am error B"
        }
    }
}

func test() {
        let disposeBag = DisposeBag()
        var count = 1

        let sequenceThatErrors = Observable<String>.create { (observer) in
            observer.onNext("1")
            observer.onNext("2")
            observer.onNext("3")

            if count == 1 {
                observer.onError(MyError.errorA)
                print("Error encountered")
                count += 1
            }

            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            observer.onCompleted()

            return Disposables.create()
        }
        
        sequenceThatErrors.retry().subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

1
2
3
Error encountered
1
2
3
A
B
C
  • sample

sample 操作符将不定期的对源 Observable 进行取样操作。通过第二个 Observable 来控制取样时机。一旦第二个 Observable 发出一个元素,就从源 Observable 中取出最后产生的元素。这个操作符很少用到,这里就不举例了。

  • scan

scan 操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。

这种操作符在其他地方有时候被称作是 accumulator,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable.of(10, 100, 1000).scan(1) { (seed, e) -> Int in
            return seed + e
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

11
111
1111
  • shareReplay

shareReplay 操作符将使得观察者共享源 Observable,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者。

注:从框架源码中可以看出,shareReplayreplayrefCount 的组合,具体自己去 replayrefCount

  • single

single 操作符将限制 Observable 只产生一个元素。如果 Observable 只有一个元素,它将镜像这个 Observable。如果 Observable 没有元素或者元素数量大于一,它将产生一个 error 事件,示例代码如下:

func test() {
        let disposeBag = DisposeBag()
        
        Observable<Int>.of(1, 2, 3).single().subscribe(onNext: { (e) in
            print("A == ", e)
        }, onError: { (error) in
            print("error:", error)
        }).disposed(by: disposeBag)
        
        Observable<Int>.of(1, 2, 3).single { (e) -> Bool in
            return (e == 3)
        }.subscribe(onNext: { (e) in
            print("B == ", e)
        }).disposed(by: disposeBag)
    }

打印结果:

A ==  1
error: Sequence contains more than one element.
B ==  3
  • skip、skipUntil 和 skipWhile

skip 操作符可以让你跳过 Observable 中头 n 个元素,只关注后面的元素,示例代码如下:

skipUntil 操作符可以让你忽略源 Observable 中头几个元素,直到另一个 Observable 发出一个元素后,它才镜像源 Observable

skipWhile 操作符可以让你忽略源 Observable 中头几个元素,直到元素的判定为否后,它才镜像源 Observable

注:三个操作符用法类似,这里就举一个 skip 的例子供参考,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        Observable.of("A", "B", "C", "D", "E", "F").skip(2).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

C
D
E
F
  • startWith

startWith 操作符会在 Observable 头部插入一些元素,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        Observable.of("A", "B", "C", "D").startWith("1").startWith("2").startWith("3", "a", "b").subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

3
a
b
2
1
A
B
C
D
  • take、takeLast、takeUntil 和 takeWhile

take:通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。

takeLast:通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略掉前面的元素。

takeUntil 操作符将镜像源 Observable,它同时观测第二个 Observable。一旦第二个 Observable 发出一个元素或者产生一个终止事件,那个镜像的 Observable 将立即终止。

takeWhile 操作符将镜像源 Observable 直到某个元素的判定为 false。此时,这个镜像的 Observable 将立即终止。

注:四个操作符用法类似,这里就举一个 take 的例子供参考,示例代码如下:

func test() {
        let disposeBag = DisposeBag()

        Observable.of("A", "B", "C", "D", "E", "F").take(3).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)
    }

打印结果:

A
B
C
  • timeout

如果 Observable 在一段时间内没有产生元素,timeout 操作符将使它发出一个 error 事件。

let disposeBag = DisposeBag()

func test() {
        Observable<String>.create { (observer) -> Disposable in
            return Disposables.create()
        }.timeout(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
            print("onNext:", e)
        }, onError: { (error) in
            print("onError:", error)
        }).disposed(by: disposeBag)
    }

打印结果:

onError: Sequence timeout.

注:等了 3s 之后,发出一个 error 事件,这时你就看到了打印结果。

  • timer
    timer 操作符将创建一个 Observable,它在经过设定的一段时间后,产生唯一的一个元素,示例代码如下:
let disposeBag = DisposeBag()

func test() {
        Observable<Int>.timer(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
            print(e)
        }, onError: { (error) in
            print(error)
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

0
onCompleted

注:这里存在其他版本的 timer 操作符,可以实现:创建一个 Observable 在一段延时后,每隔一段时间产生一个元素。方法如下:

/**
    - parameter dueTime: 初始延时
    - parameter period: 时间间隔
    - parameter scheduler: 运行在哪个线程
         */
public static func timer(_ dueTime: RxSwift.RxTimeInterval, period: RxSwift.RxTimeInterval? = nil, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.Element>

示例代码如下:

let disposeBag = DisposeBag()

func test() {
        Observable<Int>.timer(RxTimeInterval.seconds(2), period: RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
            print(e)
        }, onError: { (error) in
            print(error)
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
    }

打印结果:

0
1
2
3
4
5
6
....
  • using

通过使用 using 操作符创建 Observable 时,同时创建一个可被清除的资源,一旦 Observable 终止了,那么这个资源就会被清除掉了。

func test() {
        // 一个无限序列(每隔1秒创建一个序列数 )
        let infiniteInterval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).do(onNext: { (e) in
                print("infinite : \(e)")
            }, onSubscribe: {
                print("开始订阅 infinite")
            }, onDispose: {
                print("销毁 infinite")
            })
         
        // 一个有限序列(每隔2秒创建一个序列数,共创建三个 )
        let limited = Observable<Int>
            .interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance).take(2).do(onNext: { (e) in
            print("limited: \(e)")
        }, onSubscribe: {
            print("开始订阅 limited")
        }, onDispose: {
            print("销毁 limited")
        })
         
        //使用using操作符创建序列
        let o: Observable<Int> = Observable.using({ () -> AnyDisposable in
            return AnyDisposable(infiniteInterval.subscribe())
        }) { (disposable) -> Observable<Int> in
            return limited
        }
        
        o.subscribe()
    }

class AnyDisposable: Disposable {
    let _dispose: () -> Void
    init(_ disposable: Disposable) {
        _dispose = disposable.dispose
    }
    
    func dispose() {
        _dispose()
    }
}

打印结果如下:

开始订阅 infinite
开始订阅 limited
infinite : 0
infinite : 1
limited: 0
infinite : 2
infinite : 3
limited: 1
销毁 limited
销毁 infinite
  • window

window 操作符和 buffer 十分相似,buffer 周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。

buffer 要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。

func test() {
        let disposeBag = DisposeBag()
        
        let publicSubject = PublishSubject<String>()
        
        publicSubject.window(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe(onNext: { (observable) in
            _ = observable.subscribe(onNext: { (e) in
                print("onNext: ", e)
            })
        }, onError: { (error) in
            print("onError:", error)
        }, onCompleted: {
            print("onCompleted")
        }).disposed(by: disposeBag)
        
        publicSubject.onNext("A")
        publicSubject.onNext("B")
        publicSubject.onNext("C")
        
        publicSubject.onNext("D")
        publicSubject.onNext("E")
        publicSubject.onNext("F")
        
        publicSubject.onNext("G")
    }

打印结果如下:

onNext:  A
onNext:  B
onNext:  C
onNext:  D
onNext:  E
onNext:  F
onNext:  G
  • withLatestFrom

eg1:

func test() {
        let disposeBag = DisposeBag()
        let firstSubject = PublishSubject<String>()
        let secondSubject = PublishSubject<String>()
        
        firstSubject.withLatestFrom(secondSubject).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)

        firstSubject.onNext("A")
        firstSubject.onNext("B")
        secondSubject.onNext("1")
        secondSubject.onNext("2")
        firstSubject.onNext("C")
    }

打印结果如下:

2

eg2:

func test() {
        let disposeBag = DisposeBag()
        let firstSubject = PublishSubject<String>()
        let secondSubject = PublishSubject<String>()
        
        firstSubject.withLatestFrom(secondSubject, resultSelector: { (first, second) -> String in
            return "\(first) -- \(second)"
        }).subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)

        firstSubject.onNext("A")
        firstSubject.onNext("B")
        secondSubject.onNext("1")
        secondSubject.onNext("2")
        firstSubject.onNext("C")
    }

打印结果如下:

C -- 2
  • zip

zip 操作符将多个(最多不超过 8 个) Observables 的元素通过一个函数组合起来,然后将这个组合的结果发出来。它会严格的按照序列的索引数进行组合。例如,返回的 Observable 的第一个元素,是由每一个源 Observables 的第一个元素组合出来的。它的第二个元素 ,是由每一个源 Observables 的第二个元素组合出来的。它的第三个元素 ,是由每一个源 Observables 的第三个元素组合出来的,以此类推。它的元素数量等于源 Observables 中元素数量最少的那个。

func test() {
        let disposeBag = DisposeBag()
        
        let first = PublishSubject<String>()
        let second = PublishSubject<String>()
        
        Observable.zip(first, second) { (e1, e2) -> String in
            return "\(e1) -- \(e2)"
        }.subscribe(onNext: { (e) in
            print(e)
        }).disposed(by: disposeBag)

        first.onNext("1")
        second.onNext("A")
        first.onNext("2")
        second.onNext("B")
        second.onNext("C")
        second.onNext("D")
        first.onNext("3")
        first.onNext("4")
    }

打印结果如下:

1 -- A
2 -- B
3 -- C
4 -- D

如何选择操作符?

我想要创建一个 Observable

  • 产生特定的一个元素:just
    ◦ 经过一段延时:timer
  • 从一个序列拉取元素:from
  • 重复的产生某一个元素:repeatElement
  • 存在自定义逻辑:create
  • 每次订阅时产生:deferred
  • 每隔一段时间,发出一个元素:interval
    ◦ 在一段延时后:timer
  • 一个空序列,只有一个完成事件:empty
  • 一个任何事件都没有产生的序列:never

我想要创建一个 Observable 通过组合其他的 Observables

  • 任意一个 Observable 产生了元素,就发出这个元素:merge
  • 让这些 Observables 一个接一个的发出元素,当上一个 Observable 元素发送完毕后,下一个 Observable 才能开始发出元素:concat
  • 组合多个 Observables 的元素
    ◦ 当每一个 Observable 都发出一个新的元素:zip
    ◦ 当任意一个 Observable 发出一个新的元素:combineLatest

我想要转换 Observable 的元素后,再将它们发出来

  • 对每个元素直接转换:map
  • 转换到另一个 ObservableflatMap
    ◦ 只接收最新的元素转换的 Observable 所产生的元素:flatMapLatest
    ◦ 每一个元素转换的 Observable 按顺序产生元素:concatMap
  • 基于所有遍历过的元素: scan

我想要将产生的每一个元素,拖延一段时间后再发出:delay

我想要将产生的事件封装成元素发送出来

  • 将他们封装成 Event<Element>:materialize
    ◦ 然后解封出来:dematerialize

我想要忽略掉所有的 next 事件,只接收 completederror 事件:ignoreElements

我想创建一个新的 Observable 在原有的序列前面加入一些元素:startWith

我想从 Observable 中收集元素,缓存这些元素之后在发出:buffer

我想将 Observable 拆分成多个 Observableswindow

  • 基于元素的共同特征:groupBy

我想只接收 Observable 中特定的元素

  • 发出唯一的元素:single

我想重新从 Observable 中发出某些元素

  • 通过判定条件过滤出一些元素:filter
  • 仅仅发出头几个元素:take
  • 仅仅发出尾部的几个元素:takeLast
  • 仅仅发出第n 个元素:elementAt
  • 跳过头几个元素
    ◦ 跳过头 n 个元素:skip
    ◦ 跳过头几个满足判定的元素:skipWhileskipWhileWithIndex
    ◦ 跳过某段时间内产生的头几个元素:skip
    ◦ 跳过头几个元素直到另一个 Observable 发出一个元素:skipUntil
  • 只取头几个元素
    ◦ 只取头几个满足判定的元素:takeWhiletakeWhileWithIndex
    ◦ 只取某段时间内产生的头几个元素:take
    ◦ 只取头几个元素直到另一个 Observable 发出一个元素:takeUntil
  • 周期性的对 Observable 抽样:sample
  • 发出那些元素,这些元素产生后的特定的时间内,没有新的元素产生:debounce
  • 直到元素的值发生变化,才发出新的元素:distinctUntilChanged
    ◦ 并提供元素是否相等的判定函数:distinctUntilChanged
  • 在开始发出元素时,延时后进行订阅:delaySubscription

我想要从一些 Observables 中,只取第一个产生元素的 Observableamb

我想评估 Observable 的全部元素

  • 并且对每个元素应用聚合方法,待所有元素都应用聚合方法后,发出结果:reduce
  • 并且对每个元素应用聚合方法,每次应用聚合方法后,发出结果:scan

我想把 Observable 转换为其他的数据结构:as...

我想在某个 Scheduler 应用操作符:subscribeOn

  • 在某个 Scheduler 监听:observeOn

我想要 Observable 发生某个事件时, 采取某个行动:do

我想要 Observable 发出一个 error 事件:error

  • 如果规定时间内没有产生元素:timeout

我想要 Observable 发生错误时,优雅的恢复

  • 如果规定时间内没有产生元素,就切换到备选 Observabletimeout
  • 如果产生错误,将错误替换成某个元素 :catchErrorJustReturn
  • 如果产生错误,就切换到备选 ObservablecatchError
  • 如果产生错误,就重试 :retry

我创建一个 Disposable 资源,使它与 Observable 具有相同的寿命:using

我创建一个 Observable,直到我通知它可以产生元素后,才能产生元素:publish

  • 并且,就算是在产生元素后订阅,也要发出全部元素:replay
  • 并且,一旦所有观察者取消观察,他就被释放掉:refCount
  • 通知它可以产生元素了:connect

Author

如果你有什么建议,可以关注我,直接留言,留言必回。

相关文章

网友评论

    本文标题:RxSwift核心之操作符Operator

    本文链接:https://www.haomeiwen.com/subject/knbgpctx.html