美文网首页
NSOperationQueue GUNStep源码

NSOperationQueue GUNStep源码

作者: forping | 来源:发表于2020-11-17 10:34 被阅读0次

    NSOperation GUNStep源码
    NSInvocationOperation GUNStep源码
    NSOperationQueue GUNStep源码

    首先是 initialize 方法,在类第一次使用的时候,创建一个实例当做主队列.

    + (void) initialize
    {
      if (nil == mainQueue)
        {
          mainQueue = [self new];
        }
    }
    

    获取当前队列

    + (id) currentQueue
    {
      if ([NSThread isMainThread]) // 如果是主线程
        {
          return mainQueue; // 返回主队列
        }
    // 返回当前线程的 队列信息
      return [[[NSThread currentThread] threadDictionary] objectForKey: threadKey]; 
    }
    

    init方法

    - (id) init
    {
      if ((self = [super init]) != nil)
        {
          GS_CREATE_INTERNAL(NSOperationQueue);
          internal->suspended = NO; 默认没有暂停
          internal->count = NSOperationQueueDefaultMaxConcurrentOperationCount; // 最大并发数
          internal->operations = [NSMutableArray new]; // 操作s
          internal->starting = [NSMutableArray new]; // 正在进行的操作s
          internal->waiting = [NSMutableArray new]; // 等待的操作s
          internal->lock = [NSRecursiveLock new];
          [internal->lock setName:
            [NSString stringWithFormat: @"lock-for-op-%p", self]];
          internal->cond = [[NSConditionLock alloc] initWithCondition: 0];
          [internal->cond setName:
            [NSString stringWithFormat: @"cond-for-op-%p", self]];
        }
      return self;
    }
    

    添加operation

    - (void) addOperation: (NSOperation *)op
    {// 异常处理
      if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] object is not an NSOperation",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
        }
      [internal->lock lock];
    // 如果任务还没添加到数组并且任务还没开始执行
      if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]&& NO == [op isFinished]) 
        {
          [op addObserver: self
           forKeyPath: @"isReady"
              options: NSKeyValueObservingOptionNew
              context: isReadyCtxt]; // 监听是否准备好
          [self willChangeValueForKey: @"operations"];
          [self willChangeValueForKey: @"operationCount"];
          [internal->operations addObject: op];
          [self didChangeValueForKey: @"operationCount"];
          [self didChangeValueForKey: @"operations"];
          if (YES == [op isReady]) // 如果准备好了,手动发送KVO通知
        {
          [self observeValueForKeyPath: @"isReady"
                      ofObject: op
                    change: nil
                       context: isReadyCtxt];
        }
        }
      [internal->lock unlock];
    }
    

    添加操作,并是否阻塞当前线程直到结束

    - (void) addOperations: (NSArray *)ops
         waitUntilFinished: (BOOL)shouldWait
    {
      NSUInteger    total;
      NSUInteger    index;
    
        // 异常判断
      if (ops == nil || NO == [ops isKindOfClass: [NSArray class]])
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] object is not an NSArray",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
        }
        // 总的任务数量
      total = [ops count];
        
      if (total > 0)
        {
          BOOL      invalidArg = NO;
          NSUInteger    toAdd = total;
            // 创建一个新的数组存储operation
          GS_BEGINITEMBUF(buf, total, id)
    
            // 获取传进来的 ops 放到 buf里面,之后操作 buf就可以了
          [ops getObjects: buf];
            //
          for (index = 0; index < total; index++)
        {
          NSOperation   *op = buf[index];
    
            // 如果有任务 不是 NSOperation,就默认没有需要添加的任务,后续抛出异常
          if (NO == [op isKindOfClass: [NSOperation class]])
            {
              invalidArg = YES;
              toAdd = 0;
              break;
            }
            // 去除已经完成的任务
          if (YES == [op isFinished])
            {
              buf[index] = nil;
              toAdd--;
            }
        }
            // 如果要添加的大于0
          if (toAdd > 0)
        {
            // 加锁
              [internal->lock lock];
          [self willChangeValueForKey: @"operationCount"];
          [self willChangeValueForKey: @"operations"];
            
          for (index = 0; index < total; index++)
            {
              NSOperation   *op = buf[index];
    
              if (op == nil)
            {
              continue;     // Not added
            }
                // 如果已经在 operations 数组里了,就 任务数-1 continue
              if (NSNotFound
            != [internal->operations indexOfObjectIdenticalTo: op])
            {
              buf[index] = nil; // Not added
              toAdd--;
              continue;
            }
                
                //监听 op 的 isReady 状态
              [op addObserver: self
               forKeyPath: @"isReady"
                  options: NSKeyValueObservingOptionNew
                  context: isReadyCtxt];
                // 添加操作
              [internal->operations addObject: op];
              if (NO == [op isReady])// 如果不是 ready 就删除,后续所有ready状态的都会直接调用kvo监听
            {
              buf[index] = nil; // Not yet ready
            }
            }
          [self didChangeValueForKey: @"operationCount"];
          [self didChangeValueForKey: @"operations"];
            
          for (index = 0; index < total; index++)
            {
              NSOperation   *op = buf[index];
    
              if (op != nil)
            {
                // 直接调用kvo监听
              [self observeValueForKeyPath: @"isReady"
                          ofObject: op
                        change: nil
                           context: isReadyCtxt];
            }
            }
              [internal->lock unlock];
        }
          GS_ENDITEMBUF()
          if (YES == invalidArg)// 抛出异常
        {
          [NSException raise: NSInvalidArgumentException
            format: @"[%@-%@] object at index %"PRIuPTR" is not an NSOperation",
            NSStringFromClass([self class]), NSStringFromSelector(_cmd),
            index];
        }
        }
        // 如果需要监听到完成,就去给op加条件锁
      if (YES == shouldWait)
        {
          [self waitUntilAllOperationsAreFinished];
        }
    }
    

    给op加条件锁

    - (void) waitUntilAllOperationsAreFinished
    {
      NSOperation   *op;
    
      [internal->lock lock];
      while ((op = [internal->operations lastObject]) != nil)
        { //从后往前对所有的op加条件锁, 
          [op retain];
          [internal->lock unlock];
          [op waitUntilFinished];
          [op release];
          [internal->lock lock];
        }
      [internal->lock unlock];
    }
    

    设置最大线程数

    - (void) setMaxConcurrentOperationCount: (NSInteger)cnt
    {
    // 异常判断
      if (cnt < 0
        && cnt != NSOperationQueueDefaultMaxConcurrentOperationCount)
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] cannot set negative (%"PRIdPTR") count",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd), cnt];
        }
      [internal->lock lock];
    //修改
      if (cnt != internal->count)
        {
          [self willChangeValueForKey: @"maxConcurrentOperationCount"];
          internal->count = cnt;
          [self didChangeValueForKey: @"maxConcurrentOperationCount"];
        }
      [internal->lock unlock];
    // 执行
      [self _execute];
    }
    

    KVO监听回调,在添加Oeration的时候

    - (void) observeValueForKeyPath: (NSString *)keyPath
                   ofObject: (id)object
                             change: (NSDictionary *)change
                            context: (void *)context
    {
      /* We observe three properties in sequence ...
       * isReady (while we wait for an operation to be ready)
       * queuePriority (when priority of a ready operation may change)
       * isFinished (to see if an executing operation is over).
       */
      if (context == isFinishedCtxt)
        {
          [internal->lock lock];
          internal->executing--;
          [object removeObserver: self forKeyPath: @"isFinished"];
          [internal->lock unlock];
          [self willChangeValueForKey: @"operations"];
          [self willChangeValueForKey: @"operationCount"];
          [internal->lock lock];
          [internal->operations removeObjectIdenticalTo: object];
          [internal->lock unlock];
          [self didChangeValueForKey: @"operationCount"];
          [self didChangeValueForKey: @"operations"];
        }
      else if (context == queuePriorityCtxt || context == isReadyCtxt) // 如果是准备好了
        {
          NSInteger pos;
    
          [internal->lock lock];
          if (context == queuePriorityCtxt)
            {
              [internal->waiting removeObjectIdenticalTo: object];
            }
          if (context == isReadyCtxt)
            {
              [object removeObserver: self forKeyPath: @"isReady"]; // 移除监听准备状态
              [object addObserver: self
                       forKeyPath: @"queuePriority"
                          options: NSKeyValueObservingOptionNew
                          context: queuePriorityCtxt]; // 监听优先级
            }
          pos = [internal->waiting insertionPosition: object
                                       usingFunction: sortFunc
                                             context: 0]; // 获得插入的顺序
          [internal->waiting insertObject: object atIndex: pos]; // 插入到waiting里
            
          [internal->lock unlock];
        }
      [self _execute]; // 开始执行?
    }
    

    执行任务代码,判断等待的任务和当前执行数,如果符合条件,取出来任务,同步或者开辟线程执行

    /* Check for operations which can be executed and start them.
     */
    - (void) _execute
    {
      NSInteger max;
    
      [internal->lock lock];
    
      max = [self maxConcurrentOperationCount]; // 最大执行数
      if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
        {
          max = maxConcurrent; // 线程数 200
        }
    
      while (NO == [self isSuspended]
        && max > internal->executing
        && [internal->waiting count] > 0)
        { // 如果没有挂起,并且有等待的任务,并且正在执行的线程数小于 最大线程数
          NSOperation   *op;
    
          /* Take the first operation from the queue and start it executing.
           * We set ourselves up as an observer for the operating finishing
           * and we keep track of the count of operations we have started,
           * but the actual startup is left to the NSOperation -start method.
           */
          op = [internal->waiting objectAtIndex: 0]; // 取代正在等待的任务
          [internal->waiting removeObjectAtIndex: 0];
          [op removeObserver: self forKeyPath: @"queuePriority"]; // 移除 queuePriority 监听
          [op addObserver: self
           forKeyPath: @"isFinished"
              options: NSKeyValueObservingOptionNew
              context: isFinishedCtxt]; // 监听完成
          internal->executing++; // 正在执行数 +1
          if (YES == [op isConcurrent]) // 如果是并发,直接执行
        {
              [op start]; // 执行任务
        }
          else
        {
          NSUInteger    pending;
    
          [internal->cond lock];
          pending = [internal->starting count];
          [internal->starting addObject: op]; // 添加到正在执行
    
          /* Create a new thread if all existing threads are busy and
           * we haven't reached the pool limit.
           */
          if (0 == internal->threadCount
            || (pending > 0 && internal->threadCount < POOL))
            {
              internal->threadCount++; // 线程数 + 1
              [NSThread detachNewThreadSelector: @selector(_thread)
                           toTarget: self
                         withObject: nil]; // 创建线程并且执行
            }
          /* Tell the thread pool that there is an operation to start.
           */
          [internal->cond unlockWithCondition: 1];
        }
        }
      [internal->lock unlock];
    }
    

    在子线程的执行任务的代码: 获取标记为start的任务,开始执行,直到start数组里没有任务了,退出线程
    // 在子线程的执行任务的代码

    - (void) _thread
    {
      NSAutoreleasePool *pool = [NSAutoreleasePool new]; // 自动释放池
    
      [[[NSThread currentThread] threadDictionary] setObject: self
                                                      forKey: threadKey]; // 设置线程的 operationQueue 用以 currentQueue 去获取到
      for (;;)
        {
          NSOperation   *op;
          NSDate        *when;
          BOOL      found;
            
          when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];// 五秒?
          found = [internal->cond lockWhenCondition: 1 beforeDate: when]; // 条件锁等待五秒?
          RELEASE(when);
          if (NO == found)
        {
          break;    // Idle for 5 seconds ... exit thread. // 如果没有等待到任务,就退出
        }
    
          if ([internal->starting count] > 0) // 如果有正在执行的任务
        {
              op = RETAIN([internal->starting objectAtIndex: 0]);// 获取第一个正在执行的任务
          [internal->starting removeObjectAtIndex: 0]; // 移除任务
        }
          else
        {
          op = nil; // 没有正在执行的任务 op 置为nil
        }
    
          if ([internal->starting count] > 0) // 如果还有正在执行的任务
        {
          // Signal any other idle threads,
              [internal->cond unlockWithCondition: 1]; // 解锁 1 条件 , 可以再次获取,再次执行,这个主要是解锁给其他线程用的,其他线程还可以取到任务
        }
          else
        {
          // There are no more operations starting.
              [internal->cond unlockWithCondition: 0]; // 解锁 0 条件 , 这样其他线程可以等5秒,如果5秒没有任务,直接结束线程
        }
    
          if (nil != op) // 如果取到了任务,就开始执行任务
        {
              NS_DURING
            {
              NSAutoreleasePool *opPool = [NSAutoreleasePool new];
    
                  [NSThread setThreadPriority: [op threadPriority]]; // 设置线程优先级
                  [op start]; // 开始执行 op
              RELEASE(opPool);
            }
              NS_HANDLER
            {
              NSLog(@"Problem running operation %@ ... %@",
            op, localException);
            }
              NS_ENDHANDLER
          [op _finish]; // 执行完成
              RELEASE(op);
        }
        }
    
      [[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey]; // 移除线程的队列属性
      [internal->lock lock];
      internal->threadCount--; // 线程数量-1
      [internal->lock unlock];
      RELEASE(pool);
      [NSThread exit]; // 退出线程
    }
    

    总结; NSOperationQueue 负责监听Operation 的 ready,queuePriority,finlish.状态.
    内部维护了三个数组operations,waiting,starting.
    负责创建线程,执行操作.

    但GUNStep的代码并没有针对 mainQueue 做特殊的处理.

    相关文章

      网友评论

          本文标题:NSOperationQueue GUNStep源码

          本文链接:https://www.haomeiwen.com/subject/gmaabktx.html