美文网首页
RxSwift核心逻辑(二)-Schedulers

RxSwift核心逻辑(二)-Schedulers

作者: jamalping | 来源:发表于2019-08-04 08:22 被阅读0次

    前言

    Schedulers是Rx实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

    在这里,我们就来探索下RxSwift中。Schedulers是什么?是如何实现的呢?又是如何实现Schedulers的切换的呢?

    Schedulers是什么

    根据文档:
    Schedulers分为以下几种:

    • MainScheduler: 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。
    • SerialDispatchQueueScheduler: 抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。
    • ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。
    • OperationQueueScheduler :抽象了 NSOperationQueue。它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。
    • ConcurrentMainScheduler:看源码时发现的。代表主线程,他对subscribeOn操作符做了优化,因此。在主线程subscribeOn应该使用ConcurrentMainScheduler

    接下来我们就深入到源码当中,看这些Scheduler到底是如何实现的。

    Schedulers的实现

    MainScheduler

    看定义,MainSchedulerSerialDispatchQueueScheduler的子类。

    源代码如下:

    public final class MainScheduler : SerialDispatchQueueScheduler {
    
        private let _mainQueue: DispatchQueue
    
        /// 计数(相当于引用计数)
        let numberEnqueued = AtomicInt(0)
    
        public init() {
            self._mainQueue = DispatchQueue.main
            super.init(serialQueue: self._mainQueue)
        }
    
        public static let instance = MainScheduler()
    
        public static let asyncInstance = SerialDispatchQueueScheduler(serialQueue: DispatchQueue.main)
    
        public class func ensureExecutingOnScheduler(errorMessage: String? = nil) {
            if !DispatchQueue.isMain {
                rxFatalError(errorMessage ?? "Executing on background thread. Please use `MainScheduler.instance.schedule` to schedule work on main thread.")
            }
        }
    
        public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
            #if !os(Linux) // isMainThread is not implemented in Linux Foundation
                guard Thread.isMainThread else {
                    rxFatalError(errorMessage ?? "Running on background thread.")
                }
            #endif
        }
    
        override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            let previousNumberEnqueued = increment(self.numberEnqueued)
    
            if DispatchQueue.isMain && previousNumberEnqueued == 0 {
                let disposable = action(state)
                decrement(self.numberEnqueued)
                return disposable
            }
    
            let cancel = SingleAssignmentDisposable()
    
            self._mainQueue.async {
                if !cancel.isDisposed {
                    _ = action(state)
                }
    
                decrement(self.numberEnqueued)
            }
    
            return cancel
        }
    }
    

    从源代码很容易看出,MainScheduler封装了DispatchQueue.main。并提供了一系列访问方法,包括initinstanceasyncInstance。并实现了自己的调度方法scheduleInternal

    scheduleInternal的主要逻辑为:

    • 1、 将事件的引用计数加一,当此时的引用计数为0时,执行一次事件回调。事件回调完成后引用计数减一。
    • 2、当引用计数不为0时,也就是说当前主队列中还有Event事件没有执行完,继续在主线程中执行未完成的Event事件,完成一件后,相应的引用计数减一

    ConcurrentDispatchQueueScheduler

    源码如下:

    public class ConcurrentDispatchQueueScheduler: SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
        
        public var now : Date {
            return Date()
        }
    
        let configuration: DispatchQueueConfiguration
        
        public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
        }
        
       
        public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.init(queue: DispatchQueue(
                label: "rxswift.queue.\(qos)",
                qos: qos,
                attributes: [DispatchQueue.Attributes.concurrent],
                target: nil),
                leeway: leeway
            )
        }
        
        /// 正常调度
        public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.schedule(state, action: action)
        }
        
        /// 延迟调度,dueTime为延迟时间
        public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
        }
        
        /// 延迟并重复调度
        /// startAfter: 延迟时长。
        /// period: 重复时间间隔。
        public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
            return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
        }
    }
    

    说明:

    • 1、提供了一些初始化接口,主要是初始化configuration
    • 2、提供了调度接口:schedule正常调度、scheduleRelative延迟调度、schedulePeriodic延迟重复调度。这些调度接口都是通过configuration去实现的。

    DispatchQueueConfiguration的源码这里就不贴了,源码主要是通过DispatchSource实现了上面提到的几种调度。

    SerialDispatchQueueScheduler

    SerialDispatchQueueScheduler源码如下:

    public class SerialDispatchQueueScheduler : SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
        
        /// - returns: Current time.
        public var now : Date {
            return Date()
        }
        
        let configuration: DispatchQueueConfiguration
        
        init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
        }
    
        public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
            serialQueueConfiguration?(queue)
            self.init(serialQueue: queue, leeway: leeway)
        }
        
        public convenience init(queue: DispatchQueue, internalSerialQueueName: String, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            // Swift 3.0 IUO
            let serialQueue = DispatchQueue(label: internalSerialQueueName,
                                            attributes: [],
                                            target: queue)
            self.init(serialQueue: serialQueue, leeway: leeway)
        }
    
        @available(iOS 8, OSX 10.10, *)
        public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.init(queue: DispatchQueue.global(qos: qos.qosClass), internalSerialQueueName: internalSerialQueueName, leeway: leeway)
        }
        /// 调度方法,在当前线程中实现action回调
        public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.scheduleInternal(state, action: action)
        }
        /// 调度方法,在当前线程中实现action回调,可供子类重写
        func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.schedule(state, action: action)
        }
        /// 延迟调度,dueTime为延迟时间
        public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
        }
        /// 延迟并重复调度
        /// startAfter: 延迟时长。
        /// period: 重复时间间隔。
        public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
            return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
        }
    }
    

    由源码可以看出SerialDispatchQueueScheduler的实现和ConcurrentDispatchQueueScheduler大体相同。只不过一个抽象了串行队列,一个抽象了并发队列。

    OperationQueueScheduler

    OperationQueueScheduler抽象了OperationQueue。因此具有OperationQueue的一些特性,比如设置最大并发数等。
    源码如下:

    public class OperationQueueScheduler: ImmediateSchedulerType {
        public let operationQueue: OperationQueue
        public let queuePriority: Operation.QueuePriority
        
        public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
            self.operationQueue = operationQueue
            self.queuePriority = queuePriority
        }
       
        public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            let cancel = SingleAssignmentDisposable()
    
            let operation = BlockOperation {
                if cancel.isDisposed {
                    return
                }
    
    
                cancel.setDisposable(action(state))
            }
    
            operation.queuePriority = self.queuePriority
    
            self.operationQueue.addOperation(operation)
            
            return cancel
        }
    
    }
    

    ConcurrentMainScheduler(看源码时发现的)

    /**
    Abstracts work that needs to be performed on `MainThread`. In case `schedule` methods are called from main thread, it will perform action immediately without scheduling.
    
    This scheduler is optimized for `subscribeOn` operator. If you want to observe observable sequence elements on main thread using `observeOn` operator,
    `MainScheduler` is more suitable for that purpose.
    */
    public final class ConcurrentMainScheduler : SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
    
        private let _mainScheduler: MainScheduler
        private let _mainQueue: DispatchQueue
    
        /// - returns: Current time.
        public var now: Date {
            return self._mainScheduler.now as Date
        }
    
        private init(mainScheduler: MainScheduler) {
            self._mainQueue = DispatchQueue.main
            self._mainScheduler = mainScheduler
        }
    
        /// Singleton instance of `ConcurrentMainScheduler`
        public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
    
        public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            if DispatchQueue.isMain {
                return action(state)
            }
    
            let cancel = SingleAssignmentDisposable()
    
            self._mainQueue.async {
                if cancel.isDisposed {
                    return
                }
    
                cancel.setDisposable(action(state))
            }
    
            return cancel
        }
    
        public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self._mainScheduler.scheduleRelative(state, dueTime: dueTime, action: action)
        }
    
        public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
            return self._mainScheduler.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
        }
    }
    

    源码的大概实现和其他Scheduler差不多。

    这里我们要注意这点

    ConcurrentMainScheduler抽象了MainScheduler并针对subscribeOn操作符做了相应的优化。而MainScheduler针对observeOn作了优化。具体请看schedule方法的分别实现。

    因此:

    • 1、在主线程subscribeOn应该使用ConcurrentMainScheduler
    • 2、在主线程observeOn应该使用MainScheduler

    如何切换Scheduler

    subscribeOn & observeOn

    observeOnsubscribeOn属于Rx中调度器调度的操作符。

    ru

    如上图:

    图上有两个Schedulers.

    subscribeOn用来决定数据序列的构建函数在哪个Scheduler上运行.

    observeOn用来决定在哪个 Scheduler监听这个数据序列。

    实现逻辑

    为了说明捋清楚这个逻辑,我们先看一个简单的demo:

    demo的大概逻辑:在子线程创建序列,在主线程订阅(监听)这个序列

    /// 创建一个String类型的序列。
    let observable = Observable<String>.create { (observe) -> Disposable in
        /// 发送一个事件
        observe.onNext("1")
        /// 发送一个事件
        observe.onNext("2")
        /// 发送完成事件
        observe.onCompleted()
                
        return Disposables.create()
    }
            
    /// 在子线程构建该序列,在主线程订阅。
    observable
        .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: DispatchQueue.global()))
        .observeOn(MainScheduler.instance)
        .subscribe(onNext: { (str) in
            /// 监听到next事件
            print(str)
        }, onError: { ( error) in
            /// 监听到error事件
            print(error)
        }, onCompleted: {
            /// 监听到完成事件
            print("订阅完成")
        }).disposed(by: disposeBag)
    

    subscribeOn

    结合一开始的例子,当我们调用subscribeOn,RxSwift会如何处理呢?

    源码如下:

    extension ObservableType {
        public func subscribeOn(_ scheduler: ImmediateSchedulerType)
            -> Observable<Element> {
            return SubscribeOn(source: self, scheduler: scheduler)
        }
    }
    

    由此可以看出,调用subscribeOn方法。初始化了SubscribeOn的实例,并保存了源序列和调度器scheduler

    SubscribeOn的实现如下

    final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
        let source: Ob
        let scheduler: ImmediateSchedulerType
        
        init(source: Ob, scheduler: ImmediateSchedulerType) {
            self.source = source
            self.scheduler = scheduler
        }
        
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
            let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
    

    SubscribeOn是继承Producer,所以也是一个可监听序列。

    observeOn

    extension ObservableType {
        public func observeOn(_ scheduler: ImmediateSchedulerType)
            -> Observable<Element> {
                if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                    return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
                }
                else {
                    return ObserveOn(source: self.asObservable(), scheduler: scheduler)
                }
        }
    }
    

    说明:

    • 1、如果当前的schedulerSerialDispatchQueueScheduler,则初始化一个ObserveOnSerialDispatchQueue并返回。
    • 2、否则初始化一个ObserveOn,并返回。

    结合一开始给的例子observeOn(MainScheduler.instance):所以这里初始化了一个SerialDispatchQueueScheduler并保存了源序列和调度器。

    SerialDispatchQueueScheduler的实现如下:

    final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
        let scheduler: SerialDispatchQueueScheduler
        let source: Observable<Element>
    
        init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
            self.scheduler = scheduler
            self.source = source
    
            #if TRACE_RESOURCES
                _ = Resources.incrementTotal()
                _ = increment(_numberOfSerialDispatchQueueObservables)
            #endif
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
            let subscription = self.source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    
        #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
            _ = decrement(_numberOfSerialDispatchQueueObservables)
        }
        #endif
    }
    

    同样ObserveOnSerialDispatchQueue也继承Producer,所以也是可以可监听序列。

    到这里我们可以发现subscribeOnobserveOn分别创建了一个可监听序列。并保存的源序列和调度器。

    添加scheduler后,序列的产生和订阅流程

    • 1、调用subscribe之后,回来到Producersubscribe方法。其中有一个判断if !CurrentThreadScheduler.isScheduleRequired为false。
    • 2、那么就走CurrentThreadScheduler.instance.schedule方法。派发action

    CurrentThreadScheduler源码如下:

    public class CurrentThreadScheduler : ImmediateSchedulerType {
        /// 调度队列,队列里放的是ScheduledItemType元素
        typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
    
        /// CurrentThreadScheduler的单例
        public static let instance = CurrentThreadScheduler()
        /// 在保存线程特有数据(TSD)之前,需要获取线程特有数据的 key。
        private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
            let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
            defer { key.deallocate() }
                                                                   
            guard pthread_key_create(key, nil) == 0 else {
                rxFatalError("isScheduleRequired key creation failed")
            }
    
            return key.pointee
        }()
        /// 开辟一块内存空间,并返回指向该内存的指针,这里主要用于判断CurrentThreadScheduler是否被调度过。
        private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
            return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
        }()
        /// 调度队列
        static var queue : ScheduleQueue? {
            get {
                return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
            }
            set {
                Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
            }
        }
    
        /// Gets a value that indicates whether the caller must call a `schedule` method.
        /// 判断CurrentThreadScheduler是否需要被调度,默认true
        public static fileprivate(set) var isScheduleRequired: Bool {
            get {
                // 获取线程特有信息
                return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
            }
            set(isScheduleRequired) {
                // 设置线程特有信息
                if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                    rxFatalError("pthread_setspecific failed")
                }
            }
        }
        /// CurrentThreadScheduler的调度
        public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            // 本次调用 schedule 是否需要派发 action
            // 也就是当前线程之前有没有调用过 schedule,或者没有执行完。
            if CurrentThreadScheduler.isScheduleRequired {
                CurrentThreadScheduler.isScheduleRequired = false
    
                let disposable = action(state)
                /// CurrentThreadScheduler调度完成后,将isScheduleRequired重置为true
                defer {
                    CurrentThreadScheduler.isScheduleRequired = true
                    CurrentThreadScheduler.queue = nil
                }
                /// 查看和当前线程关联的队列 queue 中是否有未派发的 action,如果有则执行
                guard let queue = CurrentThreadScheduler.queue else {
                    return disposable
                }
                
                while let latest = queue.value.dequeue() {
                    if latest.isDisposed {
                        continue
                    }
                    /// 派发action,并释放
                    latest.invoke()
                }
    
                return disposable
            }
            /// 如果当前线程有被调用过(还有任务在执行),
            /// 先将 action 先保存到和当前线程关联的队列 queue 中
            let existingQueue = CurrentThreadScheduler.queue
    
            let queue: RxMutableBox<Queue<ScheduledItemType>>
            if let existingQueue = existingQueue {
                queue = existingQueue
            }
            else {
                queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
                CurrentThreadScheduler.queue = queue
            }
            /// 初始化调度对象ScheduledItem,并将action(调度的具体事件)和state保存到ScheduledItem实例中。
            let scheduledItem = ScheduledItem(action: action, state: state)
            /// 将ScheduledItem添加到队列中
            queue.value.enqueue(scheduledItem)
            /// 返回一个调度对象
            return scheduledItem
        }
    }
    
    • 派发action,就会来到ObserveOnSerialDispatchQueuerun方法。run方法里初始化了ObserveOnSerialDispatchQueueSink,并将当前的schedulerobserver以及cancel保存了起来。
    • 调用源序列(SubscribeOn)的subscribe并将ObserveOnSerialDispatchQueueSink实例当参数传递过去。
    • 接下来走到了SubscribeOnrun方法。run方法里初始化了SubscribeOnSink,SubscribeOnSink持有了SubscribeOn实例、订阅者observer(这里的订阅者是ObserveOnSerialDispatchQueueSink实例)以及销毁者Cancelable(Cancelable暂不做分析)。
    • 接着调用了SubscribeOnSinkrun方法。

    SubscribeOnSink的实现如下:

    final private class SubscribeOnSink<Ob: ObservableType, Observer: ObserverType>: Sink<Observer>, ObserverType where Ob.Element == Observer.Element {
        typealias Element = Observer.Element 
        typealias Parent = SubscribeOn<Ob>
        
        let parent: Parent
        
        init(parent: Parent, observer: Observer, cancel: Cancelable) {
            self.parent = parent
            super.init(observer: observer, cancel: cancel)
        }
        
        func on(_ event: Event<Element>) {
            self.forwardOn(event)
            
            if event.isStopEvent {
                self.dispose()
            }
        }
        
        func run() -> Disposable {
            let disposeEverything = SerialDisposable()
            let cancelSchedule = SingleAssignmentDisposable()
            
            disposeEverything.disposable = cancelSchedule
            
            let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
                let subscription = self.parent.source.subscribe(self)
                disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
                return Disposables.create()
            }
    
            cancelSchedule.setDisposable(disposeSchedule)
        
            return disposeEverything
        }
    }
    

    对照源码,可以看出SubscribeOnSinkrun主要逻辑如下:

    • 1、在subscribeOn时设置好的scheduler(ConcurrentDispatchQueueScheduler)下进行schedule(调度),接着来到DispatchQueueConfigurationschedule方法。该方法实现如下:
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            let cancel = SingleAssignmentDisposable()
    
            self.queue.async {
                if cancel.isDisposed {
                    return
                }
                cancel.setDisposable(action(state))
            }
            return cancel
        }
    

    可以看出,action会在之前设置好的queue(也就是ConcurrentDispatchQueueScheduler)内去执行。action也就是我们源序列的订阅流程,这就充分说明了。RxSwift是如何在指定scheduler下执行生成序列的函数。

    接下来是源序列的订阅流程。

    • 1、源序列(AnonymousObservable)调用subscribe并将SubscribeOnSink作为订阅者传递过去。
    • 2、接下来会来到AnonymousObservable(源序列)的run方法。接下来的调用流程和RxSwift核心逻辑(一)-序列的产生以及订阅的流程很像。只不过这里中间多了几个订阅者。简单说明一下。
    • 3、AnonymousObservableSinkrun方法,进行Event事件发送

    AnonymousObservableSinkon -> SubscribeOnSinkon -> ObserveOnSerialDispatchQueueSinkonCore

    ObserveOnSerialDispatchQueueSinkonCore实现如下

    override func onCore(_ event: Event<Element>) {
            _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
        }
    

    也就是调用当前的scheduler(observeOn是设置的,根据demo是MainScheduler)的schedule方法。

    接下来的调用MainSchedulerscheduleInternal

    MainSchedulerscheduleInternal的实现如下

    override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            let previousNumberEnqueued = increment(self.numberEnqueued)
    
            if DispatchQueue.isMain && previousNumberEnqueued == 0 {
                let disposable = action(state)
                decrement(self.numberEnqueued)
                return disposable
            }
    
            let cancel = SingleAssignmentDisposable()
    
            self._mainQueue.async {
                if !cancel.isDisposed {
                    _ = action(state)
                }
    
                decrement(self.numberEnqueued)
            }
    
            return cancel
        }
    
    

    可以看到是在主线程调用actionaction也就是ObserveOnSerialDispatchQueueSinkcachedScheduleLambda闭包。

    接下来回来到ObserveOnSerialDispatchQueueSinkobserveron方法。而ObserveOnSerialDispatchQueueSinkobserver就是AnonymousObserver。也就是说接下来调用AnonymousObserveronCore方法去执行事件的回调。

    至此,添加Schedulers后,序列的产生和订阅流程已基本捋清。

    文字描述可能不是太直观,后续会补一个流程图。

    如有误,欢迎指正。

    相关文章

      网友评论

          本文标题:RxSwift核心逻辑(二)-Schedulers

          本文链接:https://www.haomeiwen.com/subject/gtdedctx.html