美文网首页
NSOperation源码浅析

NSOperation源码浅析

作者: FingerStyle | 来源:发表于2022-12-14 18:22 被阅读0次

    NSOperation是苹果官方提供的多线程解决方案,相比GCD,最大的特点是支持任务间的依赖和优先级的调整,这里我们结合GNU Step的源码来看下别人是怎么实现的。
    源码下载地址:https://github.com/gnustep/libs-base

    1.如何实现operationA 对B的依赖?

    首先把依赖的任务B插入A的dependencies 数组
    判断当前任务和要依赖的任务的状态,如果A或B已结束,或者A被取消或在执行中 则跳过。

    - (void) addDependency: (NSOperation *)op
    {
      if (NO == [op isKindOfClass: [NSOperation class]])
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] dependency is not an NSOperation",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
        }
      if (op == self)
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] attempt to add dependency on self",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
        }
      [internal->lock lock];
      if (internal->dependencies == nil)
        {
          internal->dependencies = [[NSMutableArray alloc] initWithCapacity: 5];
        }
      NS_DURING
        {
          if (NSNotFound == [internal->dependencies indexOfObjectIdenticalTo: op])
        {
          [self willChangeValueForKey: @"dependencies"];
              [internal->dependencies addObject: op];
          /* We only need to watch for changes if it's possible for them to
           * happen and make a difference.
           */
          if (NO == [op isFinished]
            && NO == [self isCancelled]
            && NO == [self isExecuting]
            && NO == [self isFinished])
            {
              /* Can change readiness if we are neither cancelled nor
               * executing nor finished.  So we need to observe for the
               * finish of the dependency.
               */
              [op addObserver: self
               forKeyPath: @"isFinished"
                  options: NSKeyValueObservingOptionNew
                  context: isFinishedCtxt];
              if (internal->ready == YES)
            {
              /* The new dependency stops us being ready ...
               * change state.
               */
              [self willChangeValueForKey: @"isReady"];
              internal->ready = NO;
              [self didChangeValueForKey: @"isReady"];
            }
            }
          [self didChangeValueForKey: @"dependencies"];
        }
        }
      NS_HANDLER
        {
          [internal->lock unlock];
          NSLog(@"Problem adding dependency: %@", localException);
          return;
        }
      NS_ENDHANDLER
      [internal->lock unlock];
    }
    

    然后通过KVO添加对operation B 的isReady状态监听,当B的isReady发生变化时,修改A的isReady状态为YES。

    - (void) observeValueForKeyPath: (NSString *)keyPath
                   ofObject: (id)object
                             change: (NSDictionary *)change
                            context: (void *)context
    {
      [internal->lock lock];
    
      /* We only observe isFinished changes, and we can remove self as an
       * observer once we know the operation has finished since it can never
       * become unfinished.
       */
      [object removeObserver: self forKeyPath: @"isFinished"];
    
      if (object == self)
        {
          /* We have finished and need to unlock the condition lock so that
           * any waiting thread can continue.
           */
          [internal->cond lock];
          [internal->cond unlockWithCondition: 1];
          [internal->lock unlock];
          return;
        }
    
      if (NO == internal->ready)
        {
          NSEnumerator  *en;
          NSOperation   *op;
    
          /* Some dependency has finished (or been removed) ...
           * so we need to check to see if we are now ready unless we know we are.
           * This is protected by locks so that an update due to an observed
           * change in one thread won't interrupt anything in another thread.
           */
          en = [internal->dependencies objectEnumerator];
          while ((op = [en nextObject]) != nil)
            {
              if (NO == [op isFinished])
            break;
            }
          if (op == nil)
        {
              [self willChangeValueForKey: @"isReady"];
          internal->ready = YES;
              [self didChangeValueForKey: @"isReady"];
        }
        }
      [internal->lock unlock];
    }
    

    同时operationQueue在add Operation的时候监听了operation的isReady状态,所以当前面A的isReady发生变化时,operationQueue也会得到通知

    - (void) addOperation: (NSOperation *)op
    {
      if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
        {
          [NSException raise: NSInvalidArgumentException
              format: @"[%@-%@] object is not an NSOperation",
        NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
        }
      [internal->lock lock];
      if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]
        && NO == [op isFinished])
        {
          [op addObserver: self
           forKeyPath: @"isReady"
              options: NSKeyValueObservingOptionNew
              context: isReadyCtxt];
          [self willChangeValueForKey: @"operations"];
          [self willChangeValueForKey: @"operationCount"];
          [internal->operations addObject: op];
          [self didChangeValueForKey: @"operationCount"];
          [self didChangeValueForKey: @"operations"];
          if (YES == [op isReady])
        {
          [self observeValueForKeyPath: @"isReady"
                      ofObject: op
                    change: nil
                       context: isReadyCtxt];
        }
        }
      [internal->lock unlock];
    }
    

    在operationQueue的KVO回调方法里面,会移除isReady的监听,添加一个对operationA 优先级变化的监听,然后找到一个适合operationA位置并插入到waiting数组里面,然后执行_execute方法。

    //NSOperationQueue (Private)
    - (void) observeValueForKeyPath: (NSString *)keyPath
                   ofObject: (id)object
                             change: (NSDictionary *)change
                            context: (void *)context
    {
      /* We observe three properties in sequence ...
       * isReady (while we wait for an operation to be ready)
       * queuePriority (when priority of a ready operation may change)
       * isFinished (to see if an executing operation is over).
       */
      if (context == isFinishedCtxt)
        {
          [internal->lock lock];
          internal->executing--;
          [object removeObserver: self forKeyPath: @"isFinished"];
          [internal->lock unlock];
          [self willChangeValueForKey: @"operations"];
          [self willChangeValueForKey: @"operationCount"];
          [internal->lock lock];
          [internal->operations removeObjectIdenticalTo: object];
          [internal->lock unlock];
          [self didChangeValueForKey: @"operationCount"];
          [self didChangeValueForKey: @"operations"];
        }
      else if (context == queuePriorityCtxt || context == isReadyCtxt)
        {
          NSInteger pos;
    
          [internal->lock lock];
          if (context == queuePriorityCtxt)
            {
              [internal->waiting removeObjectIdenticalTo: object];
            }
          if (context == isReadyCtxt)
            {
              [object removeObserver: self forKeyPath: @"isReady"];
              [object addObserver: self
                       forKeyPath: @"queuePriority"
                          options: NSKeyValueObservingOptionNew
                          context: queuePriorityCtxt];
            }
          pos = [internal->waiting insertionPosition: object
                                       usingFunction: sortFunc
                                             context: 0];
          [internal->waiting insertObject: object atIndex: pos];
          [internal->lock unlock];
        }
      [self _execute];
    }
    

    2.放进operationQueue的任务当isReady状态改变后是立刻执行吗?
    不一定,参考_execute方法:
    首先进入一个while循环,判断operationQuque是否为suspend状态,是否超过当前最大并发数,以及waiting数组的数量是否<=0,如果都不是,则拿出Waiting 数组里面的第一个任务,同时移除对该任务优先级的监听,添加对该任务isFinished的监听,增加executing的计数。 然后,看要执行的任务是否支持并发,如果支持,则调用该任务的start方法开始执行,如果不支持,则先把当前任务加到starting数组内,然后看当前正在使用的线程数是否大于线程池内的线程数量(默认是8),如果不超过,则开启一个新线程,并执行_thread方法。

    //NSOperationQueue (Private)
    /* Check for operations which can be executed and start them.
     */
    - (void) _execute
    {
      NSInteger max;
    
      [internal->lock lock];
    
      max = [self maxConcurrentOperationCount];
      if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
        {
          max = maxConcurrent;
        }
    
      NS_DURING
      while (NO == [self isSuspended]
        && max > internal->executing
        && [internal->waiting count] > 0)
        {
          NSOperation   *op;
    
          /* Take the first operation from the queue and start it executing.
           * We set ourselves up as an observer for the operating finishing
           * and we keep track of the count of operations we have started,
           * but the actual startup is left to the NSOperation -start method.
           */
          op = [internal->waiting objectAtIndex: 0];
          [internal->waiting removeObjectAtIndex: 0];
          [op removeObserver: self forKeyPath: @"queuePriority"];
          [op addObserver: self
           forKeyPath: @"isFinished"
              options: NSKeyValueObservingOptionNew
              context: isFinishedCtxt];
          internal->executing++;
          if (YES == [op isConcurrent])
        {
              [op start];
        }
          else
        {
          NSUInteger    pending;
    
          [internal->cond lock];
          pending = [internal->starting count];
          [internal->starting addObject: op];
    
          /* Create a new thread if all existing threads are busy and
           * we haven't reached the pool limit.
           */
          if (0 == internal->threadCount
            || (pending > 0 && internal->threadCount < POOL))
            {
              internal->threadCount++;
              NS_DURING
            {
              [NSThread detachNewThreadSelector: @selector(_thread)
                           toTarget: self
                         withObject: nil];
            }
              NS_HANDLER
            {
              NSLog(@"Failed to create thread for %@: %@",
                self, localException);
            }
              NS_ENDHANDLER
            }
          /* Tell the thread pool that there is an operation to start.
           */
          [internal->cond unlockWithCondition: 1];
        }
        }
      NS_HANDLER
        {
          [internal->lock unlock];
          [localException raise];
        }
      NS_ENDHANDLER
      [internal->lock unlock];
    }
    

    _thread方法:
    首先会创建一个自动释放池(因为开启了一个新线程),然后进入一个无限循环,在循环体里面会遍历starting数组,每次循环取出第一个任务,设置好他的线程优先级然后执行,执行完后调用该任务的_finish 方法修改执行状态。最后再销毁自动释放池。这个循环的退出条件是循环开始执行后5秒内没有要执行的任务(starting数组为空),内部是通过一个条件锁来控制这个时间的。

    //NSOperationQueue (Private)
    - (void) _thread
    {
      CREATE_AUTORELEASE_POOL(arp);
    
      [[[NSThread currentThread] threadDictionary] setObject: self
                                                      forKey: threadKey];
      for (;;)
        {
          NSOperation   *op;
          NSDate        *when;
          BOOL      found;
          /* We use a pool for each operation in case releasing the operation
           * causes it to be deallocated, and the deallocation of the operation
           * autoreleases something which needs to be cleaned up.
           */
          RECREATE_AUTORELEASE_POOL(arp);
    
          when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];
          found = [internal->cond lockWhenCondition: 1 beforeDate: when];
          RELEASE(when);
          if (NO == found)
        {
          break;    // Idle for 5 seconds ... exit thread.
        }
    
          if ([internal->starting count] > 0)
        {
              op = RETAIN([internal->starting objectAtIndex: 0]);
          [internal->starting removeObjectAtIndex: 0];
        }
          else
        {
          op = nil;
        }
    
          if ([internal->starting count] > 0)
        {
          // Signal any other idle threads,
              [internal->cond unlockWithCondition: 1];
        }
          else
        {
          // There are no more operations starting.
              [internal->cond unlockWithCondition: 0];
        }
    
          if (nil != op)
        {
              NS_DURING
            {
              ENTER_POOL
                  [NSThread setThreadPriority: [op threadPriority]];
                  [op start];
              LEAVE_POOL
            }
              NS_HANDLER
            {
              NSLog(@"Problem running operation %@ ... %@",
            op, localException);
            }
              NS_ENDHANDLER
          [op _finish];
              RELEASE(op);
        }
        }
    
      [[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey];
      [internal->lock lock];
      internal->threadCount--;
      [internal->lock unlock];
      DESTROY(arp);
      [NSThread exit];
    }
    

    由此可见,如果任务支持并发,并且operationQueue没有suspend的话,会马上执行该任务。否则会看是否有可用的线程,如果有会在新开启的线程立刻执行,如果没有,则暂存在starting数组中等待执行,当任意一个线程执行完当前任务就会接着执行这个排队的任务。

    查阅源码的时候我们会发现到处都有 internal 这个变量,这个变量实际就是self, 只是为了方便内存管理,封装成了一个类,我们直接当成self理解就可以。

    相关文章

      网友评论

          本文标题:NSOperation源码浅析

          本文链接:https://www.haomeiwen.com/subject/zydlqdtx.html