RxSwift-Scheduler

作者: Code_人生 | 来源:发表于2019-08-02 16:57 被阅读22次
            let ob = Observable.of(1,2,3,4,5)
            ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
                .subscribe {
                    print("observeOn",$0,Thread.current)
                }
            .disposed(by: disposeBag)
    
    步骤1、代码改写

    1.1 ob -> ObservableSequence<Array<Int>>
    1.2 obOn -> ObserveOn<Int>
    1.3 obOnSubscribe -> SinkDisposer

            let ob = Observable.of(1,2,3,4,5)
            let obOn = ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
            let obOnSubscribe = obOn.subscribe {
                print("observeOn",$0,Thread.current)
            }
            let obOnSubscribeDisposed = obOnSubscribe.disposed(by: disposeBag)
    
    步骤2、点击observeOn

    2.1 ObservableSequence继承Producer,所以点击observeOn来到ObservableType协议的observeOn方法,如下
    2.2 scheduler -> ConcurrentDispatchQueueScheduler
    2.3 self.asObservable() -> ObservableSequence<Array<Int>>

    extension ObservableType {
        public func observeOn(_ scheduler: ImmediateSchedulerType)
            -> Observable<Element> {
                if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                    return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
                }
                else {
                    return ObserveOn(source: self.asObservable(), scheduler: scheduler)
                }
        }
    }
    
    步骤3、点击ObserveOn(source: self.asObservable(), scheduler: scheduler)

    3.1 ObserveOn 继承 Producer
    3.2 初始化 ObserveOn
    3.3 self.scheduler = scheduler 保存传进来的 ConcurrentDispatchQueueScheduler 2.2
    3.4 self.source = source 保存传进来的 self.asObservable() 即 ObservableSequence<Array<Int>> 2.3

    final private class ObserveOn<Element>: Producer<Element> {
        let scheduler: ImmediateSchedulerType
        let source: Observable<Element>
    
        init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
            self.scheduler = scheduler
            self.source = source
    
    #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    #endif
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
            let subscription = self.source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    
    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
    }
    
    步骤4、外界.subscribe(on) 即ObserveOn.subscribe(on)

    4.1 保存AnonymousObserver

    extension ObservableType {
        public func subscribe(_ on: @escaping (Event<Element>) -> Void)
            -> Disposable {
                let observer = AnonymousObserver { e in
                    on(e)
                }
                return self.asObservable().subscribe(observer)
        }
    }
    
    步骤5、 来到Producer 的 subscribe方法

    5.1 CurrentThreadScheduler.isScheduleRequired 为 true,来到else ,执行CurrentThreadScheduler.instance.schedule 来到步骤6
    5.2 去到步骤3中的run方法,ObservableSequence调用subscribe,把ObserveOnSink作为观察者传入进去,来到5.3
    5.2 进入!CurrentThreadScheduler.isScheduleRequired 中,此时的self是 ObservableSequence

    class Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
        func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            rxAbstractMethod()
        }
    }
    
    步骤6 点击 schedule

    6.1 schedule 方法
    6.1.1 CurrentThreadScheduler.isScheduleRequired = false 置为false
    6.1.2 let disposable = action(state) 执行闭包 回到5.1

    public class CurrentThreadScheduler : ImmediateSchedulerType {
        typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
    
        /// The singleton instance of the current thread scheduler.
        public static let instance = CurrentThreadScheduler()
    
        private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
            let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
            defer { key.deallocate() }
                                                                   
            guard pthread_key_create(key, nil) == 0 else {
                rxFatalError("isScheduleRequired key creation failed")
            }
    
            return key.pointee
        }()
    
        private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
            return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
        }()
    
        static var queue : ScheduleQueue? {
            get {
                return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
            }
            set {
                Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
            }
        }
    
        /// Gets a value that indicates whether the caller must call a `schedule` method.
        public static fileprivate(set) var isScheduleRequired: Bool {
            get {
                return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
            }
            set(isScheduleRequired) {
                if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                    rxFatalError("pthread_setspecific failed")
                }
            }
        }
    
        public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            if CurrentThreadScheduler.isScheduleRequired {
                CurrentThreadScheduler.isScheduleRequired = false
    
                let disposable = action(state)
    
                defer {
                    CurrentThreadScheduler.isScheduleRequired = true
                    CurrentThreadScheduler.queue = nil
                }
    
                guard let queue = CurrentThreadScheduler.queue else {
                    return disposable
                }
    
                while let latest = queue.value.dequeue() {
                    if latest.isDisposed {
                        continue
                    }
                    latest.invoke()
                }
    
                return disposable
            }
    
            let existingQueue = CurrentThreadScheduler.queue
    
            let queue: RxMutableBox<Queue<ScheduledItemType>>
            if let existingQueue = existingQueue {
                queue = existingQueue
            }
            else {
                queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
                CurrentThreadScheduler.queue = queue
            }
    
            let scheduledItem = ScheduledItem(action: action, state: state)
            queue.value.enqueue(scheduledItem)
    
            return scheduledItem
        }
    }
    
    
    步骤6 来到 ObservableSequence的run 方法

    6.1 run方法中的observer -> <ObserveOnSink<AnonymousObserver<Int>>

    final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
        fileprivate let _elements: Sequence
        fileprivate let _scheduler: ImmediateSchedulerType
    
        init(elements: Sequence, scheduler: ImmediateSchedulerType) {
            self._elements = elements
            self._scheduler = scheduler
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
    
    步骤7 来到ObservableSequenceSink的 run方法

    7.1 self._parent -> ObservableSequence
    7.2 self._parent._scheduler -> CurrentThreadScheduler
    7.3 self._parent._element -> (1,2,3,4,5)
    7.4 CurrentThreadScheduler类继承ImmediateSchedulerType协议
    7.4 self._parent._scheduler.scheduleRecursive -> CurrentThreadScheduler.scheduleRecursive 来到 步骤8

    final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
        typealias Parent = ObservableSequence<Sequence>
    
        private let _parent: Parent
    
        init(parent: Parent, observer: Observer, cancel: Cancelable) {
            self._parent = parent
            super.init(observer: observer, cancel: cancel)
        }
    
        func run() -> Disposable {
            return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
                var mutableIterator = iterator
                if let next = mutableIterator.next() {
                    self.forwardOn(.next(next))
                    recurse(mutableIterator)
                }
                else {
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
        }
    }
    
    步骤8 点击scheduleRecursive

    8.1 self -> CurrentThreadScheduler
    8.2 recursiveScheduler -> RecursiveImmediateScheduler
    8.3 初始化 RecursiveImmediateScheduler
    8.4 执行 schedule 来到 步骤9

    extension ImmediateSchedulerType {
        public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
            let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
            
            recursiveScheduler.schedule(state)
            
            return Disposables.create(with: recursiveScheduler.dispose)
        }
    }
    
    步骤9 点击RecursiveImmediateScheduler

    9.1 let d = self._scheduler.schedule(state),self._scheduler是CurrentThreadScheduler,即CurrentThreadScheduler执行schedule,来到步骤6.1

    final class RecursiveImmediateScheduler<State> {
        typealias Action =  (_ state: State, _ recurse: (State) -> Void) -> Void
        private var _lock = SpinLock()
        private let _group = CompositeDisposable()
        private var _action: Action?
        private let _scheduler: ImmediateSchedulerType
        init(action: @escaping Action, scheduler: ImmediateSchedulerType) {
            self._action = action
            self._scheduler = scheduler
        }
        func schedule(_ state: State) {
            var scheduleState: ScheduleState = .initial
            let d = self._scheduler.schedule(state) { state -> Disposable in
                // best effort
                if self._group.isDisposed {
                    return Disposables.create()
                }            
                let action = self._lock.calculateLocked { () -> Action? in
                    switch scheduleState {
                    case let .added(removeKey):
                        self._group.remove(for: removeKey)
                    case .initial:
                        break
                    case .done:
                        break
                    }
                    scheduleState = .done
                    return self._action
                }
                if let action = action {
                    action(state, self.schedule)
                }
                return Disposables.create()
            }
            self._lock.performLocked {
                switch scheduleState {
                case .added:
                    rxFatalError("Invalid state")
                case .initial:
                    if let removeKey = self._group.insert(d) {
                        scheduleState = .added(removeKey)
                    }
                    else {
                        scheduleState = .done
                    }
                case .done:
                    break
                }
            }
        }
        func dispose() {
            self._lock.performLocked {
                self._action = nil
            }
            self._group.dispose()
        }
    }
    
    步骤10 self.forwardOn(.next(next)) 此时的观察者是ObserveOnSink

    10.1 来到 ObserveOnSink 的 onCore
    10.2 self._scheduler -> ConcurrentDispatchQueueScheduler,ConcurrentDispatchQueueScheduler类继承SchedulerType协议,SchedulerType协议继承ImmediateSchedulerType协议

    final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
        typealias Element = Observer.Element 
    
        let _scheduler: ImmediateSchedulerType
    
        var _lock = SpinLock()
        let _observer: Observer
    
        // state
        var _state = ObserveOnState.stopped
        var _queue = Queue<Event<Element>>(capacity: 10)
    
        let _scheduleDisposable = SerialDisposable()
        let _cancel: Cancelable
    
        init(scheduler: ImmediateSchedulerType, observer: Observer, cancel: Cancelable) {
            self._scheduler = scheduler
            self._observer = observer
            self._cancel = cancel
        }
    
        override func onCore(_ event: Event<Element>) {
            let shouldStart = self._lock.calculateLocked { () -> Bool in
                self._queue.enqueue(event)
    
                switch self._state {
                case .stopped:
                    self._state = .running
                    return true
                case .running:
                    return false
                }
            }
    
            if shouldStart {
                self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
            }
        }
    
        func run(_ state: (), _ recurse: (()) -> Void) {
            let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
                if !self._queue.isEmpty {
                    return (self._queue.dequeue(), self._observer)
                }
                else {
                    self._state = .stopped
                    return (nil, self._observer)
                }
            }
    
            if let nextEvent = nextEvent, !self._cancel.isDisposed {
                observer.on(nextEvent)
                if nextEvent.isStopEvent {
                    self.dispose()
                }
            }
            else {
                return
            }
    
            let shouldContinue = self._shouldContinue_synchronized()
    
            if shouldContinue {
                recurse(())
            }
        }
    
        func _shouldContinue_synchronized() -> Bool {
            self._lock.lock(); defer { self._lock.unlock() } // {
                if !self._queue.isEmpty {
                    return true
                }
                else {
                    self._state = .stopped
                    return false
                }
            // }
        }
    
        override func dispose() {
            super.dispose()
    
            self._cancel.dispose()
            self._scheduleDisposable.dispose()
        }
    }
    

    有点晕了,先放一放

    相关文章

      网友评论

        本文标题:RxSwift-Scheduler

        本文链接:https://www.haomeiwen.com/subject/kwrldctx.html