美文网首页
NSOperationQueue GUNStep源码

NSOperationQueue GUNStep源码

作者: forping | 来源:发表于2020-11-17 10:34 被阅读0次

NSOperation GUNStep源码
NSInvocationOperation GUNStep源码
NSOperationQueue GUNStep源码

首先是 initialize 方法,在类第一次使用的时候,创建一个实例当做主队列.

+ (void) initialize
{
  if (nil == mainQueue)
    {
      mainQueue = [self new];
    }
}

获取当前队列

+ (id) currentQueue
{
  if ([NSThread isMainThread]) // 如果是主线程
    {
      return mainQueue; // 返回主队列
    }
// 返回当前线程的 队列信息
  return [[[NSThread currentThread] threadDictionary] objectForKey: threadKey]; 
}

init方法

- (id) init
{
  if ((self = [super init]) != nil)
    {
      GS_CREATE_INTERNAL(NSOperationQueue);
      internal->suspended = NO; 默认没有暂停
      internal->count = NSOperationQueueDefaultMaxConcurrentOperationCount; // 最大并发数
      internal->operations = [NSMutableArray new]; // 操作s
      internal->starting = [NSMutableArray new]; // 正在进行的操作s
      internal->waiting = [NSMutableArray new]; // 等待的操作s
      internal->lock = [NSRecursiveLock new];
      [internal->lock setName:
        [NSString stringWithFormat: @"lock-for-op-%p", self]];
      internal->cond = [[NSConditionLock alloc] initWithCondition: 0];
      [internal->cond setName:
        [NSString stringWithFormat: @"cond-for-op-%p", self]];
    }
  return self;
}

添加operation

- (void) addOperation: (NSOperation *)op
{// 异常处理
  if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] object is not an NSOperation",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
    }
  [internal->lock lock];
// 如果任务还没添加到数组并且任务还没开始执行
  if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]&& NO == [op isFinished]) 
    {
      [op addObserver: self
       forKeyPath: @"isReady"
          options: NSKeyValueObservingOptionNew
          context: isReadyCtxt]; // 监听是否准备好
      [self willChangeValueForKey: @"operations"];
      [self willChangeValueForKey: @"operationCount"];
      [internal->operations addObject: op];
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
      if (YES == [op isReady]) // 如果准备好了,手动发送KVO通知
    {
      [self observeValueForKeyPath: @"isReady"
                  ofObject: op
                change: nil
                   context: isReadyCtxt];
    }
    }
  [internal->lock unlock];
}

添加操作,并是否阻塞当前线程直到结束

- (void) addOperations: (NSArray *)ops
     waitUntilFinished: (BOOL)shouldWait
{
  NSUInteger    total;
  NSUInteger    index;

    // 异常判断
  if (ops == nil || NO == [ops isKindOfClass: [NSArray class]])
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] object is not an NSArray",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
    }
    // 总的任务数量
  total = [ops count];
    
  if (total > 0)
    {
      BOOL      invalidArg = NO;
      NSUInteger    toAdd = total;
        // 创建一个新的数组存储operation
      GS_BEGINITEMBUF(buf, total, id)

        // 获取传进来的 ops 放到 buf里面,之后操作 buf就可以了
      [ops getObjects: buf];
        //
      for (index = 0; index < total; index++)
    {
      NSOperation   *op = buf[index];

        // 如果有任务 不是 NSOperation,就默认没有需要添加的任务,后续抛出异常
      if (NO == [op isKindOfClass: [NSOperation class]])
        {
          invalidArg = YES;
          toAdd = 0;
          break;
        }
        // 去除已经完成的任务
      if (YES == [op isFinished])
        {
          buf[index] = nil;
          toAdd--;
        }
    }
        // 如果要添加的大于0
      if (toAdd > 0)
    {
        // 加锁
          [internal->lock lock];
      [self willChangeValueForKey: @"operationCount"];
      [self willChangeValueForKey: @"operations"];
        
      for (index = 0; index < total; index++)
        {
          NSOperation   *op = buf[index];

          if (op == nil)
        {
          continue;     // Not added
        }
            // 如果已经在 operations 数组里了,就 任务数-1 continue
          if (NSNotFound
        != [internal->operations indexOfObjectIdenticalTo: op])
        {
          buf[index] = nil; // Not added
          toAdd--;
          continue;
        }
            
            //监听 op 的 isReady 状态
          [op addObserver: self
           forKeyPath: @"isReady"
              options: NSKeyValueObservingOptionNew
              context: isReadyCtxt];
            // 添加操作
          [internal->operations addObject: op];
          if (NO == [op isReady])// 如果不是 ready 就删除,后续所有ready状态的都会直接调用kvo监听
        {
          buf[index] = nil; // Not yet ready
        }
        }
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
        
      for (index = 0; index < total; index++)
        {
          NSOperation   *op = buf[index];

          if (op != nil)
        {
            // 直接调用kvo监听
          [self observeValueForKeyPath: @"isReady"
                      ofObject: op
                    change: nil
                       context: isReadyCtxt];
        }
        }
          [internal->lock unlock];
    }
      GS_ENDITEMBUF()
      if (YES == invalidArg)// 抛出异常
    {
      [NSException raise: NSInvalidArgumentException
        format: @"[%@-%@] object at index %"PRIuPTR" is not an NSOperation",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd),
        index];
    }
    }
    // 如果需要监听到完成,就去给op加条件锁
  if (YES == shouldWait)
    {
      [self waitUntilAllOperationsAreFinished];
    }
}

给op加条件锁

- (void) waitUntilAllOperationsAreFinished
{
  NSOperation   *op;

  [internal->lock lock];
  while ((op = [internal->operations lastObject]) != nil)
    { //从后往前对所有的op加条件锁, 
      [op retain];
      [internal->lock unlock];
      [op waitUntilFinished];
      [op release];
      [internal->lock lock];
    }
  [internal->lock unlock];
}

设置最大线程数

- (void) setMaxConcurrentOperationCount: (NSInteger)cnt
{
// 异常判断
  if (cnt < 0
    && cnt != NSOperationQueueDefaultMaxConcurrentOperationCount)
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] cannot set negative (%"PRIdPTR") count",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd), cnt];
    }
  [internal->lock lock];
//修改
  if (cnt != internal->count)
    {
      [self willChangeValueForKey: @"maxConcurrentOperationCount"];
      internal->count = cnt;
      [self didChangeValueForKey: @"maxConcurrentOperationCount"];
    }
  [internal->lock unlock];
// 执行
  [self _execute];
}

KVO监听回调,在添加Oeration的时候

- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  /* We observe three properties in sequence ...
   * isReady (while we wait for an operation to be ready)
   * queuePriority (when priority of a ready operation may change)
   * isFinished (to see if an executing operation is over).
   */
  if (context == isFinishedCtxt)
    {
      [internal->lock lock];
      internal->executing--;
      [object removeObserver: self forKeyPath: @"isFinished"];
      [internal->lock unlock];
      [self willChangeValueForKey: @"operations"];
      [self willChangeValueForKey: @"operationCount"];
      [internal->lock lock];
      [internal->operations removeObjectIdenticalTo: object];
      [internal->lock unlock];
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
    }
  else if (context == queuePriorityCtxt || context == isReadyCtxt) // 如果是准备好了
    {
      NSInteger pos;

      [internal->lock lock];
      if (context == queuePriorityCtxt)
        {
          [internal->waiting removeObjectIdenticalTo: object];
        }
      if (context == isReadyCtxt)
        {
          [object removeObserver: self forKeyPath: @"isReady"]; // 移除监听准备状态
          [object addObserver: self
                   forKeyPath: @"queuePriority"
                      options: NSKeyValueObservingOptionNew
                      context: queuePriorityCtxt]; // 监听优先级
        }
      pos = [internal->waiting insertionPosition: object
                                   usingFunction: sortFunc
                                         context: 0]; // 获得插入的顺序
      [internal->waiting insertObject: object atIndex: pos]; // 插入到waiting里
        
      [internal->lock unlock];
    }
  [self _execute]; // 开始执行?
}

执行任务代码,判断等待的任务和当前执行数,如果符合条件,取出来任务,同步或者开辟线程执行

/* Check for operations which can be executed and start them.
 */
- (void) _execute
{
  NSInteger max;

  [internal->lock lock];

  max = [self maxConcurrentOperationCount]; // 最大执行数
  if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
    {
      max = maxConcurrent; // 线程数 200
    }

  while (NO == [self isSuspended]
    && max > internal->executing
    && [internal->waiting count] > 0)
    { // 如果没有挂起,并且有等待的任务,并且正在执行的线程数小于 最大线程数
      NSOperation   *op;

      /* Take the first operation from the queue and start it executing.
       * We set ourselves up as an observer for the operating finishing
       * and we keep track of the count of operations we have started,
       * but the actual startup is left to the NSOperation -start method.
       */
      op = [internal->waiting objectAtIndex: 0]; // 取代正在等待的任务
      [internal->waiting removeObjectAtIndex: 0];
      [op removeObserver: self forKeyPath: @"queuePriority"]; // 移除 queuePriority 监听
      [op addObserver: self
       forKeyPath: @"isFinished"
          options: NSKeyValueObservingOptionNew
          context: isFinishedCtxt]; // 监听完成
      internal->executing++; // 正在执行数 +1
      if (YES == [op isConcurrent]) // 如果是并发,直接执行
    {
          [op start]; // 执行任务
    }
      else
    {
      NSUInteger    pending;

      [internal->cond lock];
      pending = [internal->starting count];
      [internal->starting addObject: op]; // 添加到正在执行

      /* Create a new thread if all existing threads are busy and
       * we haven't reached the pool limit.
       */
      if (0 == internal->threadCount
        || (pending > 0 && internal->threadCount < POOL))
        {
          internal->threadCount++; // 线程数 + 1
          [NSThread detachNewThreadSelector: @selector(_thread)
                       toTarget: self
                     withObject: nil]; // 创建线程并且执行
        }
      /* Tell the thread pool that there is an operation to start.
       */
      [internal->cond unlockWithCondition: 1];
    }
    }
  [internal->lock unlock];
}

在子线程的执行任务的代码: 获取标记为start的任务,开始执行,直到start数组里没有任务了,退出线程
// 在子线程的执行任务的代码

- (void) _thread
{
  NSAutoreleasePool *pool = [NSAutoreleasePool new]; // 自动释放池

  [[[NSThread currentThread] threadDictionary] setObject: self
                                                  forKey: threadKey]; // 设置线程的 operationQueue 用以 currentQueue 去获取到
  for (;;)
    {
      NSOperation   *op;
      NSDate        *when;
      BOOL      found;
        
      when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];// 五秒?
      found = [internal->cond lockWhenCondition: 1 beforeDate: when]; // 条件锁等待五秒?
      RELEASE(when);
      if (NO == found)
    {
      break;    // Idle for 5 seconds ... exit thread. // 如果没有等待到任务,就退出
    }

      if ([internal->starting count] > 0) // 如果有正在执行的任务
    {
          op = RETAIN([internal->starting objectAtIndex: 0]);// 获取第一个正在执行的任务
      [internal->starting removeObjectAtIndex: 0]; // 移除任务
    }
      else
    {
      op = nil; // 没有正在执行的任务 op 置为nil
    }

      if ([internal->starting count] > 0) // 如果还有正在执行的任务
    {
      // Signal any other idle threads,
          [internal->cond unlockWithCondition: 1]; // 解锁 1 条件 , 可以再次获取,再次执行,这个主要是解锁给其他线程用的,其他线程还可以取到任务
    }
      else
    {
      // There are no more operations starting.
          [internal->cond unlockWithCondition: 0]; // 解锁 0 条件 , 这样其他线程可以等5秒,如果5秒没有任务,直接结束线程
    }

      if (nil != op) // 如果取到了任务,就开始执行任务
    {
          NS_DURING
        {
          NSAutoreleasePool *opPool = [NSAutoreleasePool new];

              [NSThread setThreadPriority: [op threadPriority]]; // 设置线程优先级
              [op start]; // 开始执行 op
          RELEASE(opPool);
        }
          NS_HANDLER
        {
          NSLog(@"Problem running operation %@ ... %@",
        op, localException);
        }
          NS_ENDHANDLER
      [op _finish]; // 执行完成
          RELEASE(op);
    }
    }

  [[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey]; // 移除线程的队列属性
  [internal->lock lock];
  internal->threadCount--; // 线程数量-1
  [internal->lock unlock];
  RELEASE(pool);
  [NSThread exit]; // 退出线程
}

总结; NSOperationQueue 负责监听Operation 的 ready,queuePriority,finlish.状态.
内部维护了三个数组operations,waiting,starting.
负责创建线程,执行操作.

但GUNStep的代码并没有针对 mainQueue 做特殊的处理.

相关文章

网友评论

      本文标题:NSOperationQueue GUNStep源码

      本文链接:https://www.haomeiwen.com/subject/gmaabktx.html