美文网首页ReactiveX
RxSwift(7)-调度者

RxSwift(7)-调度者

作者: xxxxxxxx_123 | 来源:发表于2020-04-23 19:22 被阅读0次

前言

scheduler,也称为调度者,作为RxSwift的四大核心之一,是RxSwift实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

下面我们先来看一个例子:

func schedulerTest01() {
    DispatchQueue.global().async {
        self.mButton.rx.tap.subscribe { (element) in
            print("==currentThread==\(Thread.current)==")
        }.disposed(by: self.disposeBag)
    }
}

控制台输出:

==currentThread==<NSThread: 0x600002c44000>{number = 1, name = main}==

如果没有RxSwift,我们在子线程操作UI就会出错。而此处我们可以看到,虽然我们在子线程订阅了信号,但是还是在主线程输出了。那么在这其中,调度者做了什么?

extension Reactive where Base: UIButton {
    
    /// Reactive wrapper for `TouchUpInside` control event.
    public var tap: ControlEvent<Void> {
        return controlEvent(.touchUpInside)
    }
}

extension Reactive where Base: UIControl {
    
    public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
        let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
                // 线程判断 不是主线程就报错
                MainScheduler.ensureRunningOnMainThread()

                ......
            }
            .takeUntil(deallocated)

        return ControlEvent(events: source)
    }
}

可以看出在Observable.create的闭包里就已经做了主线程判断,也就是说在create的时候,就已经把线程切换到了主线程。根据我们之前分析的事件流程可知,当代码执行到create的闭包里面,也就是观察者发送的事件的时候,是在订阅观察事件之后,所以我们可以大胆猜测,切换线程的操作是在订阅subscribe方法中,订阅方法最终会是ControlEvent的重写的订阅方法。

public struct ControlEvent<PropertyType> : ControlEventType {
    public typealias Element = PropertyType

    let _events: Observable<PropertyType>

    public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
        self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
        // subscribeOn就是给序列指定运行的线程
    }

    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        return self._events.subscribe(observer)
    }
}

public final class ConcurrentMainScheduler : SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date

    private let _mainScheduler: MainScheduler
    private let _mainQueue: DispatchQueue

    private init(mainScheduler: MainScheduler) {
        self._mainQueue = DispatchQueue.main
        self._mainScheduler = mainScheduler
    }

    public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
}


public final class MainScheduler : SerialDispatchQueueScheduler {
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }

    public static let instance = MainScheduler()
}

ControlEvent在初始化的时候调用events.subscribeOn(ConcurrentMainScheduler.instance)方法,而ConcurrentMainScheduler.instance就是在主队列操作。

MainScheduler

MainScheduler,主线程调度者,它抽取了需要在DispatchQueue.main上执行的任务,通常用于UI操作。是SerialDispatchQueueScheduler的一种特殊化。如果一旦从主队列调用了schedule方法,它将立即执行操作而不进行调度。

另外,MainScheduler是为observeOn运算符优化的。如果要使用subscribeOn在主线程上订阅可观察的序列,建议使用ConcurrentMainScheduler

public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    let numberEnqueued = AtomicInt(0)

    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }

    public static let instance = MainScheduler()

    public static let asyncInstance = SerialDispatchQueueScheduler(serialQueue: DispatchQueue.main)

    public class func ensureExecutingOnScheduler(errorMessage: String? = nil) {
        if !DispatchQueue.isMain {
            rxFatalError(errorMessage ?? "Executing on background thread. Please use `MainScheduler.instance.schedule` to schedule work on main thread.")
        }
    }

    /// In case this method is running on a background thread it will throw an exception.
    public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
        #if !os(Linux) // isMainThread is not implemented in Linux Foundation
            guard Thread.isMainThread else {
                rxFatalError(errorMessage ?? "Running on background thread.")
            }
        #endif
    }

    override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let previousNumberEnqueued = increment(self.numberEnqueued)

        if DispatchQueue.isMain && previousNumberEnqueued == 0 {
            let disposable = action(state)
            decrement(self.numberEnqueued)
            return disposable
        }

        let cancel = SingleAssignmentDisposable()

        self._mainQueue.async {
            if !cancel.isDisposed {
                _ = action(state)
            }

            decrement(self.numberEnqueued)
        }

        return cancel
    }
}

CurrentThreadScheduler

当前线程调度者,是生成事件元素的默认调度者。

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    public static let instance = CurrentThreadScheduler()

    ......

    // 获取当前线程的队列
    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// 该值用来判断调用者是否必须调用`schedule`方法。
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }

    ......
}

CurrentThreadSchedulerqueue属性就是当前线程的队列。RxSwift给系统的Thread添加了两个扩展方法,用来存储和获取当前线程队列。

extension Thread {
        static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary

            if let newValue = value {
                threadDictionary[key] = newValue
            }
            else {
                threadDictionary[key] = nil
            }
        }

        static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary
            
            return threadDictionary[key] as? T
        }
}

SerialDispatchQueueScheduler

串行队列调度者。SerialDispatchQueueScheduler会抽取出需要在特定的队列上执行的工作。而且它能够确保即使是并发队列,也会被转换为串行队列。也就是说不管任务在外部是以何种形式执行的,到这里来都会转为串行执行。

public class SerialDispatchQueueScheduler : SchedulerType {
   
    let configuration: DispatchQueueConfiguration
    
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }
    
    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }
    
    public convenience init(queue: DispatchQueue, internalSerialQueueName: String, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let serialQueue = DispatchQueue(label: internalSerialQueueName,
                                        attributes: [],
                                        target: queue)
        self.init(serialQueue: serialQueue, leeway: leeway)
    }
   
    public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue.global(qos: qos.qosClass), internalSerialQueueName: internalSerialQueueName, leeway: leeway)
    }
}

从代码可以看出来串行队列调度者的初始化方法要么就是外界传入一个队列,要么就是自己创建一个串行队列。

ConcurrentDispatchQueueScheduler

并发队列调度者,一般适用于需要在后台执行某些任务。可以传递一个串行调度队列。

public class ConcurrentDispatchQueueScheduler: SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    public var now : Date {
        return Date()
    }

    let configuration: DispatchQueueConfiguration
    
    public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
    }
    
    public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway
        )
    }
}

可以看出并发队列调度者和串行队列调度者的逻辑类似,初始化的时候要么外界传入一个队列,要么就是自己创建一个并发队列。

OperationQueueScheduler

操作队列调度者,是对NSOperationQueue的封装。可以通过设置 maxConcurrentOperationCount来控制同时执行并发任务的最大数量。

public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority
    
    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
    
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        let operation = BlockOperation {
            if cancel.isDisposed {
                return
            }
            // 回调block
            cancel.setDisposable(action(state))
        }

        operation.queuePriority = self.queuePriority
        self.operationQueue.addOperation(operation)
        
        return cancel
    }
}

observeOn

observeOn用于指定可观察序列发出通知的调度者。默认情况下,Observable的创建,应用操作符以及发出通知都会在subscribe方法调用的scheduler执行,observeOn将指定一个scheduler来让Observable通知观察者。

需要注意的是,一旦产生了onError事件,observeOn操作符将立即转发。它不会等待onError之前的事件全部被收到。这意味着onError事件可能会跳过一些元素提前发送出去。

串行队列调度者observeOn

func serialObserveOnTest() {
    Observable.of(1, 2, 3, 4, 5)
        .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "serialObserveOn"))
        .subscribe { (element) in
            print("==\(element)==\(Thread.current)==")
    }.disposed(by: disposeBag)
}

控制台输出:

==next(1)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(2)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(3)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(4)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(5)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==completed==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==

下面我们来具体分析一下其流程:

  1. 首先调用of生成可观察序列的时候会将当前线程调度者作为参数传入
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
    return ObservableSequence(elements: elements, scheduler: scheduler)
}
  1. 将元素和当前线程调度者保存在ObservableSequence中:
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }
}
  1. 调用observeOn方法的时候,会将我们创建的串行队列调度者传入ObserveOnSerialDispatchQueue中保存。
public func observeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    // scheduler: SerialDispatchQueueScheduler
    // self = <ObservableSequence<Array<Int>>
    return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}

final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    
    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
    }
}
  1. 在调用subscribe方法的时候就会进行一系列的判断,首先是ObserveOnSerialDispatchQueue.subscribe,然后会转交给ObservableSequence.subscribe
// ObserveOnSerialDispatchQueue
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    // self.scheduler = SerialDispatchQueueScheduler
    // observer = AnonymousObserver
    // self.source = ObservableSequence
    let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}

// ObservableSequence
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    // observer = ObserveOnSerialDispatchQueueSink
    let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}
  1. 调度者的处理
// ObserveOnSerialDispatchQueueSink
override func onCore(_ event: Event<Element>) {
    // self = ObserveOnSerialDispatchQueueSink
    // self.scheduler = SerialDispatchQueueScheduler
    _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}

public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.scheduleInternal(state, action: action)
}

func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}
  1. 在队列中异步执行保存的block,终极回调
// DispatchQueueConfiguration
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()
    
    // 串行队列异步执行
    self.queue.async {
        if cancel.isDisposed {
            return
        }
        cancel.setDisposable(action(state))
    }
    return cancel
}
  1. 执行回调的block,观察者发送事件。
// ObserveOnSerialDispatchQueueSink

self.cachedScheduleLambda = { pair in
    guard !cancel.isDisposed else { return Disposables.create() }
    // 发送事件
    // pair.sink = ObserveOnSerialDispatchQueueSink
    // pair.sink.observer = AnonymousObserver
    pair.sink.observer.on(pair.event)

    if pair.event.isStopEvent {
        pair.sink.dispose()
    }

    return Disposables.create()
}

并发队列调度者observeOn

func concurrentObserveOnTest() {
    let ob = Observable.of(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
    21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 
    41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 
    61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 
    81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
    .subscribe { (element) in
        print("==\(element)==\(Thread.current)==")
    }.disposed(by: disposeBag)
}

控制台输出:

// 输出较长 此处就不一一罗列
.....
==next(66)==<NSThread: 0x600002454140>{number = 6, name = (null)}==
==next(67)==<NSThread: 0x60000245f400>{number = 7, name = (null)}==
==next(68)==<NSThread: 0x600002456200>{number = 8, name = (null)}==
......

可以看到的是,虽然是并发队列,也存在多个子线程,但是我们发出的事件是按顺序执行的。

源码如下:

public func observeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
  1. 和串行队列不同的是,并发队列调用ObserveOn方法返回的是ObserveOn对象,此时保存的是schedulerConcurrentDispatchQueueSchedulersourceObservableSequence
final private class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>
    
    init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source
    }
}
  1. 调用订阅方法的时候就会执行ObserveOnon方法,此时会再次调用ObservableSequencesubscribe,也就是Producer的方法。
// ObserveOn
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}
  1. 调用ObservableSequencerun方法:
// ObservableSequence
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}

// ObservableSequenceSink
func run() -> Disposable {
    // self._parent = ObservableSequence
    // self._parent._scheduler = CurrentThreadScheduler 
    // of函数默认的scheduler
    return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
        var mutableIterator = iterator
        if let next = mutableIterator.next() {
            self.forwardOn(.next(next))
            recurse(mutableIterator)
        }
        else {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}
  1. 队列的相关操作:
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
    recursiveScheduler.schedule(state)
        
    return Disposables.create(with: recursiveScheduler.dispose)
}

func schedule(_ state: State) {
        let d = self._scheduler.schedule(state) { state -> Disposable in
            ......
            
            if let action = action {
                action(state, self.schedule)
            }
            
            ......
        }
        ......
    }
  1. 在调用action的时候,会进入ObservableSequenceSinkrun方法的闭包执行forwardOn方法,接着调用SinkforwardOn方法,接着ObserverBaseon,接着ObserveOnSinkonCore方法。此时会把所有的事件都装入_queue中,而在条件允许的情况就会调用scheduleRecursive方法,并将ObserveOnSink.run作为闭包传入,一旦执行到这个闭包,就会把_queue中的事件发出,然后接着递归。
final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    .....
    
    override func onCore(_ event: Event<Element>) {
        let shouldStart = self._lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)

            switch self._state {
            case .stopped:
                self._state = .running
                return true
            case .running:
                return false
            }
        }

        if shouldStart {
            self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

    func run(_ state: (), _ recurse: (()) -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
            if !self._queue.isEmpty {
                // 取出一个事件
                return (self._queue.dequeue(), self._observer)
            }
        }

        if let nextEvent = nextEvent, !self._cancel.isDisposed {
            // 观察者发送事件
            observer.on(nextEvent)
            if nextEvent.isStopEvent {
                self.dispose()
            }
        }
            
        ......

        let shouldContinue = self._shouldContinue_synchronized()

        if shouldContinue {
            // 接着递归
            recurse(())
        }
    }
    ......
}

_queueRxSwift自己封装的一个队列Queue<Event<Element>>(capacity: 10),由于队列先进先出的特性,所以虽然是并发事件,入队出队都有锁的保护,所以会按顺序入队、按顺序出队,只是入队和出队的操作是并发的,比如可以在4入队的时候1出队。

可以看出串行队列和并发队列还是有很多相似点的。

subscribeOn

subscribeOn操作符指定Observable在哪个scheduler开始执行。默认情况下,Observable创建,应用操作符以及发出通知都会在subscribe方法调用的scheduler执行。subscribeOn可以指定一个不同的scheduler来让Observable执行。

源码如下:

public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}

可以看到SubscribeOn是继承自Producer类的,这也和我们之前分析过的流程一样。经过订阅方法之后,会调用SubscribeOn.on方法,然后会走到SubscribeOnSink.run()方法。

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

func run() -> Disposable {
    let disposeEverything = SerialDisposable()
    let cancelSchedule = SingleAssignmentDisposable()
        
    disposeEverything.disposable = cancelSchedule
        
    let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
        // 订阅
        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
        return Disposables.create()
    }

    cancelSchedule.setDisposable(disposeSchedule)
    return disposeEverything
}

self.parent.scheduler.schedule(()){}就是线程调度的方法,最终会到DispatchQueueConfiguration.schedule()

extension DispatchQueueConfiguration {
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        self.queue.async {
            if cancel.isDisposed {
                return
            }
            // 回调
            cancel.setDisposable(action(state))
        }

        return cancel
    }
}

后面的逻辑就和ObserveOn的逻辑类似,这里就不赘述了。

总结

RxSwift的调度者就是其实现多线程的核心模块,是对GCDOperationQueue的封装,主要用于控制任务在哪个线程或队列运行。其串行、并发队列的流程大致相同:

  • 针对源序列进行处理、订阅
  • 调度环境、观察者传值、保存
  • 转移到子序列订阅、调度
  • 队列异步回调,观察者响应、发送事件

此处附上一张总结的串行ObserveOn流程图:

SerialDispatchQueueScheduler&observeOn流程.png

相关文章

网友评论

    本文标题:RxSwift(7)-调度者

    本文链接:https://www.haomeiwen.com/subject/rxhzihtx.html