使用 NSOperationQueue 时控制任务数量会并不总是有效,原因何在?利用 NSOperation 封装异步代码有什么需要注意的地方?是否有更好的方法来控制任务的并发数量?为此,我们需要深入了解 NSOperation 的运作机制,现在我们从实际应用场景出发探讨这些问题。
本文没有 Demo,我开源了一个下载库 SDEDownloadManager ,实现了常见的下载管理需求,还有配套的 UI 组件。
下载管理需求
作为开发人员的你接到为 App 添加下载管理功能的需求,初始版本的要求很简单:为避免下载带宽被过多任务分散,只允许同时最多下载3个文件;只提供全部暂停/开始的功能。你不想大动干戈去使用那些大名鼎鼎的网络库,想看看能否利用系统框架现有的工具来快速完成这个功能。看了一番,决定使用NSOperation
封装NSURLSessionDownloadTask
,利用NSOperationQueue
的maxConcurrentOperationCount
来限制任务数量。
DownloadOperation
的部分代码:
private(set) var isStarted: Bool = false // 启动的标记
// 非异步代码只需要重写 main(),封装异步代码必须重写 start()
// 以及部分属性,以及决定结束 NSOperation 生命周期的时机,
// 这部分关键细节暂且假设已实现,放在下一节讨论。
override func start() {
isStarted = true
resume()
}
func resume(){
guard isStarted else {return}
downloadTask.resume()
isExecuting = true
}
func suspend(){
guard isStarted else {return}
downloadTask.suspend()
isExecuting = false
}
实现全部暂停/开始功能的部分代码:
var downloadOperations: [DownloadOperation]{
return operationQueue.operations as! [DownloadOperation]
}
func pauseAllDownloads(){
downloadOperations.filter({$0.isExecuting == true}).forEach({$0.suspend()})
}
func resumeAllDownloads(){
let pasuedOps = downloadOperations.filter({ $0.isStarted == true && $0.isExecuting == false})
pausedOps.forEach({$0.resume()})
if /* 存在其他未完成的任务 */{
for task in unfinishedTasks{
operationQueue.addOperation(operation)
}
}
}
快速实现后,上手测试了下,符合要求。第二个版本很快来了:
- 开放针对单个任务的暂停/开始功能,便于用户对任务进行调度,比如某个任务进展缓慢,暂停这个任务后自动启动等待中的其它任务,很明显,
downloadOpeation.suspend()
能暂停任务,但等待中的任务没有自动启动。 - 提供实时调节最大下载量的功能:3个太少,提升到10个,将
maxConcurrentOperationCount
设为10,等待中的任务陆续启动了;10个似乎太多了,有些任务分配到的速度太低了,还是把速度集中利用起来,重新设置为5,不对劲,还是有10个任务在下载,查看文档发现这个值减少时不会对已启动的NSOperation
产生影响,因此只能手动处理了,选中5个任务暂停,达到了要求。但任务陆续完成后,等待中的任务并没有自动启动。
下载任务可能在任意时刻结束,同时用户也可能随时调整下载量,情况会比上面遇到的情况更加复杂,为了彻底解决各种可能的问题,我们有必要深入了解下NSOperation
和NSOperationQueue
的运作机制。
NSOperation 的生命周期
WWDC 2015 Session 226: Advanced NSOperations该图截自 WWDC 2015 Session 226: Advanced NSOperations。NSOperation 的以上状态并非通过一个枚举,而是多个 Bool 值来表示,除了 Pending 状态(可简单理解为isReady == false
),其他四个状态有对应的属性来表示:
var isReady: Bool { get }
var isExecuting: Bool { get }
var isFinished: Bool { get }
var isCancelled: Bool { get }
执行operationQueue.addOperation(operation)
这行代码后,NSOperationQueue
为其分配线程并调用operation.start()
来启动这个任务,启动条件如下:
-
isReady == true
(NSOperation
的isReady
的默认值是true
,如果添加了依赖则需要等待依赖的NSOperation
结束后才能进入isReady
状态); - 已启动的
NSOperation
数量少于maxConcurrentOperationCount
(默认值为-1,无限制); -
isReady == true
的NSOperation
的数量多于剩余可启动数量时,较高queuePriority
值的有优先启动权; -
queuePriority
相同时再比较加入队列的时间顺序,NSOperationQueue
基于 GCD 实现,采用 FIFO 机制。
满足以上条件的NSOperation
会分配线程启动。start()
是NSOperation
的启动入口,而且只能由其所属的NSOperationQueue
来调用,否则抛出异常。从文档描述和实际测试来看,start()
的默认实现的逻辑是这样的:
// 由于相关属性都是只读的,我猜测实际的代码里是设置对应的私有属性
// 并手动发布 KVO 通知,我在重写`isFinished`时也是这么做的。
func start() {
if !isCancelled{
isExecuting = true
main()
isExecuting = false
}
isFinished = true
}
start()
返回前设置isFinished = true
并发出了 KVO 通知,NSOperation
所属的NSOperationQueue
在收到这个通知后会执行它的completionBlock
,并收回分配给该NSOperation
的线程,该NSOperation
就此结束了它的任务生涯。整个过程可简单归纳为: isReady
-> start()
-> isFinished
->completionBlock
。
对NSOperationQueue
来说,isFinished
的 KVO 通知是NSOperation
生命周期结束的唯一标志,在NSOperation
生命周期的任意时刻发出isFinished
的 KVO 通知(并且isFinished
确实为true
),该NSOperation
会被视为结束,completionBlock
被执行;相反,只要NSOperation
没有发出isFinished
的 KVO 通知,这个NSOperation
会持续占据一个maxConcurrentOperationCount
指定的名额。
重写start()
的注意事项:封装非异步的代码时,由于start()
的默认实现已经替我们在合适的时机更新了相关状态,start()
返回后NSOperation
的生命周期就结束了,重写main()
就够了,取消和暂停功能也在main()
里实现;而封装异步代码时,比如这里的NSURLSessionDownloadTask
,应该观察它的状态,待其结束时更新isFinished
的状态并且发布 KVO 通知,而不要在start()
返回前发出这个通知,所以封装异步代码必须重写start()
以及isFinished
属性,另外isExecuting
这个属性对于外部了解NSOperation
的执行状态是必要的因此也必须重写,这三点也是文档里对实现异步NSOperation
所要求的。
重写isFinished
属性,isExecuting
类似:
private var _isFinished: Bool = false
override private(set) var isFinished: Bool{
get {return _isFinished}
set{// 手动维护 KVO 通知,尽管这里使用 #keyPath 更安全(包括实现 KVO 时),
// 但是在 iOS 8/9/10 里使用 #keyPath 无法收到通知,在 iOS 11 里
// 使用 #keyPath 是正常的,而且你会发现它实际观察的键为 finished,
// 手动指定为 isFinished 在所有版本里正常。
self.willChangeValueForKey("isFinished")
self._isFinished = newValue
self.didChangeValueForKey("isFinished")
}
}
重写start()
:
override func start() {
if isCancelled || isFinished{
isExecuting = false
isFinished = true
}else if !isStarted{
isStarted = true
resume()
}
}
定义“暂停“
为何开头里暂停任务后无法自动启动其他等待的任务?这里创建两个变量便于大家理解:
actualConcurrentOperationCount(实际并发数) = 已经启动的 Operation 的数量
availableConcurrentOperationCount(剩余并发数) = maxConcurrentOperationCount - actualConcurrentOperationCount
只有当availableConcurrentOperationCount > 0
时,等待的NSOperation
才有机会启动。
第一个需求:针对单个任务的暂停/开始功能,在DownloadOpeation
上调用suspend()
暂停了任务,但对其所属的NSOperationQueue
来说并没有NSOperation
结束,availableConcurrentOperationCount
没有增加,所以什么都不会发生。
第二个需求:在开头的例子里,actualConcurrentOperationCount
为10,当maxConcurrentOperationCount
从10减少到5后,availableConcurrentOperationCount == -5
,不会有NSOperation
启动,我在这里手动暂停了5个NSOperation
并没有改变这个情况,而其它5个NSOperation
陆续完成时,availableConcurrentOperationCount
从-5递增至0,在此期间依然不会有其它NSOperation
启动。
问题的根源在于用户和NSOperationQueue
对"运行"的定义偏差,一个例子:设置maxConcurrentOperationCount = 5
,此时有两个正在下载的任务,还有3个任务可启动,暂停1个下载的任务后,用户认为还可运行4个任务,而在NSOperationQueue
那里,只剩下3个NSOperation
能够启动。通常,我们在NSOperation
子类里重写isExecuting
用来作为封装的任务是否真正执行的标记;但是从设计上来讲,isExecuting
对NSOperationQueue
没有意义,对后者来说,NSOperation
只有三种状态:未启动(Non-Started),已启动(Started),结束(Finished)。启动后的NSOperation
只要还没有发出isFinished
的 KVO 通知,从NSOperationQueue
的角度来看它的状态就是广义上的运行状态(Started and Unfinished)。
怎么解决?两种方案:开源,节流。
maxConcurrentOperationCount
是源(这里不考虑是无限的情况),通过“开源”弥补DownloadOperation
进入暂停状态时造成的偏差,做法很简单:
var maxDownloadCount: Int // 记录最大下载量
func pauseTask(_ task: String){
if downloadOperation.isExecuting{
downloadOperation.suspend()
// 暂停下载后用户的期待:剩余可下载的任务数量+1,
// 通过增加整体可启动的任务数量来实现。
operationQueue.maxConcurrentOperationCount += 1
}
}
func resumePausedTask(_ task: String){
// 任务启动后,可以在运行和暂停状态间随意切换,OperationQueue 无法干涉,
// 所以恢复暂停的下载任务要小心,避免下载的总数量超出用户的预期。
guard /* 正在下载的数量 < maxDownloadCount */ else{return}
if downloadOperation.isStarted && !downloadOperation.isExecuting{
downloadOperation.resume()
// 恢复下载后用户的期待:剩余可下载的任务数量-1,
// 通过减少整体可启动的任务数量来实现。
operationQueue.maxConcurrentOperationCount -= 1
}
}
以上面的例子来说:maxConcurrentOperationCount
为5,现在有2个已经启动的任务,以“开源”的方式暂停1个下载的任务后,maxConcurrentOperationCount
变为6,此时下载的任务数量为1,暂停的任务数量为1,剩余可启动的任务数量为 6-1-1 = 4,符合用户预期;如果恢复被暂停的下载任务,maxConcurrentOperationCount
变回5,此时下载的任务数量为2,剩余可启动的任务数量为 5-2 = 3,符合用户预期。
暂停的任务是actualConcurrentOperationCount
中可以被节省的流,所谓“暂停”任务,就是保留任务当时的状态以便后续从这个状态继续任务,如果在“暂停”任务的同时结束NSOperation
,可以直接消除DownloadOperation
进入暂停状态时造成的偏差。
首先来看看具体在什么时机结束DownloadOperation
,这是NSURLSessionDownloadTask
的状态URLSessionTask.State
,和NSOperation
的状态类似:
enum State : Int {
case running
case suspended
case canceling
// 与 Operation 的 isFinished 状态类似,代表了结束。
case completed
}
一种很自然的选择是当NSURLSessionDownloadTask
的状态变为completed
时,让DownloadOperation
成为isFinished
,可以方便地通过 KVO 观察来实现。在DownloadOperation
中实现如下方法:
func stop(){
guard isStarted else{return}
// 状态变化:running/suspended -> canceling -> completed
downloadTask.cancel(byProducingResumeData: { resumeData in
/* 稍后可以利用 resumeData 从中断的地方继续下载 */
})
isExecuting = false
}
从用户的角度来看,suspend()
和stop()
并没有什么区别;在DownloadOperation
上调用suspend()
后downloadTask
的状态变化:running -> suspended,从NSOperationQueue
的角度来看,suspend()
没有任何影响,但是stop()
会结束所在的DownloadOperation
,用stop()
实现的暂停将会直接空出一个启动名额。不过,在外部调用stop()
时,需要谨慎处理:
func stopTask(_ task: String){
// 如果任务处于暂停状态,那么 maxConcurrentOperationCount 肯定
// 增加过了,现在要结束该 Operation,必须进行平衡(不考虑无限的情况)
if downloadOperation.isStarted && !downloadOperation.isExecuting{
operationQueue.maxConcurrentOperationCount -= 1
}
downloadOperation.stop()
}
采用suspend()
实现暂停时,前面的代码都需要重新调整maxConcurrentOperationCount
,而采用stop()
来实现用户角度的暂停功能时,代码要简单得多,在几个需求里所有需要暂停任务的地方直接调用stopTask(_:)
,调整下载量的时候直接设置maxConcurrentOperationCount
就行了,而且测试起来成本也要小得多。stop()
这种方式的缺点在于:1. 可能有些服务器不支持断点续传,2.会断开和服务器的连接,恢复下载时需要重新连接。
如果你考虑到这样的场景:用户暂停了所有的下载然后退出 App,可能一开始就使用了stop()
这种方式来实现暂停,那么本文的问题就不存在了。
调整最大下载量
采用上面这两种方案来实现对最大下载量的调整,重写上面的maxDownloadCount
,逻辑大致是这样的:
var pauseBySuspendingSessionTask: Bool // 决定采用哪种方案
public var maxDownloadCount: Int{
didSet{
// 为避免 maxConcurrentOperationCount 增加时会启动任务产生干扰,
// 等调整完毕后再开放,这个期间 operationQueue 不会再启动 Operation
operationQueue.isSuspended = true
let executingCount = downloadOperations.filter({$0.executing == true}).count
// 处理超额的任务
if executingCount > maxDownloadCount{
if pauseBySuspendingSessionTask{
/* 暂停(suspend())超额的任务 */
}else{
/* 停止(stop())超额的任务 */
}
}else{
let pendingCount = downloadOperations.filter({$0.isStarted == false}).count
if pendingCount < maxDownloadCount - executingCount{
/* 如果等待中的任务全部启动后,全部启动的任务数量 < maxDownloadCount,
可以选择恢复暂停的任务进行补充,是否实现这个功能酌情处理 */
}
}
// 调整 maxConcurrentOperationCount
if pauseBySuspendingSessionTask{
// 超额的任务被暂停(suspend())后 DownloadOperation 依然占据了
// 一个 maxConcurrentOperationCount 的名额,需要”开源“补充回来
let pausedCount = downloadOperations.filter({
$0.isStarted == true && $0.isExecuting == false && $0.isFinished == false}).count
operationQueue.maxConcurrentOperationCount = maxDownloadCount + pausedCount
}else{
// 超额的任务被停止(stop())后 DownloadOperation 的生命结束,不占据名额,直接设置
operationQueue.maxConcurrentOperationCount = maxDownloadCount
}
downloadOperation.isSuspended = false
}
}
在实际中,在即将结束的NSURLSessionDownloadTask
上调用suspend()
,当NSOperationQueue
里有其他任务结束时,在没有调用resume()
的情况下,这个任务可能会主动继续并结束,在“开源”这种方式下,这个任务应该调用resumePausedTask(_:)
去平衡maxConcurrentOperationCount
的值,这个问题可以在NSOperation
的completionBlock
里检查修正,而另一种方式则不会存在这个问题。
上面的代码无法处理DownloadOperation
的suspend()
和stop()
混合使用的情况,有两种解决方式:1. 只使用一种暂停方式,譬如在DownloadOperation
的pauseTask(_:)
根据pauseBySuspendingSessionTask
的值决定是否改为调用stopTask(_:)
;2. 考虑两者混用的情况,在调整maxConcurrentOperationCount
时下面这行代码就是通用的:
operationQueue.maxConcurrentOperationCount = maxDownloadCount + pausedCount
至此,实现同时至多下载 N 个文件的核心部分完成了。
start() 补遗
isStarted
除了用来标记是否启动,主要是用来做安全隔离,在被NSOperationQueue
启动前,不能让外界通过resume()
执行计划之外的下载,其它会更改NSURLSessionDownloadTask
状态的操作,包括stop()
和cancel()
也采取相同的安全设计。
NSOperation
添加到NSOperationQueue
执行时,只能由NSOperationQueue
来调用start()
,否则会抛出异常。但这里重写的start()
方法可能会被其它对象调用而导致任务被意外启动,安全设计从根源上就被破解了。如何防范呢?虽然可以在DownloadOperation
的start()
里调用super.start()
来沿用原来的安全机制,不过文档里强烈建议不要这么做。最直接的办法还是防止外界调用start()
,要确保两点:
- 避免外界获取到你使用的
DownloadOperation
对象,比如上面的operatonQueue
,可以通过其operations
属性来间接调用start()
。 - 避免在内部任何地方调用
DownloadOperation
对象的start()
。
NSOperation 状态补遗
isFinished
的 KVO 通知是NSOperation
生命结束的标志,如果在NSOperation
生命周期正常结束之前发出isFinished
的 KVO 通知会发生什么?NSOperationQueue
接到通知后,对isFinished
进行校验:如果值确实为true
,那么会按照正常流程处理,将该NSOperation
视为结束,执行它的completionBlock
,如果此时有等待中的NSOperation
,选择并启动一个;如果isFinished
值不为true
,什么都不会发生。这个提前发出了isFinished
通知的NSOperation
,如果此时其start()
尚未返回,它依然会占据分配给它的线程(这里我揣测下为何maxConcurrentOperationCount
减少时不会对已启动的NSOperation
产生影响,NSOperation
封装的代码对于NSOperationQueue
来说是未知的,不干涉是明智的选择),继续执行,如果此后的流程再次发出了 KVO 通知,是否会按原本的流程走一遍:执行completionBlock
,启动一个等待的任务?在 Objective-C 类里,completionBlock
会再次执行;而在 Swift 类里,可考证的是从 Swift 3.1 起,completionBlock
不会被重复执行。而是否会启动一个等待的任务,不管是 Objective-C 类还是 Swift 类里,代码都有着可靠的校验机制,使得启动的NSOperation
数量保持在maxConcurrentOperationCount
的范围内。
isCancelled
是启动任务的一个校验点,如果在任务在启动之前就被取消了,显然就没有必要为这个任务分配线程并启动了,不过现实并非如此,文档里的关键说明如下:
Canceling an operation that is currently in an operation queue, but not yet executing, makes it possible to remove the operation from the queue sooner than usual.
Because it is already cancelled, this behavior allows the operation queue to call the operation’s start method sooner and clear the object out of the queue.
虽然是 sooner,还是会调用start()
!有多 sooner?在启动前调用cancel()
会移除NSOperation
的依赖并让其立刻进入isReady
状态,就快了这部分而已,除此之外,上面的启动流程一样没省,只不过启动后会检查isCancelled
属性而不会执行main()
。这个实现就好比你在排队,由于各种原因不想排队了,结果被告知必须排到你的时候才能取消排队!这显然不是我们想要的,但,测试表明,并没有妥善的办法在其它部分不出纰漏的情况下省去启动流程直接结束该NSOperation
。显然NSOperationQueue
能做到我们期待的那样,但还是没有那样做,我猜测这是为了坚持isFinished
是NSOperation
的最终状态这个设计所造成的,由于isFinished
可能会在NSOperation
子类里重写(设置isFinished
的权限就转移到了子类里),这样一来只有在NSOperation
子类启动后才能设置isFinished
。这也造成了另外一个结果:即使在启动前cancel()
了,NSOperation
的状态变化:isCancelled
->isFinished
,completionBlock
还是会被执行的,这也增加了我们的工作,比如在开头我添加了isStarted
来标记任务是否真的启动过。我们剩下能做的就是加速启动,queuePriority
,直接设置至最高级别,一旦有前面的任务完成,优先启动,即使有多个待取消的NSOperation
,处理起来是很快的。
NSOperationQueue
由于限制只能提供半自动的启动数量管理,而且cancel()
的逻辑设计,可能令人无法接受,如果只是需要控制任务的启动数量,我们可以实现一个简化版的NSOperationQueue
,对DownloadOperation
实行全程掌控:使用数组实现一个 FIFO 的队列来添加DownloadOperation
,根据queuePriority
来动态调整在队列中的位置;使用 GCD 来分配线程,手动执行start()
,响应DownloadOperation
状态的 KVO 通知来实现NSOperationQueue
的相应行为;如果想采取cancel()
后提前移除NSOperation
的设计,可以通过 KVO 观察来跟踪isCancelled
的状态,如果DownloadOperation
尚未启动,直接将其移出队列。
网友评论