美文网首页
RxSwift中的Scheduler调度者(上)

RxSwift中的Scheduler调度者(上)

作者: 简_爱SimpleLove | 来源:发表于2019-08-18 02:16 被阅读0次

    初识

    为了初步认识调度者,我们先来看下面一段代码的打印:

            DispatchQueue.global().async {
                print("global --- \(Thread.current)")
               self.actionBtn.rx.tap
                .subscribe(onNext: { () in
                    // 订阅回来打印的线程在主线程,是因为订阅就是订阅在主队列的
                    print("tap --- \(Thread.current)")
                })
            }
            /*
             打印结果:
             global --- <NSThread: 0x600002696380>{number = 3, name = (null)}
             tap --- <NSThread: 0x6000026f9900>{number = 1, name = main}
             */
    

    从上面可以看出来,在全局并发线程number=3的线程中订阅的button点击事件,但是订阅回来的打印却在主线程。

    所以到底发生了什么呢?为什么订阅回来会回到主线程了呢?

    我们猜想RxSwift中肯定做了什么操作,才会使button点击的订阅事件回来在主线程了。于是我们点击tap进去:

    public var tap: ControlEvent<Void> {
            return controlEvent(.touchUpInside)
        }
    

    发现tap其实就是controlEvent的一个点击事件,然后再点击controlEvent进去:

    public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
        let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
            // 走这个代码的时候,就是发送事件的时候,上一个流程应该是订阅
                MainScheduler.ensureRunningOnMainThread()  // 确保在主线程,已经在主线程了
                guard let control = control else {
                    observer.on(.completed)
                    return Disposables.create()
                }
                let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
                    observer.on(.next(()))
                }
                return Disposables.create(with: controlTarget.dispose)
            }
            .takeUntil(deallocated)
        return ControlEvent(events: source)
    }
    

    发现里面有句关键代码MainScheduler.ensureRunningOnMainThread(),这句代码确保了当前环境是主线程环境(注意现在已经是主线了),如果不是主线程,就会抛出错误。
    因为这是用户交互的事件,也就是说是UI交互,所以必须要在主线程,可以理解。

    但是是在哪一步切换到主线程中去的呢?

    分析:通过前面的文章,我们已经知道了这个大括号里面,即序列创建的这个逃逸闭包里面是发送事件的流程,所以切换到主线程的操作,只可能是上一步订阅流程切换的(因为这里就是序列的创建,并没有在主线程中创建序列,所以只有可能是在订阅流程中切换到主线程的)。

    点击进入返回的结构体ControlEvent中:

        public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
            self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
        }
    
    • 发现在ControlEvent初始化的时候,它先让它的事件订阅在了主线程ConcurrentMainScheduler.instance,即是说,在events订阅上面的打印事件之前,先订阅了一个在主线程这个事件,然后返回的序列才订阅的上面打印事件。
    • 这里有两个关键词subscribeOnConcurrentMainScheduler
    • 其实除了subscribeOn,还有一个关键方法observeOn下一篇文章再具体讲。
    • 这篇文章我们先具体认识和ConcurrentMainScheduler一样的,一些关键类。

    ConcurrentMainScheduler

    并发主线程调度者:当一些操作需要在主线程执行,如果它的schedule是在主线程调用的,那它就会立即执行,而不需要调度。

    public final class ConcurrentMainScheduler : SchedulerType {
        private let _mainScheduler: MainScheduler
        private let _mainQueue: DispatchQueue
    
        private init(mainScheduler: MainScheduler) {
            self._mainQueue = DispatchQueue.main  // 主队列
            self._mainScheduler = mainScheduler
        }
        /// Singleton instance of `ConcurrentMainScheduler`
        public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
    }
    
    • 初始化的时候_mainQueue绑定了主队列
    • 传进来MainScheduler参数来初始化

    MainScheduler

    主线程调度者:主要用来处理UI相关的任务。

    public final class MainScheduler : SerialDispatchQueueScheduler {
        private let _mainQueue: DispatchQueue
        let numberEnqueued = AtomicInt(0)
        /// Initializes new instance of `MainScheduler`.
        public init() {
            self._mainQueue = DispatchQueue.main  // 绑定主队列
            super.init(serialQueue: self._mainQueue)
        }
        /// Singleton instance of `MainScheduler`
        public static let instance = MainScheduler()
    }
    
    • 绑定了主队列DispatchQueue.main
    • 主队列其实就是一种串行队列,继承自串行调度者SerialDispatchQueueScheduler

    SerialDispatchQueueScheduler

    串行调度者:封装了 GCD 的串行队列。在特定的队列执行,并且保证即使传进来的是一个并发队列,也会被转换成一个串行队列。

    public class SerialDispatchQueueScheduler : SchedulerType {
        let configuration: DispatchQueueConfiguration
        // leeway :延迟执行时间
        init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            // 初始化的时候,保存了一个串行队列
            self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
        }
        public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
            serialQueueConfiguration?(queue)
            self.init(serialQueue: queue, leeway: leeway)
        }
    }
    
    • 如果在使用前需要对其进行一些定制,内部串行队列可以使用serialQueueConfiguration回调来进行定制

    ConcurrentDispatchQueueScheduler

    串行调度者:封装了 GCD 的并发队列。用来执行一些并发任务。

    /// Abstracts the work that needs to be performed on a specific `dispatch_queue_t`. You can also pass a serial dispatch queue, it shouldn't cause any problems.
    /// This scheduler is suitable when some work needs to be performed in background.
    public class ConcurrentDispatchQueueScheduler: SchedulerType {
        let configuration: DispatchQueueConfiguration
        public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
        }
    }
    
    • 在特定的队列执行,并且即使传进来的是一个串行队列,也不会产生错误
    • 可以用来执行在后台进行的任务

    OperationQueueScheduler

    Operation调度者:封装了NSOperationQueue。有操作队列和操作优先级的属性

    /// Abstracts the work that needs to be performed on a specific `NSOperationQueue`.
    /// This scheduler is suitable for cases when there is some bigger chunk of work that needs to be performed in background and you want to fine tune concurrent processing using `maxConcurrentOperationCount`.
    public class OperationQueueScheduler: ImmediateSchedulerType {
        public let operationQueue: OperationQueue   // 封装的Operation
        public let queuePriority: Operation.QueuePriority  // 优先级
            /// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
        /// - parameter operationQueue: Operation queue targeted to perform work on.
        /// - parameter queuePriority: Queue priority which will be assigned to new operations.
        public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
            self.operationQueue = operationQueue
            self.queuePriority = queuePriority
        }
    }
    
    • 用于在特定的NSOperationQueue上执行
    • 适合用于在后台处理大量任务的时候
    • 适合用于设置最大并发数

    CurrentThreadScheduler

    当前线程调度者:表示当前线程的调度者,默认使用这个(当很多时候参数里面没有传调度者的时候,给的默认值就是这个

    public class CurrentThreadScheduler : ImmediateSchedulerType {
        typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
        public static let instance = CurrentThreadScheduler()
        // isScheduleRequiredKey 就是一个默认的key值,字符串
        private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
            let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
            defer { key.deallocate() }
            guard pthread_key_create(key, nil) == 0 else {
                rxFatalError("isScheduleRequired key creation failed")
            }
            return key.pointee
        }()
        private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
            return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
        }()
        // 直接调CurrentThreadScheduler.queue方法,获取出来的,一般都是主线程
        static var queue : ScheduleQueue? {
            get {
                // CurrentThreadSchedulerQueueKey.instance 也是一个key值字符串
                // 里面是一个字典,获取这个key对应的value
                return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
            }
            set {
                // 将newValue和key,对应起来,存储在字典里
                Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
            }
        }
        /// Gets a value that indicates whether the caller must call a `schedule` method.
        // 判断当前线程是否需要调度
        public static fileprivate(set) var isScheduleRequired: Bool {
            get {
                // 判断key是否为nil,如果是nil,说明没有被调度过
                return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
            }
            set(isScheduleRequired) {
                // 三目运算,isScheduleRequired为false的时候,就是取后面的那个
                // 也就是给isScheduleRequiredKey赋值
                if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                    rxFatalError("pthread_setspecific failed")
                }
            }
        }
    }
    
    • 声明了一个默认的key值字符串isScheduleRequiredKey
    • 根据isScheduleRequiredKey是否有值,即isScheduleRequired,来判断当前线程是否被调度过
    • queueset,get方法中通过扩展Thread的方法,来获取当前线程,或者赋值一个新的线程

    Thread的扩展

        // 给Thread扩展了一个set和get方法
        extension Thread {
            static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
                let currentThread = Thread.current
                let threadDictionary = currentThread.threadDictionary
    
                if let newValue = value {
                    threadDictionary[key] = newValue // 赋值新的线程
                }
                else {
                    threadDictionary[key] = nil
                }
            }
            static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
                let currentThread = Thread.current // 获取当前线程
                let threadDictionary = currentThread.threadDictionary
                
                return threadDictionary[key] as? T
            }
        }
    

    关于上面介绍的类的继承关系图:

    继承关系图

    上面我们具体了解了一些关键的和重要的类,这些类里面其实有个重要的方法schedule,这个方法和调度密切相关。由于篇幅原因,我并没有粘贴出来。但是具体是怎么调度的呢,我们就下一篇文章根据流程来具体分析。

    相关文章

      网友评论

          本文标题:RxSwift中的Scheduler调度者(上)

          本文链接:https://www.haomeiwen.com/subject/ujlqsctx.html