需求:实现一系列异步(如网络请求)的操作,这些操作间彼此依赖(顺序执行),全部操作完成后进行通知(回调)。
最终实现
- 调用
lazy var ops: [GCD.Task.Operation] = (1...10).map
{ i in
let op = GCD.Task.Operation(taskId: .init(i), timeout: 3)
op.handler = { [weak self, weak op] in
guard let strongSelf = self, let strongOp = op else { return }
strongSelf.request(task: i, time: Double(i), completion: {
strongSelf.label.text = "Finished: \(strongOp.taskId) ... ✅✅✅✅✅✅✅✅✅✅✅"
GCD.asyncAfter(seconds: 1, handler: strongOp.completionBlock ?? {})
})
}
return op
}
override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
super.touchesBegan(touches, with: event)
GCD.Task.async(
operations: ops,
queuePolicy: .serial,
timeoutPolicy: .cancelOthers
) { [weak self] result in
guard let self = self else {
return
}
print("Tasks completed with result: \(result)")
self.view.backgroundColor = result.isAllSuccess ? .systemGreen : .systemRed
}
ops[5].cancel()
ops[6].cancel()
ops[7].cancel()
}
func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
DispatchQueue.global().async { [weak self] in
GCD.asyncInMainQueue {
self?.label.text = "Started: \(task) ... ➡️➡️➡️➡️➡️➡️"
}
GCD.asyncAfter(seconds: time, in: .main, handler: completion ?? {})
}
}
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
GCD.Task.cancel(ops)
}
- 代码
extension GCD {
public enum Task {
public static func cancel(_ operations: [Operation]) {
operations.forEach{ op in
if !op.isCancelled, !op.isFinished {
op.cancel()
}
}
}
private static func executeResult(of operations: [Operation]) -> ExecuteResult {
var result = ExecuteResult()
for op in operations {
if op.isCancelled {
result.cancelled.append(op.taskId)
}
else if op.isTimedout {
result.timedout.append(op.taskId)
}
else {
result.success.append(op.taskId)
}
}
result.isAllSuccess = result.success.count == operations.count
return result
}
public static func async(
operations: [Operation],
queuePolicy: QueuePolicy = .concurrent(maxCount: -1),
timeoutPolicy: TimeoutPolicy = .waiting,
notify notifyQueue: DispatchQueue = .main,
completion: @escaping BlockT<ExecuteResult>
)
{
GCD.asyncInGlobalQueue
{
var count = -1
switch queuePolicy {
case .serial:
count = 1
case .concurrent(let maxCount):
count = maxCount
}
var isFinished = false
let opQueue = GCD.Queue.operation(maxConcurrentCount: count)
let newOperations = operations.filter{
!$0.isFinished && !opQueue.operations.contains($0)
}
for op in newOperations {
op.timedoutHandler = {
switch timeoutPolicy {
case .cancelOthers:
guard !isFinished else { break }
cancel(newOperations)
default: break
}
return timeoutPolicy
}
opQueue.addOperation(op)
}
opQueue.waitUntilAllOperationsAreFinished()
notifyQueue.async{
isFinished = true
completion(self.executeResult(of: operations))
}
}
}
}
}
extension GCD.Task {
public enum QueuePolicy {
case serial
case concurrent(maxCount: Int)
}
public enum TimeoutPolicy {
case waiting
case continueNext
case cancelOthers
}
public struct ExecuteResult {
public fileprivate(set) var success: [String] = []
public fileprivate(set) var cancelled: [String] = []
public fileprivate(set) var timedout: [String] = []
public fileprivate(set) var isAllSuccess = false
}
}
extension GCD.Task {
public class Operation: Foundation.Operation {
public var taskId: String
public var timeouts: TimeInterval
public var handler: BlockVoid?
public private(set) var isTimedout = false
internal var timedoutHandler: BlockR<TimeoutPolicy>?
fileprivate lazy var completion: BlockVoid = {
return { [weak self] in
guard let strongSelf = self else { return }
strongSelf.semaphore.signal()
}
}()
public override var completionBlock: (() -> Void)? {
set{}
get {
return completion
}
}
private lazy var semaphore = DispatchSemaphore(value: 0)
public enum Status {
case ready, timedout, finished, cancelled
}
public var status: Status {
switch (isTimedout, isFinished, isCancelled) {
case (true, _, _): return .timedout
case (_, true, _): return .finished
case (_, _, true): return .cancelled
default: return .ready
}
}
public override var description: String {
return "[Task: \(taskId)] [Status: \(status)]"
}
public init(
taskId: String,
timeout seconds: TimeInterval = 0
) {
self.taskId = taskId
self.timeouts = seconds
}
public override func main()
{
guard !isCancelled, let handler = handler else {
return
}
GCD.Queue.global(qos: .userInitiated).queue.async(execute: handler)
if timeouts > 0
{
switch (semaphore.wait(timeout: .now() + timeouts)) {
case .timedOut:
isTimedout = true
guard let policy = timedoutHandler?() else {
return
}
switch policy {
case .continueNext: semaphore.signal()
default: break
}
case .success:
break
}
}
else {
semaphore.wait()
}
}
}
}
方法1
通过直接重写
main
方法,通过GCDsemaphore
实现异步线程的同步阻塞,达到“串行”需求。
实现:
class SerialOperation: Operation {
var task: Int
init(task: Int) {
self.task = task
}
override func main() {
print("[\(task)] main Start")
/// 注意这里`value`为0时,若外部调用`wait()`则会阻塞当前operation所在的queue
/// 直到异步操作完成,释放信号量`semaphore.signal()`,才会执行下一个operation
let semaphore = DispatchSemaphore(value: 0)
DispatchQueue.global().async { [weak self] in
guard let self = self else { return }
print("[\(self.task)] Serial begin...\(Thread.current)")
request(task: self.task, time: 3) {
semaphore.signal()
print("[\(self.task)] Serial end...\(Thread.current)")
}
/*
print("[\(self.task)] Serial begin...\(Thread.current)")
Thread.sleep(forTimeInterval: 3)
print("[\(self.task)] Serial end...\(Thread.current)")
semaphore.signal() */
}
semaphore.wait()
print("[\(task)] main End")
}
}
调用:
let queue = OperationQueue()
// count为1类似于`异步串行`队列,达到operation`依赖`的效果
queue.maxConcurrentOperationCount = 1
for i in 1...3 {
let serial = SerialOperation(task: i)
serial.completionBlock = {
print("Serial operation finished! \(i)")
}
queue.addOperation(serial)
}
输出:
[1] main Start
[1] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[1] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[1] main End
Serial operation finished! 1
[2] main Start
[2] Serial begin...<NSThread: 0x600000925a00>{number = 7, name = (null)}
[2] Serial end...<NSThread: 0x600000925a00>{number = 7, name = (null)}
[2] main End
[3] main Start
Serial operation finished! 2
[3] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[3] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[3] main End
Serial operation finished! 3
方法2
重写
start
方法,并手动维护isFinished、isCacelled、isExecuting
等状态,此种方式较为复杂。
实现:
/// 公用网络请求方法
func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
DispatchQueue.global().async {
print("Requesting task: \(task) ... started")
Thread.sleep(forTimeInterval: time)
print("Requesting task: \(task) ... finished")
DispatchQueue.main.async(execute: completion ?? {})
}
}
class RequestOperation: Operation {
var task: Int
var time: TimeInterval
init (task: Int, time: TimeInterval, completion: @escaping ()->Void) {
self.task = task
self.time = time
super.init()
self.completionBlock = completion
}
private var _finished: Bool = false {
willSet{
willChangeValue(forKey: "isFinished")
}
didSet{
didChangeValue(forKey: "isFinished")
}
}
private var _executing: Bool = false {
willSet{
willChangeValue(forKey: "isExecuting")
}
didSet{
didChangeValue(forKey: "isExecuting")
}
}
private var _cancelled: Bool = false {
willSet{
willChangeValue(forKey: "isCancelled")
}
didSet{
didChangeValue(forKey: "isCancelled")
}
}
override var isFinished: Bool {
return _finished
}
override var isExecuting: Bool {
return _executing
}
override var isCancelled: Bool {
return _cancelled
}
override var isAsynchronous: Bool {
return true
}
private func done() {
super.cancel()
_cancelled = true
_executing = false
_finished = true
completionBlock?()
}
override func cancel() {
objc_sync_enter(self)
done()
objc_sync_exit(self)
}
override func start() {
guard !isCancelled else {
done()
return
}
_executing = true
request(task: task, time: time) { [weak self] in
self?.done()
}
}
}
调用:
let queue = OperationQueue()
var previousOperation: Operation?
for i in 1...3 {
let op = RequestOperation(task: i, time: Double(3-i), completion: {
print("Operation \(i) is completed!")
if i == 3 {
print("All tasks are done !!!")
}
})
if let previous = previousOperation {
op.addDependency(previous)
}
queue.addOperation(op)
previousOperation = op
}
输出:
Requesting task: 1 ... started
Requesting task: 1 ... finished
Operation 1 is completed!
Requesting task: 2 ... started
Requesting task: 2 ... finished
Operation 2 is completed!
Requesting task: 3 ... started
Requesting task: 3 ... finished
Operation 3 is completed!
All tasks are done !!!
网友评论