美文网首页iOS
学习 RxSwift & RxCocoa

学习 RxSwift & RxCocoa

作者: FicowShen | 来源:发表于2019-03-15 14:19 被阅读0次
    内容概览:
    • Rx Marble Diagrams(宝石图)
    • 关键概念
    • Event - 事件
    • Observable - 产生事件
    • Observer - 响应事件
    • Operator - 创建变化组合事件
    • Disposable - 管理绑定(订阅)的生命周期
    • Schedulers - 线程队列调配



    为什么要使用Rx(ReactiveX)?

    • 简化异步操作
    • 简化多线程操作
    • 更简洁的代码
    • 多平台适用(C++, Java, C#, JavaScript, Python,Scala,Kotlin,Go)

    另外,官方文档解释了一个很重要的点:ReactiveX不是函数响应式编程(Functional Reactive Programming)!
    请不要再被人忽悠,也不要用这个概念去忽悠别人~

    Rx Marble Diagrams(宝石图)

    Observable发出事件,然后被Observer接收并被转换为其他形式,转换后的事件又继续被发出。
    当Observable序列完成时,或者出现错误时,序列被终结。

    Observable是一个序列,类似于Swift中的序列,但是这个序列可以异步接收元素。
    Observable的subscribe方法,类似于Swift序列中的makeIterator方法。
    Observer(通常是一系列的闭包)需要传递给Observable的subscribe,以接收序列的元素。

    Rx抽象了时间状态机,我们可以更方便地去完成业务逻辑。
    否则,我们就要重复地去编写一些有关临时状态转换的逻辑代码。

    宝石图可以帮助你理解Rx中的操作符

    一个关于数字的序列:
    --1--2--3--4--5--6--| // 序列完成,并终结

    另一个关于字符的序列
    --a--b--a--a--a---d---X // 序列发生错误,并终结

    有些序列是无穷无尽的,比如按钮点击事件序列:
    ---tap-tap-------tap--->

    这里有一条基本的规则:
    • 序列可以产生0个或多个元素。
    • 只要序列完成了或者发生了错误,序列就不能再产生元素。

    关键概念


    Event - 事件

    /// 代表一系列事件
    public enum Event<Element> {
        /// 序列产生了一个新的元素
        case next(Element)
        /// 创建序列时产生了一个错误,导致序列终止
        case error(Swift.Error)
        /// 序列的所有元素都已经成功产生,整个序列已经完成
        case completed
    }
    

    请注意,序列产生了错误,就会导致序列终止!
    有时候,这并不是你期待的结果!!!


    Observable - 产生事件

    创建Observable,并让Observer订阅事件,在事件结束后释放观察者。

    typealias JSON = Any
    
    let disposeBag = DisposeBag()
    
    let json: Observable<JSON> = Observable.create { (observer) -> Disposable in
        let task = URLSession.shared.dataTask(with: ...) { data, _, error in
            guard error == nil else {
                observer.onError(error!)
                return
            }
            guard let data = data,
                let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves)
                else {
                observer.onError(DataError.cantParseJSON)
                return
            }
            observer.onNext(jsonObject)
            observer.onCompleted()
        }
        task.resume()
        return Disposables.create { task.cancel() }
    }
    
    json.subscribe(onNext: { json in
        print("取得 json 成功: \(json)")
    }, onError: { error in
        print("取得 json 失败 Error: \(error.localizedDescription)")
    }, onCompleted: {
        print("取得 json 任务成功完成")
    }).disposed(by: disposeBag) // 释放Observers
    
    
    Observable 的冷与热

    一创建就可以开始发出事件的,就是热的。
    直到有observer订阅才开始发出事件的,就是冷的!

    Observable 特征序列(定制版的Observable)
    • Single
      要么发出一个元素,要么产生一个 error 事件。
      一个比较常见的例子就是执行 HTTP 请求,然后返回一个应答或错误。
      不过你也可以用 Single 来描述任何只有一个元素的序列。

    示例Demo:

    let disposeBag = DisposeBag()
    
    func getRepo(_ repo: String) -> Single<[String: Any]> {
        return Single<[String: Any]>.create { single in
            let url = URL(string: "https://api.github.com/repos/\(repo)")!
            let task = URLSession.shared.dataTask(with: url) { data, _, error in
                if let error = error {
                    single(.error(error))
                    return
                }
                guard let data = data,
                      let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
                      let result = json as? [String: Any] else {
                    single(.error(DataError.cantParseJSON))
                    return
                }
                single(.success(result))
            }
            task.resume()
            return Disposables.create { task.cancel() }
        }
    }
    
    getRepo("ReactiveX/RxSwift").subscribe(onSuccess: { json in
        print("JSON: ", json)
    }, onError: { error in
        print("Error: ", error)
    }).disposed(by: disposeBag)
    

    订阅提供一个 SingleEvent 的枚举:

    public enum SingleEvent<Element> {
        case success(Element)
        case error(Swift.Error)
    }
    
    • Completable
      要么产生一个 completed 事件,要么产生一个 error 事件。
      Completable 适用于那种你只关心任务是否完成,而不需要在意任务返回值的情况。

    示例Demo:

    let disposeBag = DisposeBag()
    
    func cacheLocally() -> Completable {
        return Completable.create { completable in
           // Store some data locally
           ...
           ...
    
           guard success else {
               completable(.error(CacheError.failedCaching))
               return Disposables.create {}
           }
           completable(.completed)
           return Disposables.create {}
        }
    }
    
    cacheLocally().subscribe(onCompleted: {
        print("Completed with no error")
    }, onError: { error in
        print("Completed with an error: \(error.localizedDescription)")
    }).disposed(by: disposeBag)
    

    订阅提供一个 CompletableEvent 的枚举:

    public enum CompletableEvent {
        case error(Swift.Error)
        case completed
    }
    
    • Maybe
      它介于 Single 和 Completable 之间,它要么发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
      如果你遇到那种可能需要发出一个元素,又可能不需要发出时,就可以使用 Maybe。

    示例Demo:

    let disposeBag = DisposeBag()
    
    func generateString() -> Maybe<String> {
        return Maybe<String>.create { maybe in
    
            maybe(.success("RxSwift"))
            // OR
            maybe(.completed)
            // OR
            maybe(.error(error))
            return Disposables.create {}
        }
    }
    
    generateString().subscribe(onSuccess: { element in
        print("Completed with element \(element)")
    }, onError: { error in
        print("Completed with an error \(error.localizedDescription)")
    }, onCompleted: {
        print("Completed with no element")
    }).disposed(by: disposeBag)
    

    订阅提供一个 MaybeEvent 的枚举:

    public enum MaybeEvent<Element> {
        case success(Element)
        case error(Swift.Error)
        case completed
    }
    
    • Driver
      为UI元素精心准备的特征序列
      1. 不会产生 error 事件
      2. 一定在 MainScheduler 监听(主线程监听)
      3. 共享状态变化

    为什么要使用Driver?

    • ControlEvent
      用于描述 UI 控件所产生的事件(如:按钮点击事件、输入框文本更新事件等)。
      1. 不会产生 error 事件
      2. 一定在 MainScheduler 订阅(主线程订阅)
      3. 一定在 MainScheduler 监听(主线程监听)
      4. 共享状态变化

    Observer - 响应事件

    button.rx.tap.subscribe(onNext: { [weak self] in
        self?.showAlert()
    }, onError: { error in
        print("发生错误: \(error.localizedDescription)")
    }, onCompleted: {
        print("任务完成")
    })
    

    创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述事件发生时,需要如何做出响应。
    而观察者就是由后面的 onNext,onError,onCompleted的这些闭包构建出来的。

    特征观察者(定制的Observer)
    • AnyObserver
      可以用来描叙任意一种观察者。
    URLSession.shared.rx.data(request: URLRequest(url: url))
    .subscribe(onNext: { data in
        print("Data Task Success with count: \(data.count)")
    }, onError: { error in
        print("Data Task Error: \(error)")
    })
    .disposed(by: disposeBag)
    

    以上代码可以看作:

    let observer: AnyObserver<Data> = AnyObserver { (event) in
        switch event {
        case .next(let data):
            print("Data Task Success with count: \(data.count)")
        case .error(let error):
            print("Data Task Error: \(error)")
        default:
            break
        }
    }
    
    URLSession.shared.rx.data(request: URLRequest(url: url))
    .subscribe(observer)
    .disposed(by: disposeBag)
    
    • Binder
      Binder有以下特征:
      1. 不会处理错误事件
      2. 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler
    let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
        switch event {
        case .next(let isHidden):
            self?.usernameValidOutlet.isHidden = isHidden
        default:
            break
        }
    }
    
    usernameValid
    .bind(to: observer)
    .disposed(by: disposeBag)
    

    以上代码如果用Binder实现,效果更佳。

    let observer: Binder<Bool> = Binder(usernameValidOutlet) { (view, isHidden) in
        view.isHidden = isHidden
    }
    
    usernameValid
    .bind(to: observer)
    .disposed(by: disposeBag)
    

    Binder 可以只处理 next 事件,并且保证响应 next 事件的代码一定会在给定 Scheduler 上执行,这里采用默认的 MainScheduler



    RxCocoa中已经采用Binder实现了很多常用的观察者,比如:

    extension Reactive where Base: UIView {
      public var isHidden: Binder<Bool> {
          // self.base是实际的UI控件对象
          return Binder(self.base) { view, hidden in
              view.isHidden = hidden
          }
      }
    }
    
    extension Reactive where Base: UIControl {
      public var isEnabled: Binder<Bool> {
          return Binder(self.base) { control, value in
              control.isEnabled = value
          }
      }
    }
    
    extension Reactive where Base: UILabel {
      public var text: Binder<String?> {
          return Binder(self.base) { label, text in
              label.text = text
          }
      }
    }
    

    你也可以用这种方式来创建自定义的 UI 观察者

    Observable & Observer

    有些事物比较特别。它们既是可被监听的序列也是观察者。
    有许多 UI 控件都存在这种特性,例如:switch的开关状态,segmentedControl的选中索引号,datePicker的选中日期等等。

    // 作为可被监听的序列
    let observable = textField.rx.text
    observable.subscribe(onNext: { text in show(text: text) })
    
    // 作为观察者
    let observer = textField.rx.text
    let text: Observable<String?> = ...
    text.bind(to: observer)
    



    框架里面定义了一些辅助类型可以帮助你更准确的描述事物的特征,它们既是可被监听的序列也是观察者。

    • AsyncSubject
      将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素)。

    • PublishSubject
      发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。

    • ReplaySubject
      发送全部的元素(元素个数由buffer的大小决定),无论观察者是何时进行订阅的。

    • BehaviorSubject
      当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。

    • Variable
      在 Swift 中我们经常会用 var 关键字来声明变量。
      RxSwift 提供的 Variable 实际上是 var 的 Rx 版本,你可以将它看作是 RxVar。
      如果我们声明的变量需要提供 Rx 支持,那就选用 Variable 这个类型。

    • ControlProperty
      ControlProperty 专门用于描述 UI 控件属性的,它具有以下特征:

      1. 不会产生 error 事件
      2. 一定在 MainScheduler 订阅(主线程订阅)
      3. 一定在 MainScheduler 监听(主线程监听)
      4. 共享状态变化

    Operator - 创建变化组合事件

    操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。

    filter - 过滤

    // 温度
    let rxTemperature: Observable<Double> = ...
    
    // filter 操作符
    rxTemperature.filter { temperature in temperature > 33 }
    .subscribe(onNext: { temperature in
        print("高温:\(temperature)度")
    })
    .disposed(by: disposeBag)
    

    map - 转换

    // JSON
    let json: Observable<JSON> = ...
    
    // map 操作符
    json.map(Model.init)
    .subscribe(onNext: { model in
        print("取得 Model: \(model)")
    })
    .disposed(by: disposeBag)
    

    zip - 配对

    // 汉堡
    let rxHamburg: Observable<Hamburg> = ...
    // 薯条
    let rxFrenchFries: Observable<FrenchFries> = ...
    
    // zip 操作符
    Observable.zip(rxHamburg, rxFrenchFries)
    .subscribe(onNext: { (hamburg, frenchFries) in
        print("取得汉堡: \(hamburg) 和薯条:\(frenchFries)")
    })
    .disposed(by: disposeBag)
    

    Rx提供了充分的操作符来帮我们创建序列。当然如果内置操作符无法满足你的需求时,你还可以创建自定义的操作符。

    如果你不确定该如何选择操作符,可以参考 决策树。它会引导你找出合适的操作符。


    Disposable - 管理绑定(订阅)的生命周期

    通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。
    如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的 可被清除的资源(Disposable) 调用 dispose 方法。
    但是,推荐使用 清除包(DisposeBag) 或者 takeUntil 操作符 来管理订阅的生命周期。

    DisposeBag用法:

    var disposeBag = DisposeBag()
    
    override func viewWillAppear(_ animated: Bool) {
        super.viewWillAppear(animated)
    
        textField.rx.text.orEmpty
            .subscribe(onNext: { text in print(text) })
            .disposed(by: self.disposeBag)
    }
    
    override func viewWillDisappear(_ animated: Bool) {
        super.viewWillDisappear(animated)
    
        self.disposeBag = DisposeBag()
    }
    

    takeUntil 用法:

    override func viewDidLoad() {
        super.viewDidLoad()
    
        _ = usernameValid
            .takeUntil(self.rx.deallocated)
            .bind(to: passwordOutlet.rx.isEnabled)
    
        _ = usernameValid
            .takeUntil(self.rx.deallocated)
            .bind(to: usernameValidOutlet.rx.isHidden)
    
        _ = passwordValid
            .takeUntil(self.rx.deallocated)
            .bind(to: passwordValidOutlet.rx.isHidden)
    
        _ = everythingValid
            .takeUntil(self.rx.deallocated)
            .bind(to: doSomethingOutlet.rx.isEnabled)
    
        _ = doSomethingOutlet.rx.tap
            .takeUntil(self.rx.deallocated)
            .subscribe(onNext: { [weak self] in self?.showAlert() })
    }
    

    Schedulers - 线程队列调配

    Schedulers 是 Rx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。



    一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

    GCD实现:

    DispatchQueue.global(qos: .userInitiated).async {
        let data = try? Data(contentsOf: url)
        DispatchQueue.main.async {
            self.data = data
        }
    } 
    

    RxSwift实现:

    let rxData: Observable<Data> = ...
    
    rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)
    
    • MainScheduler
      MainScheduler 代表主线程。
      如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。

    • SerialDispatchQueueScheduler
      SerialDispatchQueueScheduler 抽象了串行 DispatchQueue。
      如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。

    • ConcurrentDispatchQueueScheduler
      ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。
      如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。

    • OperationQueueScheduler
      OperationQueueScheduler 抽象了 NSOperationQueue。
      它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。


    Error Handling - 错误处理

    一旦序列里面产出了一个 error 事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:

    • retry - 重试
    • catch - 恢复
    retry - 重试

    请求 JSON 失败时,立即重试,重试 3 次后仍然失败就将错误抛出:

    let rxJson: Observable<JSON> = ...
    
    rxJson
    .retry(3)
    .subscribe(onNext: { json in
        print("取得 JSON 成功: \(json)")
    }, onError: { error in
        print("取得 JSON 失败: \(error)")
    })
    .disposed(by: disposeBag)
    

    请求 JSON 失败时,等待 5 秒后重试:

    let retryDelay: Double = 5  // 重试延时 5 秒
    
    rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
    }
    .subscribe(...)
    .disposed(by: disposeBag)
    

    retryWhen 操作符主要描述应该在何时重试,并且通过闭包里面返回的 Observable 来控制重试的时机。

    如果重试超过 4 次,就将错误抛出。如果错误在 4 次以内时,就等待 5 秒后重试:

    let maxRetryCount = 4       // 最多重试 4 次
    let retryDelay: Double = 5  // 重试延时 5 秒
    
    rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
            guard index < maxRetryCount else {
                return Observable.error(error)
            }
            return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
        }
    }
    .subscribe(...)
    .disposed(by: disposeBag)
    

    flatMapWithIndex 操作符可以提供错误的索引数 index。然后用这个索引数判断是否超过最大重试数,如果超过了,就将错误抛出。如果没有超过,就等待 5 秒后重试。

    catch - 恢复

    catchError 可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉。

    当错误产生时,就返回一个空数组,于是就会显示一个空列表页:

    searchBar.rx.text.orEmpty
    ...
    .flatMapLatest { query -> Observable<[Repository]> in
        ...
        return searchGitHub(query)
            .catchErrorJustReturn([])
    }
    ...
    .bind(to: ...)
    .disposed(by: disposeBag)
    

    你也可以使用 catchError,当错误产生时,将错误事件替换成一个备选序列:

    // 先从网络获取数据,如果获取失败了,就从本地缓存获取数据
    
    let rxData: Observable<Data> = ...      // 网络请求的数据
    let cahcedData: Observable<Data> = ...  // 之前本地缓存的数据
    
    rxData
    .catchError { _ in cahcedData }
    .subscribe(onNext: { date in
        print("获取数据成功: \(date.count)")
    })
    .disposed(by: disposeBag)
    
    Result

    如果我们只是想给用户错误提示,那要如何操作呢?

    以下提供一个最为直接的方案,不过这个方案存在一些问题:

    updateUserInfoButton.rx.tap
    .withLatestFrom(rxUserInfo)
    .flatMapLatest { userInfo -> Observable<Void> in
        return update(userInfo)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: {
        print("用户信息更新成功")
    }, onError: { error in
        print("用户信息更新失败: \(error.localizedDescription)")
    })
    .disposed(by: disposeBag)
    

    这样实现是非常直接的。但是 一旦网络请求操作失败了,序列就会终止!!整个订阅将被取消!!
    如果用户再次点击更新按钮,就无法再次发起网络请求进行更新操作了。

    为了解决这个问题,我们需要选择合适的方案来进行错误处理。例如使用枚举 Result:

    // 自定义一个枚举类型 Result
    public enum Result<T> {
        case success(T)
        case failure(Swift.Error)
    }
    

    然后之前的代码需要修改成:

    updateUserInfoButton.rx.tap
    .withLatestFrom(rxUserInfo)
    .flatMapLatest { userInfo -> Observable<Result<Void>> in
        return update(userInfo)
            .map(Result.success)  // 转换成 Result
            .catchError { error in Observable.just(Result.failure(error)) }
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { result in
        switch result {           // 处理 Result
        case .success:
            print("用户信息更新成功")
        case .failure(let error):
            print("用户信息更新失败: \(error.localizedDescription)")
        }
    })
    .disposed(by: disposeBag)
    

    这样我们的错误事件被包装成了 Result.failure(Error) 元素,就不会终止整个序列。就算网络请求失败,整个订阅依然存在。如果用户再次点击更新按钮,也是能够发起网络请求进行更新操作的。

    除此之外,强烈建议阅读 How to handle errors in RxSwift



    最后,对于初学者,建议将RxSwift代码仓库下载到本地。

    然后,打开 Rx.xcworkspace
    在左上角的 Schema 选中 RxSwift-macOS,然后 Build (快捷键:Command + B)项目。
    在Build结束后,进入 Project navigator (快捷键:Command + 1) 并找到 Rx - Rx.playground,然后打开调试窗口(快捷键:Shift + Command + Y) 以查看示例代码执行效果。

    在这个Playground中,官方对 所有的操作符 进行了阐释,这可以帮助你迅速掌握RxSwift。

    最后的最后,强烈建议初学者阅读 RxSwift - Getting Started!!!




    参考文章:
    ReactiveX Introduction
    Github: ReactiveX / RxSwift
    RxMarbles(常见宝石图)
    RxSwift - Getting Started
    RxSwift 中文文档 (本文大部分内容来源于此文档)
    ReactiveX文档中文翻译
    How to handle errors in RxSwift



    如需转载,请注明出处,谢谢 ~

    相关文章

      网友评论

        本文标题:学习 RxSwift & RxCocoa

        本文链接:https://www.haomeiwen.com/subject/wmstmqtx.html