美文网首页RAC和RxSwift
RxSwift6.0调度者-Scheduler

RxSwift6.0调度者-Scheduler

作者: 数字d | 来源:发表于2020-02-08 18:31 被阅读0次

    RxSwift中就四个内容
    可观察序列-Observable
    观察者-Observer
    调度者-Scheduler
    销毁者-Dispose

    CurrentThreadScheduler类表示当前线程调度者Scheduler

    public class CurrentThreadScheduler : ImmediateSchedulerType {
        typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
        /// The singleton instance of the current thread scheduler.
        public static let instance = CurrentThreadScheduler()
    
        static var queue : ScheduleQueue? {
            get {
                return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
            }
            set {
                Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
            }
        }
        /// Gets a value that indicates whether the caller must call a `schedule` method.
        public static fileprivate(set) var isScheduleRequired: Bool {
            get {
                return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
            }
            set(isScheduleRequired) {
                if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                    rxFatalError("pthread_setspecific failed")
                }
            }
        }
    
        public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
          ...
         }
    }
    

    外界获取判断当前队列的是否被关联isScheduleRequired,利用对 queue的set,get方法的观察,绑定我们的当前队列与静态字符串
    线程extension

    extension Thread {
        static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary
    
            if let newValue = value {
                threadDictionary[key] = newValue
            }
            else {
                threadDictionary[key] = nil
            }
        }
    
        static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary
            
            return threadDictionary[key] as? T
        }
    }
    

    MainScheduler:表示主线程。如果我们需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行,这里绑定了主队列DispatchQueue.main

    public final class MainScheduler : SerialDispatchQueueScheduler {
        private let _mainQueue: DispatchQueue
        let numberEnqueued = AtomicInt(0)
    
        public init() {
            self._mainQueue = DispatchQueue.main
            super.init(serialQueue: self._mainQueue)
        }
        public static let instance = MainScheduler()
    }
    

    同是这里还有继承了SerialDispatchQueueScheduler就是串行调度者。

    public class SerialDispatchQueueScheduler : SchedulerType {
        let configuration: DispatchQueueConfiguration
        init(serialQueue: DispatchQueue, leeway:) {
            self.configuration = DispatchQueueConfiguration(queue: leeway:)
        }
    
    public convenience init(internalSerialQueueName: serialQueueConfiguration: leeway: ) {
            let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
            serialQueueConfiguration?(queue)
            self.init(serialQueue: queue, leeway: leeway)
        }
     }
    

    从这里也可以看出就是接收串行队列,如果没有,自己内部创建一个串行队列

    public class ConcurrentDispatchQueueScheduler: SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
        
        public var now : Date {
            return Date()
        }
    
        let configuration: DispatchQueueConfiguration
      
        public init(queue: leeway: ) {
            self.configuration = DispatchQueueConfiguration(queue: leeway:)
        }
        
        public convenience init(qos: leeway: ) {
            self.init(queue: DispatchQueue(
                label: "rxswift.queue.\(qos)",
                qos: qos,
                attributes: [DispatchQueue.Attributes.concurrent],
                target: nil),
                leeway: leeway
            )
        }
    }
    

    OperationQueueScheduler:封装了 NSOperationQueue, 下面代码非常清晰了,典型的操作队列和操作优先级

    public class OperationQueueScheduler: ImmediateSchedulerType {
        public let operationQueue: OperationQueue
        public let queuePriority: Operation.QueuePriority
        public init(operationQueue: queuePriority: ) {
            self.operationQueue = operationQueue
            self.queuePriority = queuePriority
        }
    }
    

    调度执行

    func schedule<StateType>(_ state: action: ) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }
    
    func scheduleInternal<StateType>(_ state:  action: ) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }
    
    func scheduleRelative<StateType>(_ state: dueTime: action: ) -> Disposable {
        return self.configuration.scheduleRelative(state, dueTime: action:)
    }
    
    func schedulePeriodic<StateType>(state: startAfter:period: action: ) -> Disposable {
        return self.configuration.schedulePeriodic(state, startAfter: period: action:)
    }
    

    从上面核心方法:schedule 可以非常轻松看出都是我们的 self.configuration具体施行者

    func schedule<StateType>(_ state: StateType, action: ) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        self.queue.async {
            if cancel.isDisposed { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }
    

    调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行

    observeOn&subscribeOn

    点击按钮测试

    DispatchQueue.global().async {
        self.actionBtn.rx.tap
            .subscribe(onNext: { () in
                print("点击了按钮 --- \(Thread.current)")
            })
            .disposed(by: self.bag)
    }
    

    调度主线程判断

    public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
        let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
                MainScheduler.ensureRunningOnMainThread()
            }
        return ControlEvent(events: source)
    }
    

    线程切换

    public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
        self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
    }
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
        return self._events.subscribe(observer)
    }
    

    OK 很明显我们的 ControlEvent 的序列 subscribe 是调用了一个函数就是:subscribeOn,其中ConcurrentMainScheduler.instance 内部封装了 主队列

    public func subscribeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        return SubscribeOn(source: self, scheduler: scheduler)
    }
    

    看到返回值的类型就知道,原来的序列是被subscribeOn进行处理了,封装了中间层:SubscribeOn 的序列

    final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
        let source: Ob
        let scheduler: ImmediateSchedulerType
        
        init(source: Ob, scheduler: ImmediateSchedulerType) {
            self.source = source
            self.scheduler = scheduler
        }
        
        override func run(_ observer: cancel:) -> (sink:subscription:) {
            let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
    

    看到 SubscribeOn 的继承关系(Producer)
    序列订阅的时候,会创建一个observer的观察者
    经过Producer 流回SubscribeOn的run
    在经过 SubscribeOnSink.run 到观察者的回调(或者内部源序列的订阅,传sink作为观察者回调,后面的流程只是重复走了一次)
    由观察者的发送响应,回到 sink 的 on
    由 sink的属性观察者(也就是中间封装保存的)响应event事件
    最后调用外界的subscribe的闭包

    调度源码:

    func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) {
    
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: disposable:)
            return Disposables.create()
        }
        cancelSchedule.setDisposable(disposeSchedule)
        return disposeEverything
    }
    

    }
    这里就有一个非常重要的方法:self.parent.scheduler.schedule()调用self.scheduleInternal(state, action: action)

    func schedule<StateType>(_ state: action: ) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        self.queue.async {
            if cancel.isDisposed {
                return
            }
            cancel.setDisposable(action(state))
        }
        return cancel
    }
    

    其实这里的action就是一个 schduler 调用时候的闭包,就会执行:let subscription = self.parent.source.subscribe(self), 源序列的subscribe,必然会来到Producer

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
        }
    }
    

    这里会根据当前的调度环境来判断

    public func schedule<StateType>(_ state: action: ) -> Disposable {
    
        if CurrentThreadScheduler.isScheduleRequired {
          // 已经标记,就置false
            CurrentThreadScheduler.isScheduleRequired = false
         // 外界闭包调用执行
            let disposable = action(state)
          // 延迟销毁 
            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }
          ...
            return disposable
        }
         ...
        return scheduledItem
    }
    

    如果你当前调度环境不变,那就没有问题,如果我这里调度的是子线程,那么就完全不一样,针对当前队列,还有线程安全都是需要处理的

    public func scheduleRecursive<State>(_ state: action: ) -> Disposable {
        // 递归调度者
        let recursiveScheduler = RecursiveImmediateScheduler(action: scheduler:)
        // 调度状态执行
        recursiveScheduler.schedule(state)
        return Disposables.create(with: recursiveScheduler.dispose)
    }
    

    递归调用

    func schedule(_ state: State) {
        var scheduleState: ScheduleState = .initial
        let d = self._scheduler.schedule(state) { state -> Disposable in     
            // 这里因为在递归环境,加了一把锁递归锁,保障安全   
            let action = self._lock.calculateLocked { () -> Action? in
                     return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
    ...
    }
    

    这里因为在递归环境,加了一把锁递归锁,保障安全,通过保护,获取action执行,也就是外界传给递归调度者的闭包任务,RxSwift 的数组调度出来是有顺序的,因为在递归调度,已经加锁了,保障线程资源安全
    执行完源序列的响应,会把任务保存进队列

    public func schedule<StateType>(_ state: StateType, action: ) -> Disposable {
        // 上面的流程就省略了
        let existingQueue = CurrentThreadScheduler.queue
    
        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }
    
        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)
    
        return scheduledItem
    }
    

    把任务和状态封装成了ScheduledItem,面向对象,更容易传输&执行,把这个事务queue.value.enqueue(scheduledItem),排队进队列

    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
    
            CurrentThreadScheduler.isScheduleRequired = false
            let disposable = action(state)  
    
             // 判断当前队列情况,是否存在
            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }
            // 从队列去除任务
            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }
    
            return disposable
        }
       ...
    }
    

    流程任务执行action(state) 完毕之后,又会执行下面的流程
    判断当前队列情况,是否存在,从队列去除任务 : queue.value.dequeue()
    latest.invoke()

    func invoke() {
         self._disposable.setDisposable(self._action(self._state))
    }
    

    就是原来响应回来时候保存的 action执行,只不过加了销毁的机制,这个时候我们的流程就会由原来的 源序列 流进 ObserveOnSink,保障了在 ObserveOnSink 的调度环境是有序的进队的:self._queue.enqueue(event)
    执行self._scheduler.scheduleRecursive((), action: self.run)

    override func onCore(_ event: Event<Element>) {
        let shouldStart = self._lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)
        }
        if shouldStart {
            self._scheduleDisposable.disposable = 
            self._scheduler.scheduleRecursive((), action: self.run)
        }
    }
    

    这里的手法是非常重要的:毕竟并发队列很可能存在

    func run(_ state: (), _ recurse: (()) -> Void) {
        // 加锁获取观察者,很队列任务
        let (nextEvent, observer) = self._lock.calculateLocked { 
            if !self._queue.isEmpty {
                return (self._queue.dequeue(), self._observer)
            }
        }
       
        // 观察者发送响应
        if let nextEvent = nextEvent, !self._cancel.isDisposed {
            observer.on(nextEvent)
            if nextEvent.isStopEvent {
                self.dispose()
            }
        }
    }
    

    加锁获取观察者,很队列任务 : (self._queue.dequeue(), self._observer)
    观察者发送响应: observer.on(nextEvent)

    总结:

    整个流程是比较复杂
    源序列包装
    内部序列创建
    调度环境&观察者传递准备
    源序列订阅 - 根据调度环境调度 - 流程流到观察者就是我们中间内部序列的Sink
    Sink 调度执行 响应发给观察者
    由观察者响应 订阅事件event

    就是两层序列订阅响应,我的第二层的 sink 就是源序列的观察者

    --有人觉得简单到令人发指,却令有些人发脱,献给一个姓Co的人。

    相关文章

      网友评论

        本文标题:RxSwift6.0调度者-Scheduler

        本文链接:https://www.haomeiwen.com/subject/jmqvxhtx.html