RxSwift常用基础知识

作者: MrDarren | 来源:发表于2022-03-14 17:13 被阅读0次

    目的

    gitHub地址: https://github.com/ReactiveX/RxSwift

    RxSwift的目的是让数据/事件流和异步任务能够更方便的序列化处理, 能够使用Swift进行响应式编程。

    RxSwift做了什么

    RxSwift把我们程序中每一个操作都看成一个事件,比如一个TextField中的文本改变,一个按钮被点击,或者一个网络请求结束等,每一个事件源就可以看成一个管道,也就是sequence,比如TextField,当我们改变里面的文本的时候,这个TextField就会不断的发出事件,从他的这个sequence中不断的流出,我们只需要监听这个sequence,每流出一个事件就做相应的处理。同理,Button也是一个sequence,每点击一次就流出一个事件。

    RxSwift的核心思想是 Observable

    sequence,observable表示可监听或者可观察,也就是说RxSwift的核心思想是可监听的序列。并且Observable sequence可以接受异步信号,也就是说,信号是可以异步给监听者的,Observable(ObservableType) 和 SequenceType类似,ObservableType.subscribe 和 SequenceType.generate类似,由于RxSwift支持异步获得信号,所以用ObservableType.subscribe,这和indexGenerator.next()类似其中SequenceType是Swift(2.3以前版本,之后的版本没有该协议)中的一个协议,比如Swift中的Array就遵循这个协议,通过这个协议,你可以这样的去操作一个Array

    let array = [1, 2, 3, 4, 5]
    
    let array2 = array.filter({$0 > 1}).map({$0 * 2})//4 6 8 10
    
    var indexGenerator = array2.generate()
    
    let fisrt = indexGenerator.next() // 4
    
    let seoncd = indexGenerator.next() //6
    

    RxSwift中,ObservableType.subscribe的回调(新的信号到来)一共有三个

    enum Event<Element> {
        
        case Next(Element) // 新的信号到来
        
        case Error(ErrorType) // 信号发生错误,序列不会再产生信号
        
        case Completed // 序列发送信号完成,不会再产生新的信号
        
    }
    

    Observable分为两种

    1 在有限的时间内会自动结束(Completed/Error),比如一个网络请求当作一个序列,当网络请求完成的时候,Observable自动结束,资源会被释放。
    2 信号不会自己结束,最简单的比如一个Timer,每隔一段时间发送一个新的信号过来,这时候需要手动取消监听,来释放相应的资源,比如一个label.rx.text是一个Obserable,通常需要这样调用addDisposableTo(disposeBag)来让其在deinit,也就是所有者要释放的时候,自动取消监听。

    class Observable<Element> {
        func subscribe(observer: Observer<Element>) -> Disposable //调用Disposable的方法来取消
    }
    

    当然,除了手动释放,RxSwift提供了一些操作符,比如 takeUntil来根据条件取消

    sequence .takeUntil(self.rx__deallocated) //当对象要释放的时候,取消监听 .subscribe { print($0) }
    

    Operator运算符相关

    never

    never就是创建一个sequence,但是不发出任何事件信号。

    Observable<String>.never().subscribe { _ in
         print("This will never be printed")
    }.disposed(by: disposeBag)
    
    empty

    empty就是创建一个空的sequence,只能发出一个complected事件。

    Observable<Int>.empty().subscribe { event in
         let text = "operator: empty__\(event)"
         self.showText(text: text)
         print(text)
    }.disposed(by: disposeBag)
    
    create

    我们也可以自定义可观察的sequence,那就是使用create。
    create操作符传入一个观察者observer,然后调用observer的onNext,onCompleted和onError方法。返回一个可观察的obserable序列。
    无参创建create

    Observable<Any>.create { (observal:AnyObserver<Any>) -> Disposable in
        observal.onNext("abc")
        observal.onNext("12")
        observal.onCompleted()
        return Disposables.create()
    }.subscribe(onNext: { str in
        let text = "operator: create__\(str)"
        self.showText(text: text)
        print(text)
    }).disposed(by: disposeBag)
    

    添加参数创建create

    func createObservable(element:String) -> Observable<String> {
        return Observable.create { (observal:AnyObserver<String>) -> Disposable in
                    observal.onNext(element)
                    observal.onCompleted()
               return Disposables.create()
       }
    }
    
    createObservable(element: "element").subscribe(onNext: { str in
           let text = "operator: create with element__\(str)"
           self.showText(text: text)
           print(text)
    }).disposed(by: disposeBag)
    
    deferred

    延时创建Observable对象,当subscribe的时候才去创建,它为每一个Observer创建一个新的Observable,也就是说每个订阅者订阅的对象都是内容相同但是完全独立的序列。
    deferr采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现。

    let defObservable = Observable<String>.deferred { () -> Observable<String> in
           return Observable.create { observer -> Disposable in
                       observer.onNext("defObservable create")
                       observer.onCompleted()
                  return Disposables.create()
           }
    }
    defObservable.subscribe { event in
           let text = "operator: deferred one__\(event)"
           self.showText(text: text)
           print(text)
    }.disposed(by: disposeBag)
    defObservable.subscribe { event in
           let text = "operator: deferred two__\(event)"
           self.showText(text: text)
           print(text)
    }.disposed(by: disposeBag)
    
    /// deferred 延迟创建的例子
    var value1:String?
    let observable1 = Observable<String>.from(optional: value1)
    value1 = "Darren1"
    observable1.subscribe { event in
                           
         /// 只打印出 completed
         /// 并没有像我们想象中的那样也会打印出 onNext事件,这个是为什么呢?因为在我们订阅的时候,数据未必已经初始化完成
         let text = "operator: deferred__\(event)"
         self.showText(text: text)
         print(text)
    }.disposed(by: disposeBag)
    
    /// 把这个例子使用defer重新测试一下
    var value2:String?
    let observable2 = Observable<String>.deferred { () -> Observable<String> in
          return Observable<String>.from(optional: value2)
    }
    value2 = "Darren2"
    observable2.subscribe { event in
         let text = "operator: deferred__\(event)"
         self.showText(text: text)
                           
         /// next(Darren2) completed
         print(text)
    }.disposed(by: disposeBag)
    
    of/from

    这两个方法都是把一个对象或者数据转换为可观察序列,这在使用Swift中的SequenceType时很有用。

    Observable<String>.of("hello", "RxSwift").subscribe { event in
         let text = "operator: of__\(event)"
         self.showText(text: text)
                                                         
         /// next(hello) next(RxSwift) completed
         print(text)
    }.disposed(by: disposeBag)
    Observable<String>.from(["hello", "RxSwift"]).subscribe { event in
         let text = "operator: from__\(event)"
         self.showText(text: text)
                                                             
         /// next(hello) next(RxSwift) completed
         print(text)
    }.disposed(by: disposeBag)
    
    just

    将一个对象或者一个Sequence转换为 一个可观察序列,请注意这里与From是完全不相同的:from是转换为一个或者多个可观察序列(这取决于你是要将一个还是一个序列进行转换)。也就是说just只能包含一个观察序列,请注意与上面例子结果进行对比。

    Observable<Array<String>>.just(["hello", "RxSwift"]).subscribe { event in
         let text = "operator: just__\(event)"
         self.showText(text: text)
                                                                    
         /// next(["hello", "RxSwift"]) completed
         print(text)
    }.disposed(by: disposeBag)
    
    range

    给定范围,依次显示,就是创建一个sequence,他会发出这个范围中的从开始到结束的所有事件,Observable必须指定数据类型。

    Observable<Int>.range(start: 1, count: 4).subscribe(onNext: { value in
         let text = "operator: range__\(value)"
         self.showText(text: text)
         print(text)
    }).disposed(by: disposeBag)
    
    interval

    创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)。

    Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
         let text = "operator: interval__\(event)"
         self.showText(text: text)
                                                                                                              
         /// operator: interval__next(0)
         /// operator: interval__next(1)
         /// operator: interval__next(2)
         /// operator: interval__completed
         print(text)
    }.disposed(by: disposeBag)
    
    timer

    在指定的时间后,发送一个特定的item (E -> Int/NSInteger),请注意这里与interval的区别(interval是发送一系列特定item,而timer只会发送一个)。

    Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
         let text = "operator: timer__\(event)"
         self.showText(text: text)
                                                                                                   
         /// operator: timer__next(0)
         /// operator: timer__completed
         print(text)
    }.disposed(by: disposeBag)
    
    repeatElement

    创建一个sequence,发出特定的事件n次。

    /// 如果没有.take(3), 就会一直执行下去
    Observable.repeatElement("darren").take(3).subscribe(onNext: { value in
         let text = "operator: repeat__\(value)"
         self.showText(text: text)
         print(text)
    }).disposed(by: disposeBag)
    
    generate

    类似于for循环,创建一个可观察sequence,当初始化的条件为true的时候,他就会发出所对应的事件。

    Observable.generate(initialState: 0) {
        $0 < 5
    } iterate: {
        $0 + 2
    }.subscribe(onNext: { value in
         let text = "operator: generate__\(value)"
         self.showText(text: text)
         print(text)
    }).disposed(by: disposeBag)
    
    error

    发出错误信号,创建一个可观察序列,但不发出任何正常的事件,只发出error事件并结束。

    let error = NSError(domain: "error", code: 10, userInfo: ["This is error" : "xxxxxx"]) as Error
    Observable<Any>.error(error).subscribe(onNext: { value in
         let text = "operator: error__\(value)"
         self.showText(text: text)
         print(text)
    }).disposed(by: disposeBag)
    

    Transform 变换相关

    buffer

    定期的将需要发射的items随机到一个buffer的包中,分批次的发射这些包,而不是一次发射一个item:例如你有[1, 2, 3, 4] ,你可以一次发射一个,也可以一次发射两个item或者三个。

    /// 一次发射1个Item事件
    Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 1, scheduler: MainScheduler.instance).subscribe { event in
         let text = "transform: buffer__\(event)"
         self.showText(text: text)
                                         
         /// transform: buffer__next([1])
         /// transform: buffer__next([2])
         /// transform: buffer__next([3])
         /// transform: buffer__next([4])
         /// transform: buffer__next([])
         /// transform: buffer__completed
         print(text)
    }.disposed(by: disposeBag)
    
    /// 一次发射3个Item事件
    Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
         let text = "transform: buffer__\(event)"
         self.showText(text: text)
         
         /// transform: buffer__next([1, 2, 3])
         /// transform: buffer__next([4])
         /// transform: buffer__completed
         print(text)
    }.disposed(by: disposeBag)
    
    window

    与buffer类似,但是每次发射的不是item,而是Observables序列(请注意与buffer的结果比较)。

    Observable<Int>.of(1, 2, 3, 4).window(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
         let text = "transform: window__\(event)"
         self.showText(text: text)
                                                                                                                                       
         /// transform: window__next(RxSwift.AddRef<Swift.Int>)
         /// transform: window__next(RxSwift.AddRef<Swift.Int>)
         /// transform: window__completed
         print(text)
    }.disposed(by: disposeBag)       
    
    flatMap

    将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
    这个方法是很有用的,例如当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

    /// 我需要在每一个Item后跟一个新的Item叫做RxSwift
    Observable<Int>.of(0, 1, 2).flatMap { (element:Int) -> Observable<String> in
         return Observable<String>.of("\(element)", "RxSwift")
    }.subscribe { event in
         let text = "transform: flatMap__\(event)"
         self.showText(text: text)
                                                     
         /// transform: flatMap__next(0)
         /// transform: flatMap__next(RxSwift)
         /// transform: flatMap__next(1)
         /// transform: flatMap__next(RxSwift)
         /// transform: flatMap__next(2)
         /// transform: flatMap__next(RxSwift)
         /// transform: flatMap__completed
         print(text)
    }.disposed(by: disposeBag)
    
    groupBy

    将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。

    /// 我需要将奇数偶数分成两组
    Observable<Int>.of(0, 1, 2, 3, 4, 5).groupBy { (element) -> String in
         return element % 2 == 0 ? "偶数" : "奇数"
    }.subscribe { event in
         switch event {
                case .next(let group):
                     group.asObservable().subscribe { event in
                            let text = "transform: groupBy__key:  \(group.key)  \(event)"
                            self.showText(text: text)
                                                                                                  
                            /// transform: groupBy__ key:  偶数  next(0)
                            /// transform: groupBy__ key:  奇数  next(1)
                            /// transform: groupBy__ key:  偶数  next(2)
                            /// transform: groupBy__ key:  奇数  next(3)
                            /// transform: groupBy__ key:  偶数  next(4)
                            /// transform: groupBy__ key:  奇数  next(5)
                            /// transform: groupBy__ key:  偶数  completed
                            /// transform: groupBy__ key:  奇数  completed
                            print(text)
                     }.disposed(by: self.disposeBag)
                                                                  
                default:break
         }
    }.disposed(by: disposeBag)
    
    map

    通过一个闭包将原来的序列转换为一个新序列的操作。

    Observable<Int>.of(1, 2, 3).map { return "hello " + "\($0)" }.subscribe { event in
         let text = "transform: map__\(event)"
         self.showText(text: text)
                                                                             
         /// transform: map__next(hello 1)
         /// transform: map__next(hello 2)
         /// transform: map__next(hello 3)
         /// transform: map__completed
         print(text)
    }.disposed(by: disposeBag)
    
    scan

    从字面意思可以看出是扫描,也就是说该方法会给出一个初始值(seed),每次通过一个函数将上一次的结果与序列中的Item进行处理,每处理完成都会发射.next事件。

    Observable<String>.of("Rx", "Swift").scan("hello ") { acum, element in
         return acum + element
    }.subscribe { event in
         let text = "transform: scan__\(event)"
         self.showText(text: text)
                                                                     
         /// transform: scan__next(hello Rx)
         /// transform: scan__next(hello RxSwift)
         /// transform: scan__completed
         print(text)
    }.disposed(by: disposeBag)
    
    reduce

    与上述scan类似,都是初始一个seed,每次通过函数将上一次的结果与序列中的item进行处理,但是唯一不同的一点是,只会在最后发射一次.next事件,将其拿来作数学计算很有用,这个我们将会在后面讲到 (请注意与上述scan的结果比较)。

    Observable<String>.of("Rx", "Swift").reduce("hello ") { acum, element in
         return acum + element
    }.subscribe { event in
        let text = "transform: reduce__\(event)"
        self.showText(text: text)
                                                                       
        /// transform: reduce__next(hello RxSwift)
        /// transform: reduce__completed
        print(text)
    }.disposed(by: disposeBag)
    

    Filter过滤器相关

    debounce

    在规定的时间内过滤item。

    Observable<Int>.of(1, 2, 3, 4).debounce(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
         let text = "filter: debounce__\(event)"
         self.showText(text: text)
                                                                                                                     
         /// filter: debounce__next(4)
         /// filter: debounce__completed
         print(text)
    }.disposed(by: disposeBag)
    
    distinctUntilChanged

    过滤掉可观察到的重复item,表示如果发射的事件与上一次不相同那么才会发射此次事件。

    Observable<Int>.of(1, 2, 2, 2, 3).distinctUntilChanged().subscribe { event in
         let text = "filter: distinctUntilChanged__\(event)"
         self.showText(text: text)
                                                                        
         /// filter: distinctUntilChanged__next(1)
         /// filter: distinctUntilChanged__next(2)
         /// filter: distinctUntilChanged__next(3)
         /// filter: distinctUntilChanged__completed
         print(text)
    }.disposed(by: disposeBag)
    
    elementAt

    发射第 n个item。

    Observable<Int>.of(1, 2, 3, 4, 5).element(at: 3).subscribe { event in
         let text = "filter: elementAt__\(event)"
         self.showText(text: text)
                                                                
         /// filter: elementAt__next(4)
         /// filter: elementAt__completed
         print(text)
    }.disposed(by: disposeBag)
    
    filter

    仅发射谓词测试通过的Items。

    Observable<Int>.of(9, 10, 11, 12).filter { element -> Bool in
         return element > 10
    }.subscribe { event in
         let text = "filter: filter__\(event)"
         self.showText(text: text)
                                                          
         /// filter: filter__next(11)
         /// filter: filter__next(12)
         /// filter: filter__completed
         print(text)
    }.disposed(by: disposeBag)
    
    skip

    发射第n(包含n)之后的items。

    Observable<Int>.of(9, 10, 11, 12).skip(2).subscribe { event in
         let text = "filter: skip__\(event)"
         self.showText(text: text)
                                                         
         /// filter: skip__next(11)
         /// filter: skip__next(12)
         /// filter: skip__completed
         print(text)
    }.disposed(by: disposeBag)
    
    take

    发射第n(不包含n)之前的items,与skip相反效果。

    Observable<Int>.of(9, 10, 11, 12).take(2).subscribe { event in
         let text = "filter: take__\(event)"
         self.showText(text: text)
                                                         
         /// filter: take__next(9)
         /// filter: take__next(10)
         /// filter: take__completed
         print(text)
    }.disposed(by: disposeBag)
    
    takeLast

    发射第n(包含n)之后的items,与skip相同效果。

    Observable<Int>.of(9, 10, 11, 12).takeLast(2).subscribe { event in
         let text = "filter: takeLast__\(event)"
         self.showText(text: text)
                                                             
         /// filter: takeLast__next(11)
         /// filter: takeLast__next(12)
         /// filter: takeLast__completed
         print(text)
    }.disposed(by: disposeBag)
    

    Combine结合相关

    merge

    将多个序列的items合并为一个序列的items。

    let observable1 = Observable<Int>.of(1, 3, 5, 7, 9)
    let observable2 = Observable<Int>.of(2, 4, 6)
    Observable<Int>.merge(observable1, observable2).subscribe { event in
         let text = "combine: merge__\(event)"
         self.showText(text: text)
                                                               
         /// combine: merge__next(1)
         /// combine: merge__next(2)
         /// combine: merge__next(3)
         /// combine: merge__next(4)
         /// combine: merge__next(5)
         /// combine: merge__next(6)
         /// combine: merge__next(7)
         /// combine: merge__next(9)
         /// combine: merge__completed
         print(text)
    }.disposed(by: disposeBag)
    
    startWith

    在发射序列items前新增一个item。

    Observable<String>.of("  ", "RxSwift", "!").startWith("hello").reduce("") { (accum, element) -> String in
         return accum + element
    }.subscribe { event in
         let text = "combine: startWith__\(event)"
         self.showText(text: text)
                                                                                           
         /// combine: startWith__next(hello  RxSwift!)
         /// combine: startWith__completed
         print(text)
    }.disposed(by: disposeBag)
    
    zip

    将多个序列的items进行一一合并,但是需要注意的是,它会等到item对其后合并,未对齐的会舍弃。

    let observable3 = Observable<Int>.of(1, 2, 3, 4, 5, 6, 7)
    let observable4 = Observable<String>.of("A", "B", "C", "D")
    Observable<String>.zip(observable3, observable4) { (e1:Int, e2:String) -> String in
         return "\(e1)\(e2)"
    }.subscribe { event in
         let text = "combine: zip__\(event)"
         self.showText(text: text)
                                                                  
         /// combine: zip__next(1A)
         /// combine: zip__next(2B)
         /// combine: zip__next(3C)
         /// combine: zip__next(4D)
         /// combine: zip__completed
         print(text)
    }.disposed(by: disposeBag)
    
    combineLatest

    如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。
    类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。
    combineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,combineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

    let observable5 = Observable<Int>.of(1, 2, 3, 4, 5)
    let observable6 = Observable<String>.of("A", "B", "C", "D")
    Observable<String>.combineLatest(observable5, observable6) { (e1:Int, e2:String) -> String in
         return "\(e1)\(e2)"
    }.subscribe { event in
         let text = "combine: combineLatest__\(event)"
         self.showText(text: text)
                                                                            
         /// combine: combineLatest__next(1A)
         /// combine: combineLatest__next(2A)
         /// combine: combineLatest__next(2B)
         /// combine: combineLatest__next(3B)
         /// combine: combineLatest__next(3C)
         /// combine: combineLatest__next(4C)
         /// combine: combineLatest__next(4D)
         /// combine: combineLatest__next(5D)
         /// combine: combineLatest__completed
         print(text)
    }.disposed(by: disposeBag)
    
    Observable<String>.combineLatest(observable6, observable5) { (e1:String, e2:Int) -> String in
         return "\(e1)\(e2)"
    }.subscribe { event in
         let text = "combine: combineLatest__\(event)"
         self.showText(text: text)
                                                                            
         /// combine: combineLatest__next(A1)
         /// combine: combineLatest__next(B1)
         /// combine: combineLatest__next(B2)
         /// combine: combineLatest__next(C2)
         /// combine: combineLatest__next(C3)
         /// combine: combineLatest__next(D3)
         /// combine: combineLatest__next(D4)
         /// combine: combineLatest__next(D5)
         /// combine: combineLatest__completed
         print(text)
    }.disposed(by: disposeBag)
    

    Error错误处理相关

    catch

    在收到序列的异常事件时,通过返回另一个序列来持续发送非error事件。

    Observable<UInt8>.create { observer in
         observer.onNext(0)
         observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
         return Disposables.create()
    }.catch { error -> Observable<UInt8> in
         let text = "combine: catch error__\(error)"
         self.showText(text: text)
                                      
         /// combine: catch error__Error Domain=error Code=110 "(null)"
         print(text)
         return Observable<UInt8>.of(1, 2)
    }.subscribe { event in
         let text = "combine: catch__\(event)"
         self.showText(text: text)
                                                  
         /// combine: catch__next(0)
         /// combine: catch__next(1)
         /// combine: catch__next(2)
         print(text)
    }.disposed(by: disposeBag)
    
    retry

    出现错误事件后,重新发送所有事件信息。

    Observable<UInt8>.create { observer in
          observer.onNext(0)
          observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
          return Disposables.create()
    }.retry(3).subscribe { event in
          let text = "combine: retry__\(event)"
          self.showText(text: text)
                                                   
          /// combine: retry__next(0)
          /// combine: retry__next(0)
          /// combine: retry__next(0)
          /// combine: retry__error(Error Domain=error Code=110 "(null)")
          print(text)
    }.disposed(by: disposeBag)
    

    PracticalOperation实用操作相关

    delay

    延迟发射事件。

    let text = "practicalOperation: delay__start time: \(Date())"
    showText(text: text)
    
    /// practicalOperation: delay__start time: 2021-03-14 09:38:36 +0000
    print(text)
    Observable<Int>.of(1, 2).delay(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
         if event.isCompleted {
             let text = "practicalOperation: delay__end time: \(Date())"
             self.showText(text: text)
                                                                                                                
             /// practicalOperation: delay__end time: 2021-03-14 09:38:37 +0000
             print(text)
          }
          let text = "practicalOperation: delay__\(event)"
          self.showText(text: text)
                                                                                                            
          /// practicalOperation: delay__next(1)
          /// practicalOperation: delay__next(2)
          /// practicalOperation: delay__completed
          print(text)
    }.disposed(by: disposeBag)
    
    do

    在一个序列的每个事件执行之前添加一个执行动作。

    Observable<Int>.of(1, 2, 3).do { _ in
         let text = "practicalOperation: do__previous next"
         self.showText(text: text)
         print(text)
    } onError: { _ in
         let text = "practicalOperation: do__previous error"
         self.showText(text: text)
         print(text)
    } onCompleted: {
        let text = "practicalOperation: do__previous complete"
        self.showText(text: text)
        print(text)
    }.subscribe { event in
        let text = "practicalOperation: do__\(event)"
        self.showText(text: text)
                 
        /// practicalOperation: do__previous next
        /// practicalOperation: do__next(1)
        /// practicalOperation: do__previous next
        /// practicalOperation: do__next(2)
        /// practicalOperation: do__previous next
        /// practicalOperation: do__next(3)
        /// practicalOperation: do__previous complete
        /// practicalOperation: do__completed
        print(text)
    }.disposed(by: disposeBag)
    
    observeOn

    observer在指定scheduler中观察序列事件。

    Observable<Int>.of(1).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "test"))).subscribe { event in
         let text = "practicalOperation: observeOn__isMainThread: \(Thread.current.isMainThread)  \(event)"
         DispatchQueue.main.async {
              self.showText(text: text)
         }
                                                                                                                        
         /// practicalOperation: observeOn__isMainThread: false  next(1)
         /// practicalOperation: observeOn__isMainThread: false  completed
         print(text)
    }.disposed(by: disposeBag)
    
    subscribeOn

    在指定的scheduler中操作,参考observeOn。

    Observable<Int>.of(1).subscribe(on: MainScheduler.instance).subscribe { event in
         let text = "practicalOperation: subscribeOn__isMainThread: \(Thread.current.isMainThread)  \(event)"
         self.showText(text: text)
                                                                           
         /// practicalOperation: subscribeOn__isMainThread: true  next(1)
         /// practicalOperation: subscribeOn__isMainThread: true  completed
         print(text)
    }.disposed(by: disposeBag)
    
    timeout

    一个序列在指定时间内未发射完成所有事件,那么将会进入.onError。

    Observable<Int>.of(1).delay(RxTimeInterval.seconds(2), scheduler: MainScheduler.instance).timeout(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
         let text = "practicalOperation: timeout__\(event)"
         self.showText(text: text)
    
         /// practicalOperation: timeout__error(Sequence timeout.)
         print(text)
    }.disposed(by: disposeBag)
    
    ifEmpty

    如果是序列中没有任何item,那么给定一个default。

    Observable<Int>.empty().ifEmpty(default: 0).subscribe { event in
         let text = "practicalOperation: defaultIfEmpty__\(event)"
         self.showText(text: text)
                                                           
         /// practicalOperation: defaultIfEmpty__next(0)
         /// practicalOperation: defaultIfEmpty__completed
         print(text)
    }.disposed(by: disposeBag)
    
    skipUntil

    丢弃掉第一个序列的所有items,直到第二个序列的item出现。

    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    sourceSequence.skip(until: referenceSequence).subscribe { event in
        let text = "practicalOperation: skipUntil__\(event)"
        self.showText(text: text)
                                                             
        /// practicalOperation: skipUntil__next(4)
        /// practicalOperation: skipUntil__next(5)
        /// practicalOperation: skipUntil__completed
        print(text)
    }.disposed(by: disposeBag)
    sourceSequence.onNext("1")
    sourceSequence.onNext("2")
    sourceSequence.onNext("3")
    referenceSequence.onNext("1")
    sourceSequence.onNext("4")
    sourceSequence.onNext("5")
    
    /// 必须调用onCompleted, 不然返回, 所在控制器, 不会被释放
    sourceSequence.onCompleted()
    
    skipWhile

    丢弃掉所有的items,直到满足某个不满足条件的item出现。

    Observable<String>.of("AD", "BD", "CD").skip { element -> Bool in
         return element.contains("A")
    }.subscribe { event in
         let text = "practicalOperation: skipWhile__\(event)"
         self.showText(text: text)
                                                              
         /// practicalOperation: skipWhile__next(BD)
         /// practicalOperation: skipWhile__next(CD)
         /// practicalOperation: skipWhile__completed
         print(text)
    }.disposed(by: disposeBag)
    
    takeUntil

    取得第一个序列所有items,直到第二个序列发射item或者终止。

    Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(until: { element -> Bool in
         return element > 4
    }).subscribe { event in
         let text = "practicalOperation: takeUntil__\(event)"
         self.showText(text: text)
                                                                            
         /// practicalOperation: takeUntil__next(1)
         /// practicalOperation: takeUntil__next(2)
         /// practicalOperation: takeUntil__next(3)
         /// practicalOperation: takeUntil__next(4)
         /// practicalOperation: takeUntil__completed
         print(text)
    }.disposed(by: disposeBag)
    
    takeWhile

    取得第一个序列的所有items,直到出现不满足条件的item (请仔细体会与skipWhile的不同之处)。

    Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(while: { element -> Bool in
         return element < 5
    }).subscribe { event in
         let text = "practicalOperation: takeWhile__\(event)"
         self.showText(text: text)
                                                                            
         /// practicalOperation: takeWhile__next(1)
         /// practicalOperation: takeWhile__next(2)
         /// practicalOperation: takeWhile__next(3)
         /// practicalOperation: takeWhile__next(4)
         /// practicalOperation: takeWhile__completed
         print(text)
    }.disposed(by: disposeBag)
    

    Observer观察者&Observable被观察者相关

    AnyObserver
    let observer1 = AnyObserver<Int> { event in
                         let text = "observer: anyObserver__\(event)"
                         self.showText(text: text)
                                      
                         /// observer: anyObserver__next(1)
                         /// observer: anyObserver__next(2)
                         /// observer: anyObserver__next(3)
                         /// observer: anyObserver__completed
                         print(text)
                     }
    Observable<Int>.of(1, 2, 3).subscribe(observer1).disposed(by: disposeBag)
    
    AsyncSubject

    Subject在ReactiveX的一些实现中扮演了一种桥梁或者代理的角色,它既可以作为Observer也可以作为Observable来使用。
    作为观察者来说它可以订阅一个或者多个可观察序列,作为可观察者来说它可以通过items的reemitting来观察,并且还可以发射新的items事件,我们将从如下四个Subject进行学习:
    AsyncSubject仅仅只发送订阅之后的最后一个item以及.onCompleted,如果出现错误,那么仅仅将只发送.onError。

    let asyncSubject = AsyncSubject<Int>()
    asyncSubject.onNext(1)
    asyncSubject.onNext(2)
    asyncSubject.subscribe { event in
         let text = "observer: asyncSubject__\(event)"
         self.showText(text: text)
                            
         /// observer: asyncSubject__next(4)
         /// observer: asyncSubject__completed
         print(text)
    }.disposed(by: disposeBag)
    asyncSubject.onNext(3)
    asyncSubject.onNext(4)
    
    /// 没有调用onCompleted, 则收不到subscribe, 因为不知道哪个是最后一个
    asyncSubject.onCompleted()
    
    ReplaySubject

    订阅ReplaySubject的时候,可以接收到订阅他之后的事件,但也可以接受订阅他之前发出的事件,接受几个事件取决与bufferSize的大小。
    createUnbounded()表示接受所有事件。
    create(bufferSize: 4) 表示可接受到的订阅他之前的事件的个数,但是订阅他之后的事件一定会触发。

    let replaySubject = ReplaySubject<Int>.createUnbounded()
    replaySubject.onNext(1)
    replaySubject.onNext(2)
    replaySubject.subscribe { event in
          let text = "observer: replaySubject__\(event)"
          self.showText(text: text)
                             
          /// observer: replaySubject__next(1)
          /// observer: replaySubject__next(2)
          /// observer: replaySubject__next(3)
          /// observer: replaySubject__completed
          print(text)
    }.disposed(by: disposeBag)
    replaySubject.onNext(3)
    replaySubject.onCompleted()
    
    let replaySubject1 = ReplaySubject<Int>.create(bufferSize: 1)
    replaySubject1.onNext(1)
    replaySubject1.onNext(2)
    replaySubject1.subscribe { event in
          let text = "observer: replaySubject__\(event)"
          self.showText(text: text)
                              
          /// observer: replaySubject__next(2)
          /// observer: replaySubject__next(3)
          /// observer: replaySubject__completed
          print(text)
    }.disposed(by: disposeBag)
    replaySubject1.onNext(3)
    replaySubject1.onCompleted()
    
    PublishSubject

    订阅PublishSubject的时候,只能接收到订阅他之后发生的事件。subject.onNext()发出onNext事件,对应的还有onError()和onCompleted()事件,可以把他看成一个bufferSize=0的ReplaySubject。

    let publishSubject = PublishSubject<Int>()
    publishSubject.onNext(1)
    publishSubject.onNext(2)
    publishSubject.subscribe { event in
           let text = "observer: publishSubject__\(event)"
           self.showText(text: text)
                              
           /// observer: publishSubject__next(3)
           /// observer: publishSubject__completed
           print(text)
    }.disposed(by: disposeBag)
    publishSubject.onNext(3)
    publishSubject.onCompleted()
    
    BehaviorSubject

    订阅了BehaviorSubject,会接受到订阅之前的最后一个事件,订阅之后的事件一定会触发。

    let behaviorSubject = BehaviorSubject<Int>(value: 0)
    behaviorSubject.onNext(1)
    behaviorSubject.onNext(2)
    behaviorSubject.subscribe { event in
            let text = "observer: behaviorSubject__\(event)"
            self.showText(text: text)
                               
            /// observer: behaviorSubject__next(2)
            /// observer: behaviorSubject__next(3)
            /// observer: behaviorSubject__completed
            print(text)
    }.disposed(by: disposeBag)
    behaviorSubject.onNext(3)
    behaviorSubject.onCompleted()
    
    BehaviorRelay

    BehaviorReplay是Swift5 替换 Swift4 中的 Variable。
    1 可以明确的是它不是subject类型,因为它只是一个可观察序列,但是它又包含subject对象(私有的BehaviorSubject)。
    2 初始化的时候也需要一个初始值。
    3 既然它不是一个订阅者,那么就不能发出onNext:、complete和error事件。
    4 只能通过accept发出event。
    总结:BehaviorRelay 跟 BehaviorSubject 很像,只是不是发出complete、error事件。

    let behaviorRelay = BehaviorRelay(value: 0)
    behaviorRelay.accept(1)
    behaviorRelay.accept(2)
    behaviorRelay.subscribe { event in
         let text = "observer: behaviorRelay__\(event)"
         self.showText(text: text)
                             
         /// observer: behaviorRelay__next(2)
         /// observer: behaviorRelay__next(3)
         /// observer: behaviorRelay__next(4)
         print(text)
    }.disposed(by: disposeBag)
    behaviorRelay.accept(3)
    behaviorRelay.accept(4)
    
    Driver

    Driver从名字上可以理解为驱动,在功能上它类似被观察者(Observable),而它本身也可以与被观察者相互转换(Observable: asDriver, Driver: asObservable),它驱动着一个观察者,当它的事件流中有事件涌出时,被它驱动着的观察者就能进行相应的操作。一般我们会将一个Observable被观察者转换成Driver后再进行驱动操作。
    Driver的drive方法与Observable的方法bindTo用法非常相似,事实上,它们的作用也是一样,说白了就是被观察者与观察者的绑定。
    那为什么RxSwift的作者又搞出Driver这么个东西来呢?
    比较与Observable,Driver有以下的特性:
    1 它不会发射出错误(Error)事件。
    2 对它的观察订阅是发生在主线程(UI线程)的。
    3 自带shareReplayLatestWhileConnected。

    let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
         return [ServiceCategoryItemModel(model: Void(), items: models)]
    }.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)
    

    Scheduler调度器相关

    对于Scheduler来说,我们需要了解Concurrent(并行)、Serial(串行)Scheduler就可以了。

    Observable<Int>.of(1, 2, 3).observe(on: SerialDispatchQueueScheduler(internalSerialQueueName: "serialDispatchQueue")).map { element -> Int in
         let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
         DispatchQueue.main.async {
                self.showText(text: text)
         }
                                                                                                                               
         /// scheduler: map --> Main Thread: false element__1
         /// scheduler: map --> Main Thread: false element__2
         /// scheduler: map --> Main Thread: false element__3
         print(text)
         return element * 2
      
    /// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
    }.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
         let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
         self.showText(text: text)
                                                                                                                              
         /// scheduler: subscribe --> Main Thread: true event__next(2)
         /// scheduler: subscribe --> Main Thread: true event__next(4)
         /// scheduler: subscribe --> Main Thread: true event__next(6)
         /// scheduler: subscribe --> Main Thread: true event__completed
         print(text)
    }.disposed(by: disposeBag)
    
    Observable<Int>.of(4, 5, 6).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "concurrentDispatchQueue"))).map { element -> Int in
         let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
         DispatchQueue.main.async {
              self.showText(text: text)
         }
                                                                                                                                           
         /// scheduler: map --> Main Thread: false element__4
         /// scheduler: map --> Main Thread: false element__5
         /// scheduler: map --> Main Thread: false element__6
         print(text)
         return element * 2
                                                                         
    /// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
    }.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
         let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
         self.showText(text: text)
                                                                                                                                          
         /// scheduler: subscribe --> Main Thread: true event__next(8)
         /// scheduler: subscribe --> Main Thread: true event__next(10)
         /// scheduler: subscribe --> Main Thread: true event__next(12)
         /// scheduler: subscribe --> Main Thread: true event__completed
         print(text)
    }.disposed(by: disposeBag)
    
    Observable<Int>.of(7, 8, 9).observe(on: ConcurrentMainScheduler.instance).map { element -> Int in
         let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
         DispatchQueue.main.async {
              self.showText(text: text)
         }
            
         /// scheduler: map --> Main Thread: true element__7
         /// scheduler: map --> Main Thread: true element__8
         /// scheduler: map --> Main Thread: true element__9
         print(text)
         return element * 2
                                                                                   
    /// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
    }.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
         let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
         self.showText(text: text)
                                                                                  
         /// scheduler: subscribe --> Main Thread: true event__next(14)
         /// scheduler: subscribe --> Main Thread: true event__next(16)
         /// scheduler: subscribe --> Main Thread: true event__next(18)
         /// scheduler: subscribe --> Main Thread: true event__completed
         print(text)
    }.disposed(by: disposeBag)
    

    Disposable

    订阅了一个可观察序列,如果有特殊需求需要提前取消订阅时使用。也就是说Disposable是用来取消订阅的一个工具,通过Disposables工具创建。

    let dis1 = Disposables.create()
    let dis2 = Disposables.create {
        print("在Dispose之前所做一些工作")
    }
    let _ = Disposables.create([dis1, dis2])
    
    /// dispose:通过.dispose()取消或者添加到disposeBag(可以将它看成一个非ARC机制下的AutoReleasePool)
    let disposable = Observable<Int>.of(1, 2, 3).subscribe { event in
                          print(event)
                     }
    disposable.dispose()
    
    /// 或者
    disposable.disposed(by: disposeBag)
    

    常用方法

    监听点击事件和输入事件
    closeButton.rx.tap.subscribe(onNext: {[weak self] _ in
         self?.closeClosure?()
    }).disposed(by: disposeBag)
    
    phoneTextField.rx.text.orEmpty.subscribe(onNext: {[weak self] (text) in
         self?.textDidChange()
         self?.textDidChangeClosure?(text)
    }).disposed(by: disposeBag)
    
    phoneTextField.rx.controlEvent(.editingDidBegin).subscribe(onNext: {[weak self] _ in
         self?.editingDidBeginClosure?()
    }).disposed(by: disposeBag)
    
    phoneTextField.rx.controlEvent(.editingDidEnd).subscribe(onNext: {[weak self] _ in
         self?.editingDidEndClosure?()
    }).disposed(by: disposeBag)
    
    代替通知
    // MARK: public
    extension NotificationManager {
    
        /**
         注册通知
         */
        static func registNotification(_ name:NSNotification.Name, disposeBag:DisposeBag, callBack:@escaping ((_ noti:Notification) -> ())) {
            NotificationCenter.default.rx.notification(name).subscribe(onNext: {(noti) in
                callBack(noti)
            }).disposed(by: disposeBag)
        }
    
        /**
         发送通知
         */
        static func post(name aName: NSNotification.Name, object anObject: Any? = nil, userInfo aUserInfo: [AnyHashable : Any]? = nil) {
            NotificationCenter.default.post(name: aName, object: anObject, userInfo: aUserInfo)
        }
    }
    
    代替观察者
    /// 只发出与下一个间隔超过0.25秒的元素
    scrollView.rx.observe(String.self, "contentSize").debounce(RxTimeInterval.milliseconds(250), scheduler: MainScheduler.instance).subscribe(onNext: {[weak self] _ in
        self?.contentSizeClosure?(self?.scrollView.contentSize ?? .zero)
    }).disposed(by: self.disposeBag)
    
    信号合并
    /// 将下拉刷新/上拉加载更多的信号和请求参数信号合并, 确定最后发起请求的请求参数
    Observable<NewsListParams>.combineLatest(input.paramsRelay, input.reloadDataSubject) {[weak self] (params:NewsListParams, reloadData:Bool) -> NewsListParams in
         params.page = self?.caculatePage(reloadData) ?? 1
         params.row = self?.pageSize ?? 10
         return params
     }.subscribe(onNext: {[weak self] params in
         self?.showHUD()
         NewsManager.requestNewsListData(params: params) { datas in
               HUD.dismiss()
               self?.handleData(datas?.rows ?? [], scrollView: self?.tableView ?? BaseTableView(), isFirstFetch: params.page == 1, totalCount: datas?.total ?? 0)
         }
    }).disposed(by: disposeBag)
    
    数据绑定
    let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
         return [ServiceCategoryItemModel(model: Void(), items: models)]
    }.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)
    
    代替定时器
    /// Interval:创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)
    Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
         let text = "operator: interval__\(event)"
         self.showText(text: text)
    
         /// operator: interval__next(0)
         /// operator: interval__next(1)
         /// operator: interval__next(2)
         /// operator: interval__completed
         print(text)
    }.disposed(by: disposeBag)
    
    /// Timer:在指定的时间后,发送一个特定的Item (E -> Int/NSInteger),请注意这里与Interval的区别(Interval是发送一系列特定Item,而Timer只会发送一个)
    Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
         let text = "operator: timer__\(event)"
         self.showText(text: text)
    
         // operator: timer__next(0)
         /// operator: timer__completed
         print(text)
    }.disposed(by: disposeBag)
    

    相关文章

      网友评论

        本文标题:RxSwift常用基础知识

        本文链接:https://www.haomeiwen.com/subject/fpqsdrtx.html