美文网首页
Swift | 自定义顺序执行的异步Operation

Swift | 自定义顺序执行的异步Operation

作者: 清無 | 来源:发表于2022-03-17 18:32 被阅读0次

    需求:实现一系列异步(如网络请求)的操作,这些操作间彼此依赖(顺序执行),全部操作完成后进行通知(回调)。

    最终实现

    • 调用
    /// The ten demo operations, built on first access.
    ///
    /// Each operation gets its index (1...10) as task id and a timeout of 3. Its handler starts a
    /// simulated request lasting `index` seconds; when the request reports back, the label is
    /// updated and the operation's `completionBlock` is invoked one second later.
    lazy var ops: [GCD.Task.Operation] = (1...10).map
        { index in
            let operation = GCD.Task.Operation(taskId: String(index), timeout: 3)
            operation.handler =  { [weak self, weak operation] in
                // Both captures are weak because the handler is stored on the operation itself.
                guard let owner = self, let current = operation else { return }
                owner.request(task: index, time: Double(index)) {
                    owner.label.text = "Finished: \(current.taskId) ... ✅✅✅✅✅✅✅✅✅✅✅"
                    GCD.asyncAfter(seconds: 1, handler: current.completionBlock ?? {})
                }
            }
            return operation
        }
        
        /// Submits `ops` for serial execution with the `.cancelOthers` timeout policy, then
        /// immediately cancels the operations at indices 5, 6 and 7.
        ///
        /// When the batch completes, the result is printed and the view turns green if every
        /// task succeeded, red otherwise.
        override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
            super.touchesBegan(touches, with: event)

            GCD.Task.async(
                operations: ops,
                queuePolicy: .serial,
                timeoutPolicy: .cancelOthers
            ) { [weak self] outcome in
                guard let controller = self else { return }
                print("Tasks completed with result: \(outcome)")
                let tint: UIColor = outcome.isAllSuccess ? .systemGreen : .systemRed
                controller.view.backgroundColor = tint
            }

            // Cancel three of the submitted operations right away, in index order.
            ops[5...7].forEach { $0.cancel() }
        }
        
        /// Simulates an asynchronous request for `task`.
        ///
        /// From a global queue it schedules a label update on the main queue and arranges for
        /// `completion` (or an empty closure when `nil`) to run on the main queue after `time`
        /// seconds.
        func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
            let background = DispatchQueue.global()
            background.async { [weak self] in
                GCD.asyncInMainQueue {
                    self?.label.text = "Started: \(task) ... ➡️➡️➡️➡️➡️➡️"
                }
                let callback = completion ?? {}
                GCD.asyncAfter(seconds: time, in: .main, handler: callback)
            }
        }
        
        /// Hands `ops` to `GCD.Task.cancel` when the view is about to disappear, so operations
        /// that are neither cancelled nor finished yet get cancelled.
        override func viewWillDisappear(_ animated: Bool) {
            super.viewWillDisappear(animated)
            
            GCD.Task.cancel(ops)
        }
    
    • 代码
    extension GCD {
        /// Namespace for running a batch of `GCD.Task.Operation`s on an operation queue and
        /// reporting how each of them ended.
        public enum Task {
            /// Cancels every operation in `operations` that is neither cancelled nor finished yet.
            public static func cancel(_ operations: [Operation]) {
                for op in operations where !op.isCancelled && !op.isFinished {
                    op.cancel()
                }
            }

            /// Sorts the task ids of `operations` into timed-out, cancelled and successful buckets.
            ///
            /// The timed-out check comes first. Under `.cancelOthers` the timed-out operation is
            /// cancelled together with the rest of the batch, so testing `isCancelled` first would
            /// report it as cancelled and leave `timedout` empty. This ordering also matches
            /// `Operation.status`, which gives `isTimedout` priority.
            private static func executeResult(of operations: [Operation]) -> ExecuteResult {
                var result = ExecuteResult()
                for op in operations {
                    if op.isTimedout {
                        result.timedout.append(op.taskId)
                    }
                    else if op.isCancelled {
                        result.cancelled.append(op.taskId)
                    }
                    else {
                        result.success.append(op.taskId)
                    }
                }
                result.isAllSuccess = result.success.count == operations.count
                return result
            }

            /// Runs `operations` on a fresh operation queue and reports the outcome.
            ///
            /// The work happens on a global queue: the not-yet-finished operations are enqueued,
            /// the calling background thread blocks until the queue drains, and `completion` is
            /// then dispatched to `notifyQueue`.
            ///
            /// - Parameters:
            ///   - operations: The batch to run. Operations that are already finished are not
            ///     enqueued again but still appear in the result.
            ///   - queuePolicy: `.serial` uses a concurrency limit of 1, `.concurrent` uses its
            ///     `maxCount`. The default of -1 is presumably forwarded to
            ///     `maxConcurrentOperationCount` (confirm in `GCD.Queue.operation`).
            ///   - timeoutPolicy: Returned to an operation when it times out. With `.cancelOthers`
            ///     the whole enqueued batch is cancelled at that point.
            ///   - notifyQueue: Queue on which `completion` is called.
            ///   - completion: Receives the per-task classification of `operations`.
            public static func async(
                operations: [Operation],
                queuePolicy: QueuePolicy = .concurrent(maxCount: -1),
                timeoutPolicy: TimeoutPolicy = .waiting,
                notify notifyQueue: DispatchQueue = .main,
                completion: @escaping BlockT<ExecuteResult>
            )
            {
                GCD.asyncInGlobalQueue
                {
                    let count: Int
                    switch queuePolicy {
                    case .serial:
                        count = 1
                    case .concurrent(let maxCount):
                        count = maxCount
                    }

                    let opQueue = GCD.Queue.operation(maxConcurrentCount: count)
                    let newOperations = operations.filter{
                        !$0.isFinished && !opQueue.operations.contains($0)
                    }
                    for op in newOperations {
                        op.timedoutHandler = {
                            switch timeoutPolicy {
                            case .cancelOthers:
                                cancel(newOperations)
                            case .waiting, .continueNext:
                                break
                            }
                            return timeoutPolicy
                        }
                        opQueue.addOperation(op)
                    }
                    opQueue.waitUntilAllOperationsAreFinished()

                    // Each handler captures `newOperations`, which contains the operation that
                    // stores it. Dropping the handlers once nothing can time out any more breaks
                    // that retain cycle.
                    for op in newOperations {
                        op.timedoutHandler = nil
                    }

                    notifyQueue.async{
                        completion(self.executeResult(of: operations))
                    }
                }
            }
        }

    }
    
    extension GCD.Task {
        /// Selects the concurrency limit of the operation queue created by `GCD.Task.async`.
        public enum QueuePolicy {
            /// `GCD.Task.async` uses a concurrency limit of 1 for this case.
            case serial
            /// `GCD.Task.async` uses `maxCount` as the concurrency limit.
            case concurrent(maxCount: Int)
        }
        /// Value an operation's `timedoutHandler` returns when the operation times out.
        public enum TimeoutPolicy {
            /// Neither `GCD.Task.async` nor `Operation.main()` takes an extra action for this case.
            case waiting
            /// `Operation.main()` signals its semaphore after the timeout for this case.
            case continueNext
            /// `GCD.Task.async` cancels the enqueued batch when an operation times out.
            case cancelOthers
        }
        /// Outcome of a batch: task ids grouped by how their operation ended.
        public struct ExecuteResult {
            /// Task ids of operations classified as successful.
            public fileprivate(set) var success: [String] = []
            /// Task ids of operations classified as cancelled.
            public fileprivate(set) var cancelled: [String] = []
            /// Task ids of operations classified as timed out.
            public fileprivate(set) var timedout: [String] = []
            /// Set by `GCD.Task` to whether `success` holds as many ids as there were operations.
            public fileprivate(set) var isAllSuccess = false
        }
    }
    
    extension GCD.Task {
        /// An operation that starts `handler` on a global queue and keeps `main()` blocked until
        /// the work reports completion, the timeout elapses, or the operation is cancelled.
        ///
        /// The work reports completion by invoking `completionBlock`, which signals the internal
        /// semaphore `main()` is waiting on.
        public class Operation: Foundation.Operation {
            /// Identifier reported in `ExecuteResult` and in `description`.
            public var taskId: String
            /// Seconds `main()` waits for completion; a value of 0 or less waits without a limit.
            public var timeouts: TimeInterval
            /// The work to run. `main()` returns immediately when this is `nil`.
            public var handler: BlockVoid?
            /// Becomes `true` when the wait in `main()` ran into the timeout.
            public private(set) var isTimedout = false
            /// Asked for the policy to apply when the wait in `main()` times out.
            internal var timedoutHandler: BlockR<TimeoutPolicy>?

            /// Closure handed out through `completionBlock`; it signals `semaphore`.
            fileprivate lazy var completion: BlockVoid = {
                return { [weak self] in
                    guard let strongSelf = self else { return }
                    strongSelf.semaphore.signal()
                }
            }()
            /// Always returns the internal signalling closure; assigned values are ignored.
            public override var completionBlock: (() -> Void)? {
                set{}
                get {
                    return completion
                }
            }
            // A plain `let` instead of `lazy var`: the first accesses come concurrently from
            // `main()` and from the handler's completion path, and lazy initialization is not
            // thread-safe, so two semaphores could be created and the signal lost.
            private let semaphore = DispatchSemaphore(value: 0)

            public enum Status {
                case ready, timedout, finished, cancelled
            }
            /// Coarse state; timed-out takes priority over finished, finished over cancelled.
            public var status: Status {
                switch (isTimedout, isFinished, isCancelled) {
                case (true, _, _): return .timedout
                case (_, true, _): return .finished
                case (_, _, true): return .cancelled
                default: return .ready
                }
            }

            public override var description: String {
                return "[Task: \(taskId)] [Status: \(status)]"
            }

            /// - Parameters:
            ///   - taskId: Identifier of the task.
            ///   - seconds: Timeout for the wait in `main()`; 0 (the default) waits without a limit.
            public init(
                taskId: String,
                timeout seconds: TimeInterval = 0
            ) {
                self.taskId = taskId
                self.timeouts = seconds
            }

            /// Marks the operation as cancelled and wakes `main()` if it is blocked on the semaphore.
            ///
            /// Without the signal, a running operation with no timeout whose handler never reports
            /// completion would stay blocked forever after being cancelled.
            public override func cancel() {
                super.cancel()
                semaphore.signal()
            }

            /// Dispatches `handler` to a global queue and blocks until completion is signalled,
            /// the operation is cancelled, or `timeouts` seconds have passed.
            public override func main()
            {
                guard !isCancelled, let handler = handler else {
                    return
                }
                GCD.Queue.global(qos: .userInitiated).queue.async(execute: handler)

                guard timeouts > 0 else {
                    semaphore.wait()
                    return
                }

                switch semaphore.wait(timeout: .now() + timeouts) {
                case .success:
                    break
                case .timedOut:
                    isTimedout = true
                    guard let policy = timedoutHandler?() else {
                        return
                    }
                    switch policy {
                    case .continueNext:
                        // NOTE(review): the wait has already returned here, so this only bumps
                        // the semaphore count; `main()` returns for every policy.
                        semaphore.signal()
                    case .waiting, .cancelOthers:
                        break
                    }
                }
            }
        }
    }
    

    方法1

    直接重写 main 方法,借助 GCD 的信号量(DispatchSemaphore)阻塞当前 operation 所在的线程,直到异步任务完成,从而达到“串行”执行的需求。

    实现:

    /// Operation whose `main()` does not return until its asynchronous work has finished.
    class SerialOperation: Operation {
        var task: Int
        init(task: Int) {
            self.task = task
        }
        override func main() {
            print("[\(task)] main Start")
            // The semaphore starts at 0, so `wait()` below blocks the thread running this
            // operation until the asynchronous work calls `signal()`. On a queue that runs one
            // operation at a time, the next operation therefore starts only after that.
            let gate = DispatchSemaphore(value: 0)
            let background = DispatchQueue.global()
            background.async { [weak self] in
                guard let operation = self else { return }

                print("[\(operation.task)] Serial begin...\(Thread.current)")
                request(task: operation.task, time: 3) {
                    gate.signal()
                    print("[\(operation.task)] Serial end...\(Thread.current)")
                }
                // Alternative without `request`: print "Serial begin", call
                // `Thread.sleep(forTimeInterval: 3)`, print "Serial end", then `gate.signal()`.
            }
            gate.wait()
            print("[\(task)] main End")
        }
    }
    

    调用:

    let queue = OperationQueue()
    // A limit of 1 makes the queue run one operation at a time, which gives the
    // operations the same ordering an explicit dependency chain would.
    queue.maxConcurrentOperationCount = 1
    (1...3).forEach { index in
        let operation = SerialOperation(task: index)
        operation.completionBlock = {
            print("Serial operation finished! \(index)")
        }
        queue.addOperation(operation)
    }
    

    输出:

    [1] main Start
    [1] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
    [1] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
    [1] main End
    Serial operation finished! 1
    [2] main Start
    [2] Serial begin...<NSThread: 0x600000925a00>{number = 7, name = (null)}
    [2] Serial end...<NSThread: 0x600000925a00>{number = 7, name = (null)}
    [2] main End
    [3] main Start
    Serial operation finished! 2
    [3] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
    [3] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
    [3] main End
    Serial operation finished! 3
    

    方法2

    重写 start 方法,并手动维护 isFinished、isCancelled、isExecuting 等状态,此种方式较为复杂。

    实现:

    /// Shared stand-in for a network request.
    ///
    /// Sleeps for `time` seconds on a global queue, printing a line before and after, then calls
    /// `completion` on the main queue (nothing is called when it is `nil`).
    func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
        let background = DispatchQueue.global()
        background.async {
            print("Requesting task: \(task) ... started")
            Thread.sleep(forTimeInterval: time)
            print("Requesting task: \(task) ... finished")
            let callback = completion ?? {}
            DispatchQueue.main.async(execute: callback)
        }
    }
    
    /// Asynchronous operation wrapping one call to `request(task:time:completion:)`.
    ///
    /// `start()` returns as soon as the request has been kicked off; the operation reports
    /// `isFinished` only once the request calls back (or the running operation is cancelled).
    /// The completion closure passed to `init` is stored as `completionBlock`, which `Operation`
    /// itself runs when `isFinished` turns `true`, so it is not called manually here.
    class RequestOperation: Operation {
        var task: Int
        var time: TimeInterval

        /// Guards the state flags below; they are touched from the queue's thread, from the
        /// request callback and from whoever calls `cancel()`.
        private let stateLock = NSLock()
        private var _executing = false
        private var _finished = false
        /// Set by the first caller of `finish()` so the transition happens exactly once.
        private var _finishing = false

        /// - Parameters:
        ///   - task: Identifier forwarded to `request`.
        ///   - time: Duration in seconds forwarded to `request`.
        ///   - completion: Stored as `completionBlock`.
        init (task: Int, time: TimeInterval, completion: @escaping ()->Void) {
            self.task = task
            self.time = time

            super.init()
            self.completionBlock = completion
        }

        override var isFinished: Bool {
            stateLock.lock()
            defer { stateLock.unlock() }
            return _finished
        }
        override var isExecuting: Bool {
            stateLock.lock()
            defer { stateLock.unlock() }
            return _executing
        }
        override var isAsynchronous: Bool {
            return true
        }

        /// Marks the operation as cancelled; a running operation is also finished right away.
        ///
        /// An operation that has not started yet is only flagged. It finishes when `start()`
        /// is called, because a queued operation must not report `isFinished` before it starts.
        override func cancel() {
            super.cancel()
            if isExecuting {
                finish()
            }
        }

        /// Starts the request, or finishes immediately when the operation was cancelled first.
        override func start() {
            guard !isCancelled else {
                finish()
                return
            }

            willChangeValue(forKey: "isExecuting")
            stateLock.lock()
            _executing = true
            stateLock.unlock()
            didChangeValue(forKey: "isExecuting")

            request(task: task, time: time) { [weak self] in
                self?.finish()
            }
        }

        /// Moves the operation to the finished state once, posting the KVO notifications
        /// `Operation` and `OperationQueue` rely on. Later calls are ignored.
        private func finish() {
            stateLock.lock()
            let alreadyClaimed = _finishing
            let wasExecuting = _executing
            _finishing = true
            stateLock.unlock()
            guard !alreadyClaimed else { return }

            // KVO callbacks are sent outside the lock because observers read the
            // overridden properties, which take the same lock.
            if wasExecuting { willChangeValue(forKey: "isExecuting") }
            willChangeValue(forKey: "isFinished")
            stateLock.lock()
            _executing = false
            _finished = true
            stateLock.unlock()
            didChangeValue(forKey: "isFinished")
            if wasExecuting { didChangeValue(forKey: "isExecuting") }
        }
    }
    

    调用:

    let queue = OperationQueue()
    var previousOperation: Operation?
    // Chain the three operations: each one depends on the operation created before it.
    (1...3).forEach { index in
        let report = {
            print("Operation \(index) is completed!")
            guard index == 3 else { return }
            print("All tasks are done !!!")
        }
        let current = RequestOperation(task: index, time: Double(3 - index), completion: report)
        if let dependency = previousOperation {
            current.addDependency(dependency)
        }
        queue.addOperation(current)
        previousOperation = current
    }
    

    输出:

    Requesting task: 1 ... started
    Requesting task: 1 ... finished
    Operation 1 is completed!
    Requesting task: 2 ... started
    Requesting task: 2 ... finished
    Operation 2 is completed!
    Requesting task: 3 ... started
    Requesting task: 3 ... finished
    Operation 3 is completed!
    All tasks are done !!!
    

    相关文章

      网友评论

          本文标题:Swift | 自定义顺序执行的异步Operation

          本文链接:https://www.haomeiwen.com/subject/bxxbdrtx.html