NSOperation是苹果官方提供的多线程解决方案,相比GCD,最大的特点是支持任务间的依赖和优先级的调整,这里我们结合GNU Step的源码来看下别人是怎么实现的。
源码下载地址:https://github.com/gnustep/libs-base
1.如何实现operationA 对B的依赖?
首先把依赖的任务B插入A的dependencies 数组
判断当前任务和要依赖的任务的状态,如果A或B已结束,或者A被取消或在执行中 则跳过。
- (void) addDependency: (NSOperation *)op
{
if (NO == [op isKindOfClass: [NSOperation class]])
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] dependency is not an NSOperation",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
if (op == self)
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] attempt to add dependency on self",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
[internal->lock lock];
if (internal->dependencies == nil)
{
internal->dependencies = [[NSMutableArray alloc] initWithCapacity: 5];
}
NS_DURING
{
if (NSNotFound == [internal->dependencies indexOfObjectIdenticalTo: op])
{
[self willChangeValueForKey: @"dependencies"];
[internal->dependencies addObject: op];
/* We only need to watch for changes if it's possible for them to
* happen and make a difference.
*/
if (NO == [op isFinished]
&& NO == [self isCancelled]
&& NO == [self isExecuting]
&& NO == [self isFinished])
{
/* Can change readiness if we are neither cancelled nor
* executing nor finished. So we need to observe for the
* finish of the dependency.
*/
[op addObserver: self
forKeyPath: @"isFinished"
options: NSKeyValueObservingOptionNew
context: isFinishedCtxt];
if (internal->ready == YES)
{
/* The new dependency stops us being ready ...
* change state.
*/
[self willChangeValueForKey: @"isReady"];
internal->ready = NO;
[self didChangeValueForKey: @"isReady"];
}
}
[self didChangeValueForKey: @"dependencies"];
}
}
NS_HANDLER
{
[internal->lock unlock];
NSLog(@"Problem adding dependency: %@", localException);
return;
}
NS_ENDHANDLER
[internal->lock unlock];
}
然后通过KVO添加对operation B 的isReady状态监听,当B的isReady发生变化时,修改A的isReady状态为YES。
- (void) observeValueForKeyPath: (NSString *)keyPath
ofObject: (id)object
change: (NSDictionary *)change
context: (void *)context
{
[internal->lock lock];
/* We only observe isFinished changes, and we can remove self as an
* observer once we know the operation has finished since it can never
* become unfinished.
*/
[object removeObserver: self forKeyPath: @"isFinished"];
if (object == self)
{
/* We have finished and need to unlock the condition lock so that
* any waiting thread can continue.
*/
[internal->cond lock];
[internal->cond unlockWithCondition: 1];
[internal->lock unlock];
return;
}
if (NO == internal->ready)
{
NSEnumerator *en;
NSOperation *op;
/* Some dependency has finished (or been removed) ...
* so we need to check to see if we are now ready unless we know we are.
* This is protected by locks so that an update due to an observed
* change in one thread won't interrupt anything in another thread.
*/
en = [internal->dependencies objectEnumerator];
while ((op = [en nextObject]) != nil)
{
if (NO == [op isFinished])
break;
}
if (op == nil)
{
[self willChangeValueForKey: @"isReady"];
internal->ready = YES;
[self didChangeValueForKey: @"isReady"];
}
}
[internal->lock unlock];
}
同时operationQueue在add Operation的时候监听了operation的isReady状态,所以当前面A的isReady发生变化时,operationQueue也会得到通知
- (void) addOperation: (NSOperation *)op
{
if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object is not an NSOperation",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
[internal->lock lock];
if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]
&& NO == [op isFinished])
{
[op addObserver: self
forKeyPath: @"isReady"
options: NSKeyValueObservingOptionNew
context: isReadyCtxt];
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->operations addObject: op];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
if (YES == [op isReady])
{
[self observeValueForKeyPath: @"isReady"
ofObject: op
change: nil
context: isReadyCtxt];
}
}
[internal->lock unlock];
}
在operationQueue的KVO回调方法里面,会移除isReady的监听,添加一个对operationA 优先级变化的监听,然后找到一个适合operationA位置并插入到waiting数组里面,然后执行_execute方法。
//NSOperationQueue (Private)
- (void) observeValueForKeyPath: (NSString *)keyPath
ofObject: (id)object
change: (NSDictionary *)change
context: (void *)context
{
/* We observe three properties in sequence ...
* isReady (while we wait for an operation to be ready)
* queuePriority (when priority of a ready operation may change)
* isFinished (to see if an executing operation is over).
*/
if (context == isFinishedCtxt)
{
[internal->lock lock];
internal->executing--;
[object removeObserver: self forKeyPath: @"isFinished"];
[internal->lock unlock];
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->lock lock];
[internal->operations removeObjectIdenticalTo: object];
[internal->lock unlock];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
}
else if (context == queuePriorityCtxt || context == isReadyCtxt)
{
NSInteger pos;
[internal->lock lock];
if (context == queuePriorityCtxt)
{
[internal->waiting removeObjectIdenticalTo: object];
}
if (context == isReadyCtxt)
{
[object removeObserver: self forKeyPath: @"isReady"];
[object addObserver: self
forKeyPath: @"queuePriority"
options: NSKeyValueObservingOptionNew
context: queuePriorityCtxt];
}
pos = [internal->waiting insertionPosition: object
usingFunction: sortFunc
context: 0];
[internal->waiting insertObject: object atIndex: pos];
[internal->lock unlock];
}
[self _execute];
}
2.放进operationQueue的任务当isReady状态改变后是立刻执行吗?
不一定,参考_execute方法:
首先进入一个while循环,判断operationQuque是否为suspend状态,是否超过当前最大并发数,以及waiting数组的数量是否<=0,如果都不是,则拿出Waiting 数组里面的第一个任务,同时移除对该任务优先级的监听,添加对该任务isFinished的监听,增加executing的计数。 然后,看要执行的任务是否支持并发,如果支持,则调用该任务的start方法开始执行,如果不支持,则先把当前任务加到starting数组内,然后看当前正在使用的线程数是否大于线程池内的线程数量(默认是8),如果不超过,则开启一个新线程,并执行_thread方法。
//NSOperationQueue (Private)
/* Check for operations which can be executed and start them.
*/
- (void) _execute
{
NSInteger max;
[internal->lock lock];
max = [self maxConcurrentOperationCount];
if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
{
max = maxConcurrent;
}
NS_DURING
while (NO == [self isSuspended]
&& max > internal->executing
&& [internal->waiting count] > 0)
{
NSOperation *op;
/* Take the first operation from the queue and start it executing.
* We set ourselves up as an observer for the operating finishing
* and we keep track of the count of operations we have started,
* but the actual startup is left to the NSOperation -start method.
*/
op = [internal->waiting objectAtIndex: 0];
[internal->waiting removeObjectAtIndex: 0];
[op removeObserver: self forKeyPath: @"queuePriority"];
[op addObserver: self
forKeyPath: @"isFinished"
options: NSKeyValueObservingOptionNew
context: isFinishedCtxt];
internal->executing++;
if (YES == [op isConcurrent])
{
[op start];
}
else
{
NSUInteger pending;
[internal->cond lock];
pending = [internal->starting count];
[internal->starting addObject: op];
/* Create a new thread if all existing threads are busy and
* we haven't reached the pool limit.
*/
if (0 == internal->threadCount
|| (pending > 0 && internal->threadCount < POOL))
{
internal->threadCount++;
NS_DURING
{
[NSThread detachNewThreadSelector: @selector(_thread)
toTarget: self
withObject: nil];
}
NS_HANDLER
{
NSLog(@"Failed to create thread for %@: %@",
self, localException);
}
NS_ENDHANDLER
}
/* Tell the thread pool that there is an operation to start.
*/
[internal->cond unlockWithCondition: 1];
}
}
NS_HANDLER
{
[internal->lock unlock];
[localException raise];
}
NS_ENDHANDLER
[internal->lock unlock];
}
_thread方法:
首先会创建一个自动释放池(因为开启了一个新线程),然后进入一个无限循环,在循环体里面会遍历starting数组,每次循环取出第一个任务,设置好他的线程优先级然后执行,执行完后调用该任务的_finish 方法修改执行状态。最后再销毁自动释放池。这个循环的退出条件是循环开始执行后5秒内没有要执行的任务(starting数组为空),内部是通过一个条件锁来控制这个时间的。
//NSOperationQueue (Private)
- (void) _thread
{
CREATE_AUTORELEASE_POOL(arp);
[[[NSThread currentThread] threadDictionary] setObject: self
forKey: threadKey];
for (;;)
{
NSOperation *op;
NSDate *when;
BOOL found;
/* We use a pool for each operation in case releasing the operation
* causes it to be deallocated, and the deallocation of the operation
* autoreleases something which needs to be cleaned up.
*/
RECREATE_AUTORELEASE_POOL(arp);
when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];
found = [internal->cond lockWhenCondition: 1 beforeDate: when];
RELEASE(when);
if (NO == found)
{
break; // Idle for 5 seconds ... exit thread.
}
if ([internal->starting count] > 0)
{
op = RETAIN([internal->starting objectAtIndex: 0]);
[internal->starting removeObjectAtIndex: 0];
}
else
{
op = nil;
}
if ([internal->starting count] > 0)
{
// Signal any other idle threads,
[internal->cond unlockWithCondition: 1];
}
else
{
// There are no more operations starting.
[internal->cond unlockWithCondition: 0];
}
if (nil != op)
{
NS_DURING
{
ENTER_POOL
[NSThread setThreadPriority: [op threadPriority]];
[op start];
LEAVE_POOL
}
NS_HANDLER
{
NSLog(@"Problem running operation %@ ... %@",
op, localException);
}
NS_ENDHANDLER
[op _finish];
RELEASE(op);
}
}
[[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey];
[internal->lock lock];
internal->threadCount--;
[internal->lock unlock];
DESTROY(arp);
[NSThread exit];
}
由此可见,如果任务支持并发,并且operationQueue没有suspend的话,会马上执行该任务。否则会看是否有可用的线程,如果有会在新开启的线程立刻执行,如果没有,则暂存在starting数组中等待执行,当任意一个线程执行完当前任务就会接着执行这个排队的任务。
查阅源码的时候我们会发现到处都有 internal 这个变量,这个变量实际就是self, 只是为了方便内存管理,封装成了一个类,我们直接当成self理解就可以。
网友评论