美文网首页
RXSwift中Scheduler调度者核心原理解析(二)

RXSwift中Scheduler调度者核心原理解析(二)

作者: lb_ | 来源:发表于2019-08-05 11:48 被阅读0次

先来看一个例子:

print("当前测试方法中的线程:\(Thread.current)")

Observable.of(1,2,3,4,5,6,7,8,9,10)
           .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
           .subscribe{print("observeOn",$0,Thread.current)}
           .disposed(by: self.bag)

直接看打印结果:

当前测试方法中的线程:<NSThread: 0x600001232c40>{number = 1, name = main}
observeOn next(1) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn next(2) <NSThread: 0x600001250900>{number = 3, name = (null)}
... /*中间因篇幅原因就省略了*/
observeOn next(10) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn completed <NSThread: 0x600001250900>{number = 3, name = (null)}

上面写法中比较奇特的 observeOn 以及上篇文章中也有提到 subscribeOn 到底做了什么, Scheduler 又是如何调度的呢?

Scheduler 流程解析

步骤1: cmd + 点击 of 进去 , 返回了一个 ObservableSequence 的序列, 这个序列继承自 Producer .

public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
   return ObservableSequence(elements: elements, scheduler: scheduler)
}

注意:

我们调用 .of 只传了 1,2,3,4,5,6,7,8.. 也就是 elements 参数,那么第二个 scheduler 参数则是上面 of方法中所赋的默认参数 CurrentThreadScheduler.instance ( swift 语言特性 不多阐述)。 而这个 CurrentThreadScheduler.instance 上篇文章RXSwift中Scheduler调度者本质核心原理解析(一)中有讲述,就是主队列。也可以自己点进去看下。

那么意味着 原序列 ObservableSequenceScheduler,是主队列。

步骤2: cmd + 点击 observeOn 进去, 注意 进入到父类 ObservableTypeobserveOn 方法中.

public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
     if let scheduler = scheduler as? SerialDispatchQueueScheduler {
         return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
     }else {
         return ObserveOn(source: self.asObservable(), scheduler: scheduler)
     }
}

由于外界我们指定的 observeOn 的参数, 显然 我们走上面, 也就是返回了一个 ObserveOnSerialDispatchQueue 新序列, 并且这个序列同样继承与 Producer .并且这个新产生的中间序列的 Scheduler,为用户指定的。

步骤3: 外界开始执行 subscribe, 而这里很明显是针对ObserveOnSerialDispatchQueue 新序列进行订阅. 上面提到这个新序列继承自 Producer 那么我们直接来到子类的 run 方法中.
(关于这里的执行顺序如果还有问题,请仔细研究本系列 核心逻辑1 以及 核心逻辑2
)

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}

在这里, 我们看到了一步 和 .map 原理分析时同样的操作. 也就是针对源序列进行订阅. self.source.subscribe(sink) ,那么接下来 --基本操作,创建一个 sink 管道,并将用户指定的队列传递过去。

步骤4: 当针对原序列订阅之后,原序列的观察者发送响应时 来到 ObservableSequence 的父类 Producersubscribe 方法中。

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {   // 后续走这里
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }
    else {    // 第一次走这里
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}

第一次进来,标识为 true ,走 CurrentThreadScheduler.instance.schedule(())

public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    if CurrentThreadScheduler.isScheduleRequired {
        CurrentThreadScheduler.isScheduleRequired = false

        let disposable = action(state)

        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }

        guard let queue = CurrentThreadScheduler.queue else {
            return disposable
        }
    ///省略
        return disposable
    }
    ///省略
    return scheduledItem
}

我们看到这一步 将标识置为 false , 并调用传递进来的尾随闭包 action(state) ,那么回到上个方法 就会执行 self.run(observer, cancel: disposer) 也就是会来到源序列的 run 方法。

步骤5: ObservableSequence -> run

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
    let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}

再过渡到 ObservableSequenceSink -> run

func run() -> Disposable {
    return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
        var mutableIterator = iterator
        if let next = mutableIterator.next() {
            self.forwardOn(.next(next))
            recurse(mutableIterator)
        }
        else {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

cmd + 点击 scheduleRecursive,之前提到原序列的 _scheduler 是默认主队列,进入到对应方法中,ImmediateSchedulerType -> scheduleRecursive

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
    
    recursiveScheduler.schedule(state)
    
    return Disposables.create(with: recursiveScheduler.dispose)
}

我们看到 创建了一个 RecursiveImmediateScheduler 的类, 保存了外部 self._parent._scheduler.scheduleRecursive 传过来的闭包, 并且调用了自己的 schedule 方法。

步骤6: RecursiveImmediateScheduler -> schedule方法

func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial

    let d = self._scheduler.schedule(state) { state -> Disposable in
        //无关代码已省略
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
        
        if let action = action {
            action(state, self.schedule)
        }
        //...
        return Disposables.create()
    }

}

调用: self._scheduler.schedule , 这里的 self._scheduler 为主队列, 那么调用主队列的 schedule 方法,也就是 SerialDispatchQueueSchedulerschedule 方法


过度方法省略,直接来到
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }


        cancel.setDisposable(action(state))
    }

    return cancel
}

走到这里,异步执行了 RecursiveImmediateScheduler 的闭包,也就是下图中选中的地方。


走到这里,我们已经找到了 action(state, self.schedule) 方法调用,那么就继续回调。

步骤7: ObservableSequenceSink 的回调


步骤6中 action 调用,即上图选中部分,那么意味着调用了 self.forwardOn(.next(next))
点进去可以看到其实就是 self._observer.on(event) ,给原序列发送响应。而在中间序列订阅原序列时, 响应者是 ObserveOnSerialDispatchQueueSink
回顾一下:
final private class ObserveOnSerialDispatchQueue<E>: Producer<E> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<E>

    init(source: Observable<E>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
    }

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)   
        return (sink: sink, subscription: subscription)
    }

}

步骤8: ObserveOnSerialDispatchQueueSink -> onCore

self.source.subscribe(sink) 这儿就很灵性了,订阅原序列,但订阅者是新产生的中间序列的sink,并且中间序列的调度环境 scheduler 就是用户指定的队列。
那么由于原序列订阅者是 ObserveOnSerialDispatchQueueSink , 接收到事件就来到了其 onCore 方法。

override func onCore(_ event: Event<E>) {
    _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}

继续调度:
中间过渡方法列一下:

public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}

最终来到: 在 self.queue 中异步执行任务, 调用 action(state)

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }


        cancel.setDisposable(action(state))
    }

    return cancel
}

加个断点验证一下:


打印结果证明,这次 on(event) 是在我们规定的队列执行的。从而实现了 我们所写代码 observeOn 的目的

总结:

.observeOn / .subscribeOn 函数时产生了新的中间序列,而对新序列 ObserveOnSerialDispatchQueue 进行订阅时,ObserveOnSerialDispatchQueuerun 方法中,对源序列同样进行订阅,订阅者为内部类ObserveOnSerialDispatchQueueSink 其调度环境为用户指定的队列。那么源序列发送 event 时, ObserveOnSerialDispatchQueueSink 响应,在用户指定的队列中异步执行任务。

这个流程走下来和 .map 函数很类似,其实这也是 RX 中很多操作函数的流程。 也就是针对原序列做相应操作时,会产生一个新序列,并对原序列进行订阅,而订阅者是具体中间类的sink,或者是自定义的 实现了 onCore方法的类,这个类保存了原序列的事件, 从而实现 类似 映射、筛选、改变调度环境等操作。

相关文章

网友评论

      本文标题:RXSwift中Scheduler调度者核心原理解析(二)

      本文链接:https://www.haomeiwen.com/subject/vttydctx.html