美文网首页iOS基础篇
RxSwift调度者scheduler

RxSwift调度者scheduler

作者: yahibo | 来源:发表于2019-08-02 18:46 被阅读0次

    RxSwift的核心非常简单,无非就是以下四点:

    • 可观察序列 Observalbe
    • 观察者 Observer
    • 调度者 Scheduler
    • 销毁者 Dispose

    可观察序列、观察者,在《RxSwift核心源码探索》中有讲,下面就来看看调度者Scheduler具体做了哪些事情。调度者scheduler主要用于控制任务在哪个线程或队列运行,而scheduler是对GCD的封装,GCD我们很熟悉,通过GCD创建队列,开启线程,开发中所有动作都等价于任务+队列

    任务:

    • 异步任务
    • 同步任务

    队列:

    • 主队列
    • 全局队列
    • 并行队列
    • 串行队列

    参见:《GCD部分总结》

    scheduler中调度队列如下:

    • MainScheduler主线程,与UI相关的任务均在该线程下执行
    • SerialDispatchQueueScheduler相当于GCD对应的串行队列
    • ConcurrentDispatchQueueScheduler相当于GCD并行队列
    • OperationQueueScheduler相当于NSOperationQueue管理者可以设置并发数
    • CurrentThreadScheduler-当前线程
    scheduler.png

    以上几种类型,通过代码能够发现实际上就是对GCD队列创建的封装,以及Operation的封装。

    1、MainScheduler

    表示为主线程。开发中需要执行一些和UI相关的任务,则需要我们切换到该Scheduler上执行。点击进入MainScheduler类,如下:

    public final class MainScheduler : SerialDispatchQueueScheduler {
    
        private let _mainQueue: DispatchQueue
    
        var numberEnqueued = AtomicInt(0)
    
        /// Initializes new instance of `MainScheduler`.
        public init() {
            self._mainQueue = DispatchQueue.main
            super.init(serialQueue: self._mainQueue)
        }
    }
    

    MainScheduler继承了SerialDispatchQueueScheduler串行队列类,当然这里不难理解,因为主队列就是一个特殊的串行队列。在该类中,能够清楚的看到,在初始化对象时,确定了队列类型为主队列self._mainQueue = DispatchQueue.main

    2、SerialDispatchQueueScheduler

    串行队列,有需要串行执行的任务,都需要切换至该scheduler下。

    public class SerialDispatchQueueScheduler : SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
        
        /// - returns: Current time.
        public var now : Date {
            return Date()
        }
    
        let configuration: DispatchQueueConfiguration
        
        /**
        Constructs new `SerialDispatchQueueScheduler` that wraps `serialQueue`.
    
        - parameter serialQueue: Target dispatch queue.
        - parameter leeway: The amount of time, in nanoseconds, that the system will defer the timer.
        */
        init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
        }
    
        public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
            serialQueueConfiguration?(queue)
            self.init(serialQueue: queue, leeway: leeway)
        }
    }
    

    同样初始化的时候,通过DispatchQueue对attributes属性置空操作设定了队列为串行队列。

    3、ConcurrentDispatchQueueScheduler

    并行队列,如下载任务,我们需要多个任务同时进行,则需要切换到当前scheduler

    public class ConcurrentDispatchQueueScheduler: SchedulerType {
        public typealias TimeInterval = Foundation.TimeInterval
        public typealias Time = Date
        
        public var now : Date {
            return Date()
        }
    
        let configuration: DispatchQueueConfiguration
         
        public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
        }
        @available(iOS 8, OSX 10.10, *)
        public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.init(queue: DispatchQueue(
                label: "rxswift.queue.\(qos)",
                qos: qos,
                attributes: [DispatchQueue.Attributes.concurrent],
                target: nil),
                leeway: leeway
            )
        }
    }
    

    和我们的串行队列的配置方法参数,一样,不同的是对队列类型attributes的设置,该处置空为串行,concurrent为并行。

    4、OperationQueueScheduler

    用来获取当前线程,看到名称我们应该就能猜到该类就是对OperationQueue的封装。

    public class OperationQueueScheduler: ImmediateSchedulerType {
        public let operationQueue: OperationQueue
        public let queuePriority: Operation.QueuePriority
    
        public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
            self.operationQueue = operationQueue
            self.queuePriority = queuePriority
        }
    }
    

    5、CurrentThreadScheduler

    表示当前线程,默认就在当前调度上。

    public class CurrentThreadScheduler : ImmediateSchedulerType {
        typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
    
        /// The singleton instance of the current thread scheduler.
        public static let instance = CurrentThreadScheduler()
    
        private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
            let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
            defer {
    #if swift(>=4.1)
                key.deallocate()
    #else
                key.deallocate(capacity: 1)
    #endif
            }
    
        static var queue : ScheduleQueue? {
            get {
                return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
            }
            set {
                Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
            }
        }
    
        /// Gets a value that indicates whether the caller must call a `schedule` method.
        public static fileprivate(set) var isScheduleRequired: Bool {
            get {
                return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
            }
            set(isScheduleRequired) {
                if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                    rxFatalError("pthread_setspecific failed")
                }
            }
        }
    }
    
    • isScheduleRequired:用来表示是否必须调用schedule方法
    • 通过queue方法的set方法将当前你线程绑定到相应的key上,get方法通过key获取当前线程,queue是在schedule<StateType>中调用的

    schedule方法:

    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false
    
            let disposable = action(state)
    
            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }
    
            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }
    
            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }
    
            return disposable
        }
    
        let existingQueue = CurrentThreadScheduler.queue
    
        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }
    
        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)
    
        return scheduledItem
    }
    

    通过Thread.setThreadLocalStorageValue方法看一下内部做了哪些工作,代码如下:

    extension Thread {
        static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary
    
            if let newValue = value {
                threadDictionary[key] = newValue
            }
            else {
                threadDictionary[key] = nil
            }
        }
    
        static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
            let currentThread = Thread.current
            let threadDictionary = currentThread.threadDictionary
            
            return threadDictionary[key] as? T
        }
    }
    
    • threadDictionary一个可变类型的字典,
    • setThreadLocalStorageValue绑定当前线程到key
    • getThreadLocalStorageValueForKey通过key获取绑定的线程

    调度者scheduler使用

    模拟一个异步线程处理数据,完成后在主线程展示。

    GCD实现:

    DispatchQueue.init(label: "label",qos: .default,attributes:.concurrent).async {
        var num = 0
        for i in 0...100{
            num += I
        }
        DispatchQueue.main.sync {
            print("num:\(num)  \(Thread.current)")
        }
    }
    
    • DispatchQueue.init:初始化一个队列
    • label:队列标识
    • qos:设置队列优先级
    • DispatchQueue.main.sync:返回主线程
    • attributes:设置队列类型,不设置默认为串行

    RxSwift实现:

    Observable<Any>.create { (observer) -> Disposable in
            var num = 0
            for i in 0...100{
                num += I
            }
            observer.onNext(num)
            return Disposables.create()
        }
        .subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "yahibo"))
        .observeOn(MainScheduler.instance)
        .subscribe(onNext: { (val) in
            print(val)
        }).disposed(by: disposeBag)
    
    • 创建一个序列,在内部处理耗时操作
    • subscribeOn:决定序列的构造函数在哪个Scheduler上运行,使用SerialDispatchQueueScheduler设置为串行队列
    • ObserverOn:决定在哪个Scheduler上监听序列,使用MainScheduler设置为主线程队列中观察

    还是我们熟悉的编码方式,通过点语法来设置序列所在线程。

    无论是对序列元素的观察,UI绑定,还是多线程,在RxSwift中,统一处理成这种链式的形式,函数与函数之间没有强依赖性,使用灵活,降低了编码的复杂度。

    相关文章

      网友评论

        本文标题:RxSwift调度者scheduler

        本文链接:https://www.haomeiwen.com/subject/ilkrdctx.html