美文网首页
17 | 响应式编程:如何保证程序状态自动更新?

17 | 响应式编程:如何保证程序状态自动更新?

作者: 清风烈酒2157 | 来源:发表于2021-06-24 09:31 被阅读0次

    [toc]

    前言

    本文来自拉勾网课程整理

    iOS 开发中,随着 App 功能不断增强,处理各种异步事件,保持程序状态实时更新,也变得越来越困难。

    697a504674368fc56429b5c0169c7348

    ViewController 来为例,我们需要处理许多异步事件,比如来自 DelegateDataSource 的回调,来自 NotificationCenter 的通知消息,来自 ViewTarget-Action 事件,等等。

    由于它们随机发生且可能来自不同的线程,本身就会比较复杂,再加上其他新事件的引入,代码处理的逻辑会呈指数式增长。那么,怎样才能从根本上解决这些问题呢?这一讲我们所介绍的响应式编程就可以解决。

    响应式编程与 RxSwift

    所谓响应式编程,就是使用异步数据流(Asynchronous data streams)进行编程。在传统的指令式编程语言里,代码不仅要告诉程序做什么,还要告诉程序什么时候做。而在响应式编程里,我们只需要处理各个事件,程序会自动响应状态的更新。而且,这些事件可以单独封装,能有效提高代码复用性并简化错误处理的逻辑。

    现在,响应式编程已慢慢成为主流的编程范式,比如Android平台的 Architecture Components 提供了支持响应式编程的 LiveDataSwiftUI也配套了 Combine 框架。在 Moments App 中,我采用的也是响应式编程模式

    目前比较流行的响应式编程框架有 ReactiveKitReactiveSwiftCombine。在这里,我们推荐使用RxSwift。因为 RxSwift 遵循了 ReactiveXAPI 标准,由于 ReactiveX提供了多种语言的实现,学会 RxSwift能有效把知识迁移到其他平台。还有 RxSwift 项目非常活跃,也比较成熟。更重要的是,RxSwift 提供的 RxCocoa 能帮助我们为UIKit扩展响应式编程的能力,而Combine所对应的 CombineCocoa还不成熟。

    ffe7adb705a2da31612ce89a5cbb0f11

    为了让 App 可以自动更新状态,我们在Moments App 里面使用RxSwiftMVVM 各层连接起来。

    从上图可以看出,当用户打开朋友圈页面,App 会使用后台排程器向 BFF发起一个网络请求,Networking 模块把返回结果通过Observable 序列发送给 Repository 模块。Repository模块订阅接收后,把数据发送到Subject里面,然后经过map操作符转换,原先的 Model类型转换成了 ViewModel类型。 ViewModel 模块订阅经过操作符转换的数据,发送给下一个Subject,之后,这个数据被 ViewController订阅,并通过主排程器更新了 UI

    整个过程中,Repository模块、 ViewModel模块、ViewController 都是订阅者,分别接收来自前一层的信息。就这样,当App 得到网络返回数据时,就能自动更新每一层的状态信息,也能实时更新 UI 显示。

    这其中的 Observable 序列、订阅者、Subject、操作符、排程器属于 RxSwift中的关键概念,它们该如何理解,如何使用呢?接下来我就一一介绍下。

    异步数据序列 Observable

    为了保证程序状态的同步,我们需要把各种异步事件都发送到异步数据流里,供响应式编程发挥作用。在 RxSwfit 中,异步数据流称为 Observable 序列,它表示可观察的异步数据序列,也可以理解为消息发送序列。

    在实际应用中,我们通常使用Observable序列作为入口,把外部事件连接到响应式编程框架里面。比如在 Moments App ,我通过Observable 把网络请求的结果连接进 MVVM 架构中。

    那么怎样创建 Observable序列呢?为方便我们生成 Observable 序列, RxSwfitObservable类型提供了如下几个工厂方法:

    • just方法,用于生成只有一个事件的 Observable 序列;
    • of方法,生成包含多个事件的 Observable序列;
    • from方法,和of方法一样,from方法也能生成包含多个事件的 Observable 序列,但它只接受数组为入口参数。
    let observable1: Observable<Int> = Observable.just(1) // 序列包含 1
    let observable2: Observable<Int> = Observable.of(1, 2, 3) // 序列包含 1, 2, 3 
    let observable3: Observable<Int> = Observable.from([1, 2, 3]) // 序列包含 1, 2, 3
    let observable4: Observable<[Int]> = Observable.of([1, 2, 3]) // 序列包含 [1, 2, 3]
    

    当你需要生成只有一个事件的 Observable 序列时,可以使用just方法,如observable1只包含了1

    当需要生成包含多个事件的Observable序列时,可以使用of或者from方法。它们的区别是,of接收多个参数而from只接收一个数组。如上所示,我们分别使用了offrom方法来生成observable2observable3,它们都包含了1、2 和 3三个事件。

    这里需要注意,of方法也能接收数组作为参数的。与from方法会拆分数组为独立元素的做法不同,of方法只是把这个数组当成唯一的事件,例如observable4只包含值为[1, 2, 3]的一个事件。

    在开发当中,Observable 序列不仅仅存放数值,比如 Moments App 的异步数据流就需要存放朋友圈信息来更新 UIObservable也支持存放任意类型的数据。像在下面的例子中,peopleObservable就存放了两条类型为Person的数据,其中 Jake的收入是10Ken 的收入是20

    struct Person {
    
    在响应式编程模式里,订阅者是一个重要的角色。在 Moments App 里面,上层模块都担任订阅者角色,主要订阅下层模块的 Observable 序列。那订阅者怎样才能订阅和接收数据呢?
        let name: String
        let income: Int
    }
    let peopleObservable = Observable.of(Person(name: "Jake", income: 10), Person(name: "Ken", income: 20))
    

    订阅者

    在响应式编程模式里,订阅者是一个重要的角色。在Moments App 里面,上层模块都担任订阅者角色,主要订阅下层模块的Observable序列。那订阅者怎样才能订阅和接收数据呢?

    RxSwift 中,订阅者可以调用Observable对象的subscribe方法来订阅。如下所示。

    let observable = Observable.of(1, 2, 3)
    observable.subscribe { event in
        print(event)
    }
    
    

    订阅者调用subscribe方法订阅observable,并接收事件,当程序执行时会打印以下信息:

    next(1)
    next(2)
    next(3)
    completed
    

    你可能会问上面的nextcompleted是什么呢?其实它们都是事件,用来表示异步数据流上的一条信息。RxSwift 使用了Event枚举来表示事件,定义如下。

    public enum Event<Element> {
        /// Next element is produced.
        case next(Element)
        /// Sequence terminated with an error.
        case error(Swift.Error)
        /// Sequence completed successfully.
        case completed
    }
    
    • .next(value: T):用于装载数据的事件。当 Observable 序列发送数据时,订阅者会收到next事件,我们可以从该事件中取出实际的数据。
    • .error(error: Error):用于装载错误事件。当发生错误的时候,Observable 序列会发出error事件并关闭该序列,订阅者一旦收到error事件后就无法接收其他事件了。
    • .completed:用于正常关闭序列的事件。当 Observable 序列发出completed事件时就会关闭自己,订阅者在收到completed事件以后就无法收到任何其他事件了。

    怎么理解呢?下面我通过两个例子来介绍下。由于之前讲过的offrom等方法都不能发出errorcompleted事件 ,在这里我就使用了create方法来创建 Observable序列。

    首先我们看一下发送error事件的例子。

    Observable<Int>.create { observer in
        observer.onNext(1)
        observer.onNext(2)
        observer.onError(MyError.anError)
        observer.onNext(3)
        return Disposables.create()
    }.subscribe { event in
        print(event)
    }
    

    在这个例子中,我们调用了create方法来生成一个 Observable 序列,该 Observable 发出next(1)next(2)errornext(3)事件。由于next(3)事件在错误事件之后,因此订阅者无法接收到next(3)事件。程序执行时会打印下面的日志。

    next(1)
    next(2)
    error(anError)
    

    接着我们看一下发送completed事件的例子。

    Observable<Int>.create { observer in
        observer.onNext(1)
        observer.onCompleted()
        observer.onNext(2)
        observer.onNext(3)
        return Disposables.create()
    }.subscribe { event in
        print(event)
    }
    
    

    在这里,我调用create方法来生成一个 Observable 序列,该 Observable 发出了next(1)、completed、next(2)和next(3)事件。因为next(2)和next(3)都在完成事件之后发出的,所以订阅者也无法接收它们,程序执行时会打印如下的日志。

    next(1)
    completed
    

    在现实生活中,当我们订阅了报刊时可以自己选择退订,却无法让发行方停刊。在 RxSwift 里面也一样,订阅者无法强行让 Observable 序列发出completed事件来关闭数据流。那订阅者该怎样取消订阅呢?

    如果你仔细观察就会发现,subscribe方法返回的类型为Disposable的对象,我们可以通过调用该对象的dispose方法来取消订阅。

    为了更好地理解dispose方法的作用和触发时机,我通过subscribe()方法来打印出各个事件,如下所示。

    let disposable = Observable.of(1, 2).subscribe { element in
        print(element) // next event
    } onError: { error in
        print(error)
    } onCompleted: {
        print("Completed")
    } onDisposed: {
        print("Disposed")
    }
    disposable.dispose()
    

    我们在onNext闭包里面处理next事件;在onError闭包里处理error事件;在onCompleted闭包里处理completed事件;而在onDisposed闭包里处理退订事件。

    在这里,我们调用subscribe方法后,它又马上调用了dispose方法,因此程序会在调用onCompleted之后立刻调用onDisposed。其执行效果如下:

    1
    2
    Completed
    Disposed
    

    假如我在订阅前调用delay方法,那么所有的事件都会延时两秒钟后才通知订阅者,代码如下:

    let disposableWithDelay = Observable.of(1, 2).delay(.seconds(2), scheduler: MainScheduler.instance).subscribe { element in
        print(element) // next event
    } onError: { error in
        print(error)
    } onCompleted: {
        print("Completed")
    } onDisposed: {
        print("Disposed")
    }
    disposableWithDelay.dispose()
    

    和上面没有延时的例子一样,我们在调用subscribe方法以后马上调用了dispose方法,由于 Observable序列上所有事件还在延时等待中,程序会直接调用onDisposed并退订了disposableWithDelay序列,因此没办法再收到两秒钟后所发出的next(1)、next(2)completed事件了。 其执行效果如下:

    Disposed
    

    在很多时候,订阅后马上退订并不是我们想要的结果,我们希望订阅者一直监听事件直到自身消亡的时候才取消订阅。那有什么好的办法能做到这一点呢?

    RxSwift为我们提供了DisposeBag类型,方便存放和管理各个Disposable对象。其用法也非常简单,只需调用Disposable的disposed(by:)方法即可。代码如下:

    let disposeBag: DisposeBag = .init()
    Observable.just(1).subscribe { event in
        print(event)
    }.disposed(by: disposeBag)
    Observable.of("a", "b").subscribe { event in
        print(event)
    }.disposed(by: disposeBag)
    

    代码中的disposeBag存放了两个Disposable对象。当订阅者调用其deinit方法时,同时也会调用disposeBagdeinit方法。在这时候,disposeBag会取出存放的所有Disposable对象,并调用它们的dispose方法来取消所有订阅。

    在实际情况下,我建议只需为一个订阅者定义一个disposeBag即可。例如 Repository模块同时订阅了 Networking模块和 DataStore模块,但它只定义了一个disposeBag来管理所有的订阅。

    事件中转 Subject

    以上是如何生成、订阅和退订 Observable序列。使用Observable的工厂方法所生成的对象都是“只读”,一旦生成,就无法添加新的事件。但很多时候,我们需要往 Observable 序列增加事件,比如要把用户点击UI的事件添加到 Observable 中,或者把底层模块的事件加工并添加到上层模块的序列中。

    那么,有什么好办法能为异步数据序列添加新的事件呢?RxSwift为我们提供的 Subject及其onNext方法可以完成这项操作。

    具体来说,Subject作为一种特殊的Observable序列,它既能接收又能发送,我们一般用它来做事件的中转。在Moments AppMVVM架构里面,我们就大量使用Subject发挥这一作用。 比如,当 Repository 模块从 Networking 模块中接收到事件时,会把该事件转送到自身的 Subject来通知 ViewModel,从而保证 ViewModel的状态同步。

    那么,都有哪些常见的Subject呢?一般有 PublishSubject、BehaviorSubject 和 ReplaySubject。它们的区别在于订阅者能否收到订阅前的事件。

    • PublishSubject:如果你想订阅者只收到订阅后的事件,可以使用 PublishSubject
    • BehaviorSubject:如果你想订阅者在订阅时能收到订阅前最后一条事件,可以使用 BehaviorSubject
    • ReplaySubject:如果你想订阅者在订阅的时候能收到订阅前的 N 条事件,那么可以使用 ReplaySubject

    在订阅以后,它们的行为都是一致的,当 Subject发出error或者completed事件以后,订阅者将无法接收到新的事件。

    操作符

    操作符(Operator)是RxSwift 另外一个重要的概念,它能帮助订阅者在接收事件之前把Observable序列中的事件进行过滤、转换或者合并。

    例如在 Moments App 里面,我们使用 map操作符把 Model数据转换成 ViewModel 类型来更新UI。这里的 map操作符就属于转换操作符,能帮助我们从一种数据类型转变成另外一种类型。除了mapcompactMapflapMap也属于转换操作符。

    此外还有 filterdistinctUntilChanged等过滤操作符,我们可以使用过滤操作符把订阅者不关心的事件给过滤掉。还有合并操作符如 startWith,concatmerge,combineLatestzip,可用于组装与合并多个 Observable 序列。

    除了上面提到过的常用操作符,RxSwift 还为我们提供了 50 多个操作符,那怎样才能学会它们呢?我推荐你到 rxmarbles.com或者到App Store 下载 RxMarbles App,然后打开各个操作符并修改里面的参数,通过输入的事件和执行的结果来理解这些操作的作用。

    排程器

    保持程序状态自动更新之所以困难,很大原因在于处理并发的异步事件是一件烦琐的事情。为了方便处理来自不同线程的并发异步事件,RxSwift 为我们提供了排程器。它可以帮我们把繁重的任务调度到后台排程器完成,并能指定其运行方式(如是串行还是并发),也能保证 UI 的任务都在主线程上执行。

    比如在Moments App 里面,NetworkingDataStore 模块都在后台排程器上执行,而 View 模块都在主排程器上执行。

    根据串行或者并发来归类,我们可以把排程器分成两大类串行的排程器和并发的排程器。

    串行的排程器包括 CurrentThreadScheduler、MainScheduler、SerialDispatchQueueScheduler

    其中,CurrentThreadScheduler可以把任务安排在当前的线程上执行,这是默认的排程器。当我们不指定排程器的时候,RxSwift 都会使用 CurrentThreadScheduler 把任务放在当前线程里串行执行;MainScheduler是把任务调度到主线程MainThread里并马上执行,它主要用于执行 UI相关的任务;而SerialDispatchQueueScheduler则会把任务放在dispatch_queue_t里面并串行执行。

    并发的排程器包括 ConcurrentDispatchQueueSchedulerOperationQueueScheduler

    其中,ConcurrentDispatchQueueScheduler把任务安排到dispatch_queue_t里面,且以并发的方式执行。该排程器一般用于执行后台任务,例如网络访问和数据缓存等等。在创建的时候,我们可以指定DispatchQueue的类型,例如使用ConcurrentDispatchQueueScheduler(qos: .background)来指定使用后台线程执行任务。

    OperationQueueScheduler是把任务放在NSOperationQueue里面,以并发的方式执行。这个排程器一般用于执行繁重的后台任务,并通过设置maxConcurrentOperationCount来控制所执行并发任务的最大数量。它可以用于下载大文件。

    那么,如何用排程器进行调度,处理好不同线程的并发异步事件呢?请看下面的代码实现。

    Observable.of(1, 2, 3, 4)
        .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
        .dumpObservable()
        .map { "\(getThreadName()): \($0)" }
        .observeOn(MainScheduler.instance)
        .dumpObserver()
        .disposed(by: disposeBag)
    
    

    首先我们传入ConcurrentDispatchQueueScheduler(qos: .background)来调用subscribeOn方法,把 Observable 序列发出事件的执行代码都调度到后台排程器去执行。然后通过传入MainScheduler.instance来调用observeOn,把订阅者执行的逻辑都调度主排程器去执行。

    这是一种常用的模式,我们通常使用后台排程器来进行网络访问并处理返回数据,然后通过主排程器把数据呈现到UI 中去。

    由于后台线程不能保证执行的顺序,其执行效果如下,当你执行的时候可能会有点变化。

    [Observable] 1 emitted on Unnamed Thread
    [Observable] 2 emitted on Unnamed Thread
    [Observer] Unnamed Thread: 1 received on Main Thread
    [Observable] 3 emitted on Unnamed Thread
    [Observer] Unnamed Thread: 2 received on Main Thread
    [Observable] 4 emitted on Unnamed Thread
    [Observer] Unnamed Thread: 3 received on Main Thread
    [Observer] Unnamed Thread: 4 received on Main Thread
    

    总结

    在这一讲中我们介绍了RxSwift的五个关键概念:Observable 序列订阅者Subject操作符以及排程器。我把本讲的代码都放在 Moments App 项目中的RxSwift Playground 文件里面,希望你能多练习,把五个概念融会贯通。

    以下是我在实际工作中使用RxSwift的一些经验总结,希望能帮助到你。

    当我们拿到需求的时候,先把任务进行分解,找出哪个部分是事件发布者,哪部分是事件订阅者,例如一个新功能页面,网络请求部分一般是事件发布者,当得到网络请求的返回结果时会发出事件,而 UI 部分一般为事件订阅者,通过订阅事件来保持UI的自动更新。

    找到事件发布者以后,要分析事件发布的频率与间隔。如果只是发布一次,可以使用Obervable;如果需要多次发布,可以使用Subject;如果需要缓存之前多个事件,可以使用ReplaySubject

    当我们有了事件发布者和订阅者以后,接着可以分析发送和订阅事件的类型差异,选择合适的操作符来进行转换。我们可以先使用本讲中提到的常用操作符,如果它们还不能解决你的问题,可以查看 RxMarbles 来寻找合适的操作符。

    最后,我们可以根据事件发布者和订阅者所执行的任务性质,通过排程器进行调度。例如把网络请求和数据缓存任务都安排在后台排程器,而 UI 更新任务放在主排程器。

    源码地址:

    RxSwift Playground文件地址:https://github.com/lagoueduCol/iOS-linyongjian/blob/main/Playgrounds/RxSwiftPlayground.playground/Contents.swift

    相关文章

      网友评论

          本文标题:17 | 响应式编程:如何保证程序状态自动更新?

          本文链接:https://www.haomeiwen.com/subject/enlwyltx.html