美文网首页Rx
RxSwift源码分析(13)——Scheduler调度者

RxSwift源码分析(13)——Scheduler调度者

作者: 无悔zero | 来源:发表于2020-10-16 00:12 被阅读0次

    在前面的篇章中,多次遇见Scheduler,这是一个调度者,但是都没详细说,因为想在这篇章中详细说说。

    • Scheduler其实就是封装了一套GCD,在一些函数源码里已经使用了,我们也可以主动调用这个Scheduler。主要的有:MainSchedulerSerialDispatchQueueSchedulerConcurrentDispatchQueueSchedulerOperationQueueScheduler
      我们看看下面的例子:
    Observable.of(1, 2, 3)
    .observeOn(MainScheduler.instance)  //主线程
    .subscribe(onNext: { (num) in
        print(num)
    }).disposed(by: disposeBag)
    
    Observable.of(1, 2, 3)
    .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "obOnSerial"))  //串行
    .subscribe(onNext: { (num) in
        print(num)
    }).disposed(by: disposeBag)
    
    Observable.of(1, 2, 3)
    .observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))  //并行
    .subscribe(onNext: { (num) in
        print(num)
    }).disposed(by: disposeBag)
    
    1. 我们从调度者的初始化入手,前3个都是继承了SchedulerType协议,SchedulerType又继承了ImmediateSchedulerType
    (1)MainScheduler
    • MainScheduler的本质其实就是SerialDispatchQueueScheduler,可以看到初始化时,默认保存了DispatchQueue.main队列:
    public final class MainScheduler : SerialDispatchQueueScheduler {
        ...
        public init() {
            self._mainQueue = DispatchQueue.main
            super.init(serialQueue: self._mainQueue)
        }
    
        public static let instance = MainScheduler()
        ...
    }
    
    (2)SerialDispatchQueueScheduler
    public class SerialDispatchQueueScheduler : SchedulerType {
        ...
        public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.init(queue: DispatchQueue.global(qos: qos.qosClass), 
                internalSerialQueueName: internalSerialQueueName, 
                leeway: leeway)
        }
        ...
    }
    
    (3)ConcurrentDispatchQueueScheduler

    ConcurrentDispatchQueueSchedulerSerialDispatchQueueScheduler的思路其实是一样的:

    public class ConcurrentDispatchQueueScheduler: SchedulerType {
        ...
        public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.init(queue: DispatchQueue(
                label: "rxswift.queue.\(qos)",
                qos: qos,
                attributes: [DispatchQueue.Attributes.concurrent], 
                target: nil),
                leeway: leeway
            )
        }
        ...
    }
    
    (4)OperationQueueScheduler

    OperationQueueScheduler封装了NSOperationQueue

    public class OperationQueueScheduler: ImmediateSchedulerType {
        ...
        public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
            self.operationQueue = operationQueue
            self.queuePriority = queuePriority
        }
        ...
    }
    
    1. 我们走MainScheduler的流程,然后我们来看看下一步的函数observeOn,这里会判断是否为串行队列:
    extension ObservableType {
        ...
        public func observeOn(_ scheduler: ImmediateSchedulerType)
            -> Observable<Element> {
                if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                    return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
                }
                else {
                    return ObserveOn(source: self.asObservable(), scheduler: scheduler)
                }
        }
    }
    
    1. 判断自然是SerialDispatchQueueScheduler,然后返回了ObserveOnSerialDispatchQueue序列(ObserveOnSerialDispatchQueue继承了Producer)。ObserveOnSerialDispatchQueue保存了scheduler调度者source源序列
    final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
        let scheduler: SerialDispatchQueueScheduler
        let source: Observable<Element>
    
        init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
            self.scheduler = scheduler
            self.source = source
            ...
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
            let subscription = self.source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
        ...
    }
    
    final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
        ...
        init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
            self.scheduler = scheduler
            self.observer = observer
            self.cancel = cancel
            super.init()
            //默认初始化cachedScheduleLambda
            self.cachedScheduleLambda = { pair in
                ...
            }
        }
        ...
    }
    

    根据RxSwift核心逻辑,来到ObserveOnSerialDispatchQueuerun函数,run函数创建了ObserveOnSerialDispatchQueueSink(业务下沉),ObserveOnSerialDispatchQueueSink创建时默认初始化了self.cachedScheduleLambda。然后由保存的self.source源序列进行subscribe

    1. 继续根据核心逻辑就会来到ObserveOnSerialDispatchQueueSinkonCore函数:
    final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
        ...
        override func onCore(_ event: Event<Element>) {
            _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
        }
        ...
    }
    
    • 在这里保存的self.scheduler调度者开始执行操作。
    1. 接着一步步走到scheduleInternal函数:
    public class SerialDispatchQueueScheduler : SchedulerType {
        ...
            public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.scheduleInternal(state, action: action)
        }
        //不是它
        func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.schedule(state, action: action)
        }
        ...
    }
    
    • 在这里要注意,调用的是MainSchedulerscheduleInternal函数,千万不要被骗了:
    public final class MainScheduler : SerialDispatchQueueScheduler {
        ...
        override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            let previousNumberEnqueued = increment(self.numberEnqueued)
    
            if DispatchQueue.isMain && previousNumberEnqueued == 0 {
                let disposable = action(state)
                decrement(self.numberEnqueued)
                return disposable
            }
    
            let cancel = SingleAssignmentDisposable()
    
            self._mainQueue.async {
                if !cancel.isDisposed {
                    _ = action(state)
                }
    
                decrement(self.numberEnqueued)
            }
    
            return cancel
        }
    }
    
    • 但是不管哪个调度者,最后都会在相应的队列调用传过来的action
    MainScheduler DispatchQueueConfiguration

    action便是ObserveOnSerialDispatchQueueSink初始化保存的self.cachedScheduleLambda闭包:

    final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
        ...
        init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
            self.scheduler = scheduler
            self.observer = observer
            self.cancel = cancel
            super.init()
    
            self.cachedScheduleLambda = { pair in
                guard !cancel.isDisposed else { return Disposables.create() }
    
                pair.sink.observer.on(pair.event)
    
                if pair.event.isStopEvent {
                    pair.sink.dispose()
                }
    
                return Disposables.create()
            }
        }
        ...
    }
    
    1. 最终调用了pair.sink.observer.on(pair.event),这语法让人看得有点懵,慢慢一一对应:

      pair原来是元组,pair.sink.observer就是保存的self.observer,所以
      pair.sink.observer.on=>
      self.observer.on=>
      AnonymousObserver.on
    2. 最后的最后根据RxSwift核心逻辑,便会调用外面的响应闭包:
    .subscribe(onNext: { (num) in
        print(num)
    })
    

    其实调度者封装的GCD就是这么简单,非常好的帮我们控制在相应的队列执行任务。

    相关文章

      网友评论

        本文标题:RxSwift源码分析(13)——Scheduler调度者

        本文链接:https://www.haomeiwen.com/subject/luubpktx.html