美文网首页征服iOSiOS 开发每天分享优质文章iOS Developer
NSOperation 高级用法之NSOperation基础(N

NSOperation 高级用法之NSOperation基础(N

作者: abyte | 来源:发表于2017-10-27 02:15 被阅读337次

    这个文章是依照 WWDC 2015 Advanced NSOperations 而来的,主要讲解Operation(OC:NSOperation)的高级用法。
    本篇主要讲NSOperation的基础知识和NSOperation源码分析(Swift版)

    注:Swift的Operation、OperationQueue对应OC中的NSOperation、NSOperationQueue

    目录如下:

    • NSOperation 基础
      • NSOperationQueue 如何管理 NSOperation
    • NSOperationQueue调度的原理
      • _concurrencyGate、_underlyingQueue 和 queueGroup
      • _runOperation:任务执行的核心
      • Operation依赖机制的实现原理

    NSOperation 基础

    NSOperation 是iOS Foundation中描述任务(task)的抽象类。NSOperation自带强大的状态机,有PendingReadyExecutingFinishedCancelled
    通常我们需要继承NSOperation来满足我们需求。NSBlockOperation是官方实现好的子类,简单使用如下:

        NSBlockOperation *op1 = [NSBlockOperation blockOperationWithBlock:^{
            NSLog(@"op1");
        }];
        
        NSBlockOperation *op2 = [NSBlockOperation blockOperationWithBlock:^{
            NSLog(@"op2");
        }];
        //1.
        [op1 addDependency:op2];
        //2.
        [[NSOperationQueue mainQueue] addOperation:op1];
        [[NSOperationQueue mainQueue] addOperation:op2];
    

    以上程序将依次输出 op2 op1

    1. 是NSOperation的依赖机制,op1将依赖op2,也就是说op2执行结束后紧跟着执行op1

    2. 谈NSOperation就离不开NSOperationQueue,NSOperationQueue 是管理NSOperation 的队列,加入队列(queue)中的任务的管理权就交给NSOperationQueue了。

    那么问题来了,NSOperationQueue是怎么管理NSOperation的呢?

    NSOperationQueue 如何管理 NSOperation

    NSOperation 有5种状态

    image1.png

    5种状态转换如图。除了Finished其他状态都可以进入Cancelled。

    假设队列中有多个任务,Pending 表示即将进入Ready状态,第一个进入Ready状态的任务已经做好准备可以进入Executing状态,任务执行完毕后会从 Executing状态进入Finished,接着任务就会从队列中移除。

    ready.png

    绿色任务进入Ready状态

    remove.png

    任务执行完毕后从队列中移除

    NSOperation的依赖机制是当op2进入Finished状态,依赖于op2的op1进入Ready状态准备开始执行。
    由此很清楚了,NOperationQueue是得知任务当前状态的改变来实现任务的调度的,那么Foundation内部是如何实现的呢?

    NSOperationQueue调度的原理

    从调用addOperation开始,封装成数组交给addOperations,任务调度权就交给了operationQueue。执行任务和任务之间的依赖处理的主要方法就是_runOperation。

        open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
            var waitGroup: DispatchGroup?
            if wait {
                waitGroup = DispatchGroup()
            }
    #endif
            /*
             If QueuePriority was not supported this could be much faster
             since it would not need to have the extra book-keeping for managing a priority
             queue. However this implementation attempts to be similar to the specification.
             As a concequence this means that the dequeue may NOT nessicarly be the same as
             the enqueued operation in this callout. So once the dispatch_block is created
             the operation must NOT be touched; since it has nothing to do with the actual
             execution. The only differential is that the block enqueued to dispatch_async
             is balanced with the number of Operations enqueued to the OperationQueue.
             */
            lock.lock()
            ops.forEach { (operation: Operation) -> Void in
                operation._queue = self
                //调用_operations的insert就是按任务的优先级,放入不同的数组中。_operations类型为_OperationList,控制着任务的优先级。
                _operations.insert(operation)
            }
            lock.unlock()
            ops.forEach { (operation: Operation) -> Void in
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
                if let group = waitGroup {
                    group.enter()
                }
                //将Operation封装进block,与queueGroup关联,放到_underlyingQueue中执行。
                let block = DispatchWorkItem(flags: .enforceQoS) { () -> Void in
                    if let sema = self._concurrencyGate {
                        sema.wait()
                        self._runOperation()
                        sema.signal()
                    } else {
                        self._runOperation()
                    }
                    if let group = waitGroup {
                        group.leave()
                    }
                }
                _underlyingQueue.async(group: queueGroup, execute: block)
    #endif
            }
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
            if let group = waitGroup {
                group.wait()
            }
    #endif
        }
    

    _operations 是类型为_OperationList的结构体,内部有多个数组,分别对应着不同的优先级。

        var veryLow = [Operation]()
        var low = [Operation]()
        var normal = [Operation]()
        var high = [Operation]()
        var veryHigh = [Operation]()
        var all = [Operation]()
    

    insert方法的作用就是按任务的优先级,放入不同的任务优先级数组中。与insert相对应的dequeue是按照优先级由高到低从数组中取出任务。在接下来要介绍的_runOperation方法中将会用到dequeue来取出任务执行。

        mutating func dequeue() -> Operation? {
            if !veryHigh.isEmpty {
                return veryHigh.remove(at: 0)
            }
            if !high.isEmpty {
                return high.remove(at: 0)
            }
            if !normal.isEmpty {
                return normal.remove(at: 0)
            }
            if !low.isEmpty {
                return low.remove(at: 0)
            }
            if !veryLow.isEmpty {
                return veryLow.remove(at: 0)
            }
            return nil
        }
    

    _runOperation是任务执行的核心,那么OperationQueue到底是怎么调度Operation的呢?在介绍_runOperation之前,我们来看看什么时候调用_runOperation。

    _concurrencyGate、_underlyingQueue 和 queueGroup

    _concurrencyGate 控制并发执行几个任务的信号量,可以并发的数量就是我们maxConcurrentOperationCount的值。_runOperation的执行受_concurrencyGate信号量控制。wait()信号量减一,signal()信号量加一,当信号量为0时,就会一直等待,直接大于0时才会正常执行。
    将由信号量控制的_runOperation封装进block,这个block与queueGroup关联,放到队列中异步执行。执行_runOperation之前信号量执行一次wait,_runOperation执行完毕之后执行一次signal,确保同时执行的任务数量满足maxConcurrentOperationCount设定的值
    总结:添加至OperationQueue对象中的所有的任务都跟queueGroup关联,并且是放到_underlyingQueue队列中执行的。

    let block = DispatchWorkItem(flags: .enforceQoS) { () -> Void in
                    if let sema = self._concurrencyGate {
                        sema.wait()
                        self._runOperation()
                        sema.signal()
                    } else {
                        self._runOperation()
                    }
                    if let group = waitGroup {
                        group.leave()
                    }
                }
                _underlyingQueue.async(group: queueGroup, execute: block)
    

    哦,其实调度Operation的关键又多了两个:_underlyingQueuequeueGroup
    queueGroup的意义只有一个就是waitUntilAlloperationsAreFinished的实现,
    DispatchGroup的wait函数会阻塞当前线程,直到所有的任务都执行完毕。

        open func waitUntilAllOperationsAreFinished() {
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
            queueGroup.wait()
    #endif
        }
    

    再看_underlyingQueue变量,它的作用是为了获取__underlyingQueue变量,如果__underlyingQueue存在就直接返回,如果不存在就生成一个queue。

    如果是通过OperationQueue的main方法初始化OperationQueue,会走到OperationQueue内部的init(_queue queue: DispatchQueue, maxConcurrentOperations: Int = OperationQueue.defaultMaxConcurrentOperationCount)方法,__underlyingQueue就会被赋值;对于直接调用init方法生成的初始化的OperationQueue,__underlyingQueue是没有赋值的,在调用_underlyingQueue的时候重新创建__underlyingQueue。
    代码逻辑如下:

        private static let _main = OperationQueue(_queue: .main, maxConcurrentOperations: 1)
        
        open class var main: OperationQueue {
            return _main
        }
    
        internal init(_queue queue: DispatchQueue, maxConcurrentOperations: Int = OperationQueue.defaultMaxConcurrentOperationCount) {
            __underlyingQueue = queue
            maxConcurrentOperationCount = maxConcurrentOperations
            super.init()
            queue.setSpecific(key: OperationQueue.OperationQueueKey, value: Unmanaged.passUnretained(self))
        }
    
        internal var _underlyingQueue: DispatchQueue {
            lock.lock()
            if let queue = __underlyingQueue {
                lock.unlock()
                return queue
            } else {
                let effectiveName: String
                if let requestedName = _name {
                    effectiveName = requestedName
                } else {
                    effectiveName = "NSOperationQueue::\(Unmanaged.passUnretained(self).toOpaque())"
                }
                let attr: DispatchQueue.Attributes
                if maxConcurrentOperationCount == 1 {
                    attr = []
                    __concurrencyGate = DispatchSemaphore(value: 1)
                } else {
                    attr = .concurrent
                    if maxConcurrentOperationCount != OperationQueue.defaultMaxConcurrentOperationCount {
                        __concurrencyGate = DispatchSemaphore(value:maxConcurrentOperationCount)
                    }
                }
                let queue = DispatchQueue(label: effectiveName, attributes: attr)
                if _suspended {
                    queue.suspend()
                }
                __underlyingQueue = queue
                lock.unlock()
                return queue
            }
        }
    

    _runOperation:任务执行的核心

    再看_runOperation方法中的_dequeueOperation方法就是前文介绍的:将Operation对象从_operations中取出,最终执行Operation对象的start方法。
    _waitUntilReady的方法也是利用的DispatchGroup的wait函数阻塞线程,等到group中的所有的任务都执行完毕。顺便介绍OperationQueue是如何管理任务之间的依赖的。

        internal func _runOperation() {
            if let op = _dequeueOperation() {
                if !op.isCancelled {
                    op._waitUntilReady()
                    if !op.isCancelled {
                        op.start()
                    }
                }
            }
        }
    
    //class Operation
        internal func _waitUntilReady() {
            _depGroup.wait()
            _ready = true
        }
    }
    
    
    Operation依赖机制的实现原理

    前置知识:DispatchGroup的enter和wait函数必须要搭配使用(看文章引用)。
    我们的目的是要op1依赖于op2执行完毕后再执行。
    addDependency方法会将op1的_depGroup加入到op._groups数组中,同时进入_depGroup。那什么时候leave呢?答案就在Operation的finish方法。finish方法是Operation在执行结束时调用, 而其中的_leaveGroups方法会调用_groups所有的DispatchGroup对象的leave函数,所以_depGroup也将全部执行完毕,_depGroup.wait()之后的代码得以顺利执行。
    总结:op2执行完毕之后会遍历_groups,同时调用leave()。这个时候op1的_depGroup执行完毕,wait()不再等待,op1的start方法开始执行。

    op1.addDependency(op2)
    
    //class Operation
        open func addDependency(_ op: Operation) {
            lock.lock()
            _dependencies.insert(op)
            op.lock.lock()
            _depGroup.enter()
            op._groups.append(_depGroup)
            op.lock.unlock()
            lock.unlock()
        }
    
        internal func finish() {
            lock.lock()
            _finished = true
            _leaveGroups()
            lock.unlock()
            if let queue = _queue {
                queue._operationFinished(self)
            }
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
            // The completion block property is a bit cagey and can not be executed locally on the queue due to thread exhaust potentials.
            // This sets up for some strange behavior of finishing operations since the handler will be executed on a different queue
            if let completion = completionBlock {
                DispatchQueue.global(qos: .background).async { () -> Void in
                    completion()
                }
            }
    #endif
        }
    
        internal func _leaveGroups() {
            // assumes lock is taken
    #if DEPLOYMENT_ENABLE_LIBDISPATCH
            _groups.forEach() { $0.leave() }
            _groups.removeAll()
            _group.leave()
    #endif
        }
    

    如有错误,欢迎斧正=^=

    相关引用
    Foundation/Operation.swift
    WWDC 2015 Advanced NSOperations
    细说GCD(Grand Central Dispatch)如何用

    相关文章

      网友评论

      本文标题:NSOperation 高级用法之NSOperation基础(N

      本文链接:https://www.haomeiwen.com/subject/zakzuxtx.html