美文网首页
RxSwift #07 | Schedulers

RxSwift #07 | Schedulers

作者: JeremyTechBlog | 来源:发表于2021-12-15 11:28 被阅读0次

    在理解 Schedulers 之前,还需要理解 observeOn 操作符是怎么回事。

    创建自定义调度器已经超出了本书的范围。请记住,RxSwift、RxCocoa 和 RxBlocking 提供的调度器和初始化器通常涵盖99%的情况。总是尽量使用内置的调度器。

    What is a scheduler

    简而言之,Schedulers 是一个发生进程的上下文。这个上下文可以是一个线程、一个调度队列或类似的实体,甚至是在 OperationQueueScheduler 中使用的一个操作。

    [图片上传失败...(image-8bbef7-1639538931250)]

    在这张图中,你有一个 cache operator 的概念。一个 Observable 向服务器发出请求,并检索一些数据。这些数据被一个名为 cache 的自定义操作符处理,它将数据存储在某个地方。在这之后,数据被传递给不同调度器上的所有订阅者,很可能是位于主线程之上的 MainScheduler,这使得 UI 的更新成为可能。

    Demystifying the scheduler

    关于调度器的一个常见误解是,它们与线程同样相关。而这起初似乎是合乎逻辑的——毕竟,调度器的工作确实与 GCD 的调度队列类似。

    但情况完全不是这样的。如果你在写一个自定义的调度器,这依然不是一个值得推荐的方法,你可以使用同一个线程创建多个调度器,或者在多个线程之上创建一个调度器。这将是很奇怪的——但它可以工作!

    [图片上传失败...(image-bd7168-1639538931250)]

    要记住的是,调度器不是线程,它们与线程没有一对一的关系。始终检查调度器执行操作的上下文——而不是线程。在本章后面,你会遇到一些好的例子来帮助你理解这一点。

    Switching schedulers

    RxSwift 中最重要的事情之一是能够随时切换调度器,除了产生事件的内部流程所施加的限制外,没有任何限制。你有很好的理由希望能够控制 operator 在哪个调度器上接收元素。

    • 在后台调度器上执行很重的工作。
    • 控制很重的工作是串行还是并行发生。
    • 为了保证用户界面更新在主线程上的执行。

    <aside> 💡 当使用让你切换调度器的操作符时,要确保序列传输的元素是线程安全的。RwSwift 本身就像苹果的 Dispatch 框架:它可以让你切换调度器/线程,而不管你的数据的线程安全。

    </aside>

    Using subscribeOn

    在某些情况下,你可能想改变可观察的计算代码在哪个调度器上运行——不是任何一个订阅操作符中的代码,而是实际发射可观察事件的代码。

    对于你所创建的自定义观测器,发射事件的代码是尾部闭包所提供的。Observable.create { ... }.

    为该计算代码设置调度器的方法是使用 subscribeOn。 乍听之下,这可能是一个反直觉的名字,但在思考了一段时间后,它开始变得有意义了。

    当你想实际观察一个可观察对象时,你必须首先订阅它。这决定了原始处理将在哪里发生。如果 subscribeOn 没有被调用,RxSwift 会自动使用当前线程。

    [图片上传失败...(image-d3c898-1639538931250)]

    这个过程是使用主调度器在主线程上创建事件。MainScheduler 位于主线程的顶部。所有你想在主线程上执行的任务都必须使用这个调度器,这就是为什么你在前面的例子中使用它来处理用户界面。要切换调度器,你要使用 subscribeOn

    在 main.swift 中,有一个预定义的调度器名为 globalScheduler,它使用一个后台队列。这个调度器是使用全局调度队列创建的,它是一个并发队列。

    let globalScheduler = ConcurrentDispatchQueueScheduler(queue:
    DispatchQueue.global())
    fruit
      .subscribeOn(globalScheduler)
      .dump()
      .dumpingSubscription()
      .disposed(by: bag)
    /**
    00s | [E] [dog] emitted on Main Thread
    00s | [S] [dog] received on Main Thread
    00s | [E] [apple] emitted on Anonymous Thread
    00s | [S] [apple] received on Anonymous Thread
    02s | [E] [pineapple] emitted on Anonymous Thread
    02s | [S] [pineapple] received on Anonymous Thread
    04s | [E] [strawberry] emitted on Anonymous Thread
    04s | [S] [strawberry] received on Anonymous Thread
    **/
    
    

    全局队列使用一个没有名字的线程,所以在这种情况下,匿名线程是全局并发调度队列的线程之一。

    现在,发射器和订阅器都在同一个线程中处理数据。

    [图片上传失败...(image-7b3f73-1639538931250)]

    这很酷,但是如果你想改变观察者执行你的操作者的代码的位置,你能做什么?你必须使用 observeOn

    Using observerOn

    Observing 是 Rx 的三个基本概念之一。它涉及到一个产生事件的实体,以及这些事件的观察者。在这种情况下,与 subscribeOn 相反,observeOn 操作符改变了观察发生的调度器。

    因此,一旦事件被 Observable 推送,这个操作符就会确保订阅者在指定的调度器上接收到该事件。这也包括了你在 observeOn 之后添加的所有操作符!

    要从当前全局调度器切换到主线程,你需要在订阅前调用 observeOn。再来一次,替换你的水果订阅代码:

    fruit
      .subscribeOn(globalScheduler)
      .dump()
      .observeOn(MainScheduler.instance)
      .dumpingSubscription()
      .disposed(by: bag)
    /**
    00s | [E] [dog] emitted on Main Thread
    00s | [S] [dog] received on Main Thread
    00s | [E] [apple] emitted on Anonymous Thread
    00s | [S] [apple] received on Main Thread
    02s | [E] [pineapple] emitted on Anonymous Thread
    02s | [S] [pineapple] received on Main Thread
    04s | [E] [strawberry] emitted on Anonymous Thread
    04s | [S] [strawberry] received on Main Thread
    **/
    
    

    你已经达到了你想要的结果。现在所有的事件都在正确的线程上处理。主观察者在后台线程上处理和生成事件,而订阅者则在主线程上接收事件。

    [图片上传失败...(image-941dde-1639538931250)]

    这是一个非常常见的模式。你用一个后台调度器从服务器上获取数据,并处理收到的数据,只是切换到 MainScheduler 来处理最后的事件并在用户界面上显示数据。

    Pitfalls

    切换调度器和线程的能力看起来很神奇,但它也有一些隐患。为了了解原因,你将使用一个新的线程向主体推送一些事件。因为你需要跟踪计算是在哪个线程上进行的,

    所以一个好的解决方案是使用一个操作系统线程。

    就在水果观测器之后,添加以下代码来生成一些动物。

    let animalsThread = Thread() {
      sleep(3)
      animal.onNext("[cat]")
      sleep(3)
      animal.onNext("[tiger]")
      sleep(3)
      animal.onNext("[fox]")
      sleep(3)
      animal.onNext("[leopard]")
    }
    animalsThread.name = "Animals Thread"
    animalsThread.start()
    /**
    03s | [E] [cat] emitted on Animals Thread
    03s | [S] [cat] received on Animals Thread
    04s | [E] [strawberry] emitted on Anonymous Thread
    04s | [S] [strawberry] received on Main Thread
    06s | [E] [tiger] emitted on Animals Thread
    06s | [S] [tiger] received on Animals Thread
    09s | [E] [fox] emitted on Animals Thread
    09s | [S] [fox] received on Animals Thread
    12s | [E] [leopard] emitted on Animals Thread
    12s | [S] [leopard] received on Animals Thread
    **/
    
    animal
      .dump()
      .observeOn(globalScheduler)
      .dumpingSubscription()
      .disposed(by: bag)
    /**
    03s | [E] [cat] emitted on Animals Thread
    03s | [S] [cat] received on Anonymous Thread
    04s | [E] [strawberry] emitted on Anonymous Thread
    04s | [S] [strawberry] received on Main Thread
    06s | [E] [tiger] emitted on Animals Thread
    06s | [S] [tiger] received on Anonymous Thread
    09s | [E] [fox] emitted on Animals Thread
    09s | [S] [fox] received on Anonymous Thread
    12s | [E] [leopard] emitted on Animals Thread
    12s | [S] [leopard] received on Anonymous Thread
    **/
    
    animal
      .subscribeOn(MainScheduler.instance)
      .dump()
      .observeOn(globalScheduler)
      .dumpingSubscription()
      .disposed(by: bag)
    
    /**
    03s | [E] [cat] emitted on Animals Thread
    03s | [S] [cat] received on Anonymous Thread
    04s | [E] [strawberry] emitted on Anonymous Thread
    04s | [S] [strawberry] received on Main Thread
    06s | [E] [tiger] emitted on Animals Thread
    06s | [S] [tiger] received on Anonymous Thread
    09s | [E] [fox] emitted on Animals Thread
    09s | [S] [fox] received on Anonymous Thread
    12s | [E] [leopard] emitted on Animals Thread
    12s | [S] [leopard] received on Anonymous Thread
    **/
    
    

    等等?什么?为什么计算没有发生在正确的调度器上?这是一个常见且危险的陷阱,它来自于将 RxSwift 默认为异步或多线程的想法——事实并非如此。

    RxSwift 和一般的抽象是自由线程的;在处理数据时没有神奇的线程切换发生。如果你不指定的话,计算总是发生在原来的线程上。

    任何线程切换都是在程序员使用操作符 subscribeOnobserveOn 发出明确请求后发生的。

    认为 RxSwift 默认会做一些线程处理是一个常见的陷阱。上面的情况是对 Subject 的误用。原来的计算是在一个特定的线程上进行的,而这些事件是在该线程中使用Thread() { ... }. 由于 Subject 的性质,RxSwift 没有能力切换原来的计算调度器并转移到另一个线程,因为没有直接控制 Subject 被推到哪里。

    为什么这对水果线程有效呢?这是因为使用 Observable.create(_:) 使 RxSwift 控制了 Thread 块内发生的事情,所以你可以更精细地定制线程处理。

    这种意外的结果通常被称为 "热和冷 "的观察变量问题。

    在上面的案例中,你正在处理一个热的可观察变量。观察变量在订阅过程中没有任何副作用,但是它确实有自己的上下文,在这些上下文中产生事件,RxSwift 无法控制它(即它有自己的线程)。

    相反,一个冷观察者在任何观察者订阅它之前不会产生任何元素。这实际上意味着它没有自己的上下文,直到在订阅时,它创建了一些上下文并开始产生元素。

    Hot vs. cold

    上面的部分涉及到了 Hot and cold observable 的话题。Hot and cold observable 的话题是相当有意见的,并产生了很多争论,所以让我们在这里简单地了解一下它。这个概念可以简化为一个非常简单的问题。

    [图片上传失败...(image-7dd21b-1639538931250)]

    副作用的一些例子是:

    • 向服务器发出请求
    • 编辑本地数据库
    • 写入文件系统
    • 发射火箭

    副作用的世界是无穷无尽的,所以你需要确定你的 Observable 实例是否在订阅时执行副作用。如果你不能确定这一点,那就进行更多的分析或进一步挖掘源代码。在每个订阅上发射火箭可能不是你想要实现的......

    另一种常见的描述方式是问 Observable 是否共享副作用。如果你在订阅时进行了副作用,这意味着副作用不被共享。否则,副作用是与所有订阅者共享的。

    这是一条相当普遍的规则,适用于任何 ObservableType 对象,如主体和相关子类型。

    你可能已经注意到了,到目前为止,我们在书中还没有过多地谈到热观察变量和冷观察变量。这是反应式编程中的一个常见话题,但在 RxSwift 中,你只会在特定的情况下遇到这个概念,比如上面的 Thread 例子,或者当你需要更大的控制时,比如在运行测试时。

    把这一节作为一个参考点,这样万一你需要用热或冷的可观察变量来处理一个问题,你可以快速打开书到这一点,让自己对这个概念有所了解。

    Best practices and built-in schedulers

    Serial vs concurrent schedulers

    考虑到调度器只是一个上下文,它可以是任何东西(调度队列、线程、自定义上下文),而且所有操作者转换序列都需要保留隐含的保证,你需要确定你使用的是正确的调度器。

    • 如果你使用的是一个串行调度器,RxSwift 将以串行方式进行计算。对于一个串行调度队列,调度器也将能够在下面执行自己的优化。
    • 在一个并发调度器中,RxSwift 会尝试同时运行代码,但 observeOnsubscribeOn 会保留任务需要执行的顺序,并确保你的订阅代码最终出现在正确的调度器上。

    MainScheduler

    MainScheduler 位于主线程的顶部。这个调度器被用来处理用户界面的变化和执行其他高优先级的任务。作为在 iOS、tvOS 或 macOS 上开发应用程序的一般做法,长期运行的任务不应该使用这个调度器来执行,所以要避免像服务器请求或其他重型任务。

    此外,如果你执行了更新用户界面的副作用,你必须切换到 MainScheduler 以保证这些更新能够出现在屏幕上。

    在使用大多数 RxCocoa 特性时,MainScheduler 也被用于所有的观察,更具体地说,Driver 和 Signal。正如前一章所讨论的,这些特质确保观察总是在 MainScheduler 中进行,以使你有能力将数据直接绑定到你的应用程序的用户界面上。

    SerialDispatchQueueScheduler

    SerialDispatchQueueScheduler 管理抽象出一个串行 DispatchQueue 上的工作。在使用 observeOn 时,这个调度器有很大的优势,可以进行一些优化。

    你可以使用这个调度器来处理那些最好以串行方式调度的后台工作。例如,如果你有一个应用程序与一个服务器的单一端点对话(如Firebase或GraphQL应用程序),你可能想避免同时调度多个请求,这将给接收端带来太大的压力。这个调度器绝对是你想要的,用于任何应该像串行任务队列一样推进的工作。

    ConcurrentDispatchQueueScheduler

    ConcurrentDispatchQueueScheduler,类似于 SerialDispatchQueueScheduler,管理对 DispatchQueue 的抽象工作。这里的主要区别是,调度器使用的不是一个串行队列,而是一个并发的队列。

    这种调度器在使用 observeOn 时并没有被优化,所以在决定使用哪种调度器时记得要考虑到这一点。

    对于需要同时结束的多个长期运行的任务,ConcurrentDispatchQueueScheduler 可能是一个不错的选择。将多个观察变量与阻塞操作符结合起来,这样所有的结果在准备好的时候就会被结合在一起,这可能会使串行调度器无法发挥其最佳性能。相反,一个并发的调度器可以执行多个并发的任务,并优化结果的收集。

    OperationQueueScheduler

    OperationQueueScheduler 类似于 ConcurrentDispatchQueueScheduler,但是它不是在 DispatchQueue 上抽象出工作,而是在 OperationQueue 上执行工作。有时你需要对正在运行的并发作业进行更多的控制,这是你在并发 DispatchQueue 中无法做到的。

    如果你需要微调并发作业的最大数量,这就是作业的调度器。你可以设置 maxConcurrentOperationCount 来限制并发作业的数量,以适应你的应用程序的需要。

    TestScheduler

    TestScheduler 是一种特殊的野兽。它只用于测试,所以你不应该在生产代码中使用它。这个特殊的调度器简化了操作者的测试;它是 RxTest 库的一部分。你会在关于测试的专门章节中看到这个调度器的使用。

    相关文章

      网友评论

          本文标题:RxSwift #07 | Schedulers

          本文链接:https://www.haomeiwen.com/subject/jdaufrtx.html