NSOperation GUNStep源码
NSInvocationOperation GUNStep源码
NSOperationQueue GUNStep源码
首先是 initialize
方法,在类第一次使用的时候,创建一个实例当做主队列.
+ (void) initialize
{
if (nil == mainQueue)
{
mainQueue = [self new];
}
}
获取当前队列
+ (id) currentQueue
{
if ([NSThread isMainThread]) // 如果是主线程
{
return mainQueue; // 返回主队列
}
// 返回当前线程的 队列信息
return [[[NSThread currentThread] threadDictionary] objectForKey: threadKey];
}
init方法
- (id) init
{
if ((self = [super init]) != nil)
{
GS_CREATE_INTERNAL(NSOperationQueue);
internal->suspended = NO; 默认没有暂停
internal->count = NSOperationQueueDefaultMaxConcurrentOperationCount; // 最大并发数
internal->operations = [NSMutableArray new]; // 操作s
internal->starting = [NSMutableArray new]; // 正在进行的操作s
internal->waiting = [NSMutableArray new]; // 等待的操作s
internal->lock = [NSRecursiveLock new];
[internal->lock setName:
[NSString stringWithFormat: @"lock-for-op-%p", self]];
internal->cond = [[NSConditionLock alloc] initWithCondition: 0];
[internal->cond setName:
[NSString stringWithFormat: @"cond-for-op-%p", self]];
}
return self;
}
添加operation
- (void) addOperation: (NSOperation *)op
{// 异常处理
if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object is not an NSOperation",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
[internal->lock lock];
// 如果任务还没添加到数组并且任务还没开始执行
if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]&& NO == [op isFinished])
{
[op addObserver: self
forKeyPath: @"isReady"
options: NSKeyValueObservingOptionNew
context: isReadyCtxt]; // 监听是否准备好
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->operations addObject: op];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
if (YES == [op isReady]) // 如果准备好了,手动发送KVO通知
{
[self observeValueForKeyPath: @"isReady"
ofObject: op
change: nil
context: isReadyCtxt];
}
}
[internal->lock unlock];
}
添加操作,并是否阻塞当前线程直到结束
- (void) addOperations: (NSArray *)ops
waitUntilFinished: (BOOL)shouldWait
{
NSUInteger total;
NSUInteger index;
// 异常判断
if (ops == nil || NO == [ops isKindOfClass: [NSArray class]])
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object is not an NSArray",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
// 总的任务数量
total = [ops count];
if (total > 0)
{
BOOL invalidArg = NO;
NSUInteger toAdd = total;
// 创建一个新的数组存储operation
GS_BEGINITEMBUF(buf, total, id)
// 获取传进来的 ops 放到 buf里面,之后操作 buf就可以了
[ops getObjects: buf];
//
for (index = 0; index < total; index++)
{
NSOperation *op = buf[index];
// 如果有任务 不是 NSOperation,就默认没有需要添加的任务,后续抛出异常
if (NO == [op isKindOfClass: [NSOperation class]])
{
invalidArg = YES;
toAdd = 0;
break;
}
// 去除已经完成的任务
if (YES == [op isFinished])
{
buf[index] = nil;
toAdd--;
}
}
// 如果要添加的大于0
if (toAdd > 0)
{
// 加锁
[internal->lock lock];
[self willChangeValueForKey: @"operationCount"];
[self willChangeValueForKey: @"operations"];
for (index = 0; index < total; index++)
{
NSOperation *op = buf[index];
if (op == nil)
{
continue; // Not added
}
// 如果已经在 operations 数组里了,就 任务数-1 continue
if (NSNotFound
!= [internal->operations indexOfObjectIdenticalTo: op])
{
buf[index] = nil; // Not added
toAdd--;
continue;
}
//监听 op 的 isReady 状态
[op addObserver: self
forKeyPath: @"isReady"
options: NSKeyValueObservingOptionNew
context: isReadyCtxt];
// 添加操作
[internal->operations addObject: op];
if (NO == [op isReady])// 如果不是 ready 就删除,后续所有ready状态的都会直接调用kvo监听
{
buf[index] = nil; // Not yet ready
}
}
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
for (index = 0; index < total; index++)
{
NSOperation *op = buf[index];
if (op != nil)
{
// 直接调用kvo监听
[self observeValueForKeyPath: @"isReady"
ofObject: op
change: nil
context: isReadyCtxt];
}
}
[internal->lock unlock];
}
GS_ENDITEMBUF()
if (YES == invalidArg)// 抛出异常
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object at index %"PRIuPTR" is not an NSOperation",
NSStringFromClass([self class]), NSStringFromSelector(_cmd),
index];
}
}
// 如果需要监听到完成,就去给op加条件锁
if (YES == shouldWait)
{
[self waitUntilAllOperationsAreFinished];
}
}
给op加条件锁
- (void) waitUntilAllOperationsAreFinished
{
NSOperation *op;
[internal->lock lock];
while ((op = [internal->operations lastObject]) != nil)
{ //从后往前对所有的op加条件锁,
[op retain];
[internal->lock unlock];
[op waitUntilFinished];
[op release];
[internal->lock lock];
}
[internal->lock unlock];
}
设置最大线程数
- (void) setMaxConcurrentOperationCount: (NSInteger)cnt
{
// 异常判断
if (cnt < 0
&& cnt != NSOperationQueueDefaultMaxConcurrentOperationCount)
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] cannot set negative (%"PRIdPTR") count",
NSStringFromClass([self class]), NSStringFromSelector(_cmd), cnt];
}
[internal->lock lock];
//修改
if (cnt != internal->count)
{
[self willChangeValueForKey: @"maxConcurrentOperationCount"];
internal->count = cnt;
[self didChangeValueForKey: @"maxConcurrentOperationCount"];
}
[internal->lock unlock];
// 执行
[self _execute];
}
KVO监听回调,在添加Oeration的时候
- (void) observeValueForKeyPath: (NSString *)keyPath
ofObject: (id)object
change: (NSDictionary *)change
context: (void *)context
{
/* We observe three properties in sequence ...
* isReady (while we wait for an operation to be ready)
* queuePriority (when priority of a ready operation may change)
* isFinished (to see if an executing operation is over).
*/
if (context == isFinishedCtxt)
{
[internal->lock lock];
internal->executing--;
[object removeObserver: self forKeyPath: @"isFinished"];
[internal->lock unlock];
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->lock lock];
[internal->operations removeObjectIdenticalTo: object];
[internal->lock unlock];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
}
else if (context == queuePriorityCtxt || context == isReadyCtxt) // 如果是准备好了
{
NSInteger pos;
[internal->lock lock];
if (context == queuePriorityCtxt)
{
[internal->waiting removeObjectIdenticalTo: object];
}
if (context == isReadyCtxt)
{
[object removeObserver: self forKeyPath: @"isReady"]; // 移除监听准备状态
[object addObserver: self
forKeyPath: @"queuePriority"
options: NSKeyValueObservingOptionNew
context: queuePriorityCtxt]; // 监听优先级
}
pos = [internal->waiting insertionPosition: object
usingFunction: sortFunc
context: 0]; // 获得插入的顺序
[internal->waiting insertObject: object atIndex: pos]; // 插入到waiting里
[internal->lock unlock];
}
[self _execute]; // 开始执行?
}
执行任务代码,判断等待的任务和当前执行数,如果符合条件,取出来任务,同步或者开辟线程执行
/* Check for operations which can be executed and start them.
*/
- (void) _execute
{
NSInteger max;
[internal->lock lock];
max = [self maxConcurrentOperationCount]; // 最大执行数
if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
{
max = maxConcurrent; // 线程数 200
}
while (NO == [self isSuspended]
&& max > internal->executing
&& [internal->waiting count] > 0)
{ // 如果没有挂起,并且有等待的任务,并且正在执行的线程数小于 最大线程数
NSOperation *op;
/* Take the first operation from the queue and start it executing.
* We set ourselves up as an observer for the operating finishing
* and we keep track of the count of operations we have started,
* but the actual startup is left to the NSOperation -start method.
*/
op = [internal->waiting objectAtIndex: 0]; // 取代正在等待的任务
[internal->waiting removeObjectAtIndex: 0];
[op removeObserver: self forKeyPath: @"queuePriority"]; // 移除 queuePriority 监听
[op addObserver: self
forKeyPath: @"isFinished"
options: NSKeyValueObservingOptionNew
context: isFinishedCtxt]; // 监听完成
internal->executing++; // 正在执行数 +1
if (YES == [op isConcurrent]) // 如果是并发,直接执行
{
[op start]; // 执行任务
}
else
{
NSUInteger pending;
[internal->cond lock];
pending = [internal->starting count];
[internal->starting addObject: op]; // 添加到正在执行
/* Create a new thread if all existing threads are busy and
* we haven't reached the pool limit.
*/
if (0 == internal->threadCount
|| (pending > 0 && internal->threadCount < POOL))
{
internal->threadCount++; // 线程数 + 1
[NSThread detachNewThreadSelector: @selector(_thread)
toTarget: self
withObject: nil]; // 创建线程并且执行
}
/* Tell the thread pool that there is an operation to start.
*/
[internal->cond unlockWithCondition: 1];
}
}
[internal->lock unlock];
}
在子线程的执行任务的代码: 获取标记为start的任务,开始执行,直到start数组里没有任务了,退出线程
// 在子线程的执行任务的代码
- (void) _thread
{
NSAutoreleasePool *pool = [NSAutoreleasePool new]; // 自动释放池
[[[NSThread currentThread] threadDictionary] setObject: self
forKey: threadKey]; // 设置线程的 operationQueue 用以 currentQueue 去获取到
for (;;)
{
NSOperation *op;
NSDate *when;
BOOL found;
when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];// 五秒?
found = [internal->cond lockWhenCondition: 1 beforeDate: when]; // 条件锁等待五秒?
RELEASE(when);
if (NO == found)
{
break; // Idle for 5 seconds ... exit thread. // 如果没有等待到任务,就退出
}
if ([internal->starting count] > 0) // 如果有正在执行的任务
{
op = RETAIN([internal->starting objectAtIndex: 0]);// 获取第一个正在执行的任务
[internal->starting removeObjectAtIndex: 0]; // 移除任务
}
else
{
op = nil; // 没有正在执行的任务 op 置为nil
}
if ([internal->starting count] > 0) // 如果还有正在执行的任务
{
// Signal any other idle threads,
[internal->cond unlockWithCondition: 1]; // 解锁 1 条件 , 可以再次获取,再次执行,这个主要是解锁给其他线程用的,其他线程还可以取到任务
}
else
{
// There are no more operations starting.
[internal->cond unlockWithCondition: 0]; // 解锁 0 条件 , 这样其他线程可以等5秒,如果5秒没有任务,直接结束线程
}
if (nil != op) // 如果取到了任务,就开始执行任务
{
NS_DURING
{
NSAutoreleasePool *opPool = [NSAutoreleasePool new];
[NSThread setThreadPriority: [op threadPriority]]; // 设置线程优先级
[op start]; // 开始执行 op
RELEASE(opPool);
}
NS_HANDLER
{
NSLog(@"Problem running operation %@ ... %@",
op, localException);
}
NS_ENDHANDLER
[op _finish]; // 执行完成
RELEASE(op);
}
}
[[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey]; // 移除线程的队列属性
[internal->lock lock];
internal->threadCount--; // 线程数量-1
[internal->lock unlock];
RELEASE(pool);
[NSThread exit]; // 退出线程
}
总结; NSOperationQueue 负责监听Operation 的 ready,queuePriority,finlish.状态.
内部维护了三个数组operations,waiting,starting.
负责创建线程,执行操作.
但GUNStep的代码并没有针对 mainQueue 做特殊的处理.
网友评论