美文网首页今日看点iOS开发程序员
自己动手创建Dispatch Queue

自己动手创建Dispatch Queue

作者: Mad_Mark | 来源:发表于2016-12-04 01:13 被阅读197次

    这篇文章来自 @我就叫Sunny怎么了 推荐的Mike Ash的博客,主要是讲如何自己动手实现dispatch queue的基本功能。翻译过程中个别地方稍作调整。初次翻译,欢迎纠正!
    原文地址:[https://www.mikeash.com/pyblog/friday-qa-2015-09-04-lets-build-dispatch_queue.html]
    代码地址:[https://github.com/mikeash/MADispatchQueue]

    GCD是Apple近年来开发的很棒的API之一,在"Lets Bulid"系列最新的一期中,我将对dispatch_queue的最基础的特性做重新实现,这个主题由Rob Rix推荐。

    概览
    一个dispatch queue是一个由全局线程池支持的工作队列。典型的,工作提交到一个队列在后台线程异步执行。所有线程共享一个单一的后台线程池,这能使系统更高效。

    这是我要重现的这个API的本质。我为了简单会忽略GCD提供的高级的特性。比如,完成在全局池中增加以及减少线程数量的大量工作,以及系统对CPU的利用。如果你有一堆任务占用了CPU并且你提交了其他任务,GCD会避免为他创建其他的工作线程,因为此时CPU使用率已经达到100%,其他的线程工作会变得低效。我将跳过这点,对线程的数量使用硬编码。我也会略过其他的特性,像定位队列和封闭并发队列。

    我们的目标是关注dispatch queues的本质:串行和并发,他们能同步或者异步的派发任务,并且由一个共享的全局线程池支持。

    接口:
    GCD是一个C语言的API。虽然GCD对象已经在最近的OS版本中转换为OC的对象,但是API维持纯粹的C(附加苹果block的扩展)。这是一个很棒的低层API,GCD提供了非常清晰的接口,但是为了完成我的目标,我宁愿用OC来重现。
    OC类名为MADispatchQueue
    他只有4个调用方法:
    一个获取共享全局队列的方法。GCD有多个不同优先级的全局队列,但是我们为了简单只有一个。
    一个初始化方法,为了能创建并发或者串行的队列。
    一个异步的dispatch调用
    一个同步的dispatch调用
    方法的声明:

    @interface MADispatchQueue : NSObject 
    
    + (MADispatchQueue *)globalQueue; 
    - (id)initSerial: (BOOL)serial;
     - (void)dispatchAsync: (dispatch_block_t)block;
     - (void)dispatchSync: (dispatch_block_t)block;
     @end
     
    

    之后去完成他们所描述的要做的事情。

    线程池接口
    线程池有一个简单的接口支持队列。他将会做一些实际运行中的,已提交的任务的繁重工作。队列能够可靠的在一个正确的时机提交他们队列中的任务。线程池有一个单一的任务:提交一些工作来运行。因此接口只有一个方法:

    @interface MAThreadPool : NSObject 
    - (void)addBlock: (dispatch_block_t)block; 
    @end
    

    由于这是核心,所以让我们先实现他。

    线程池的实现
    先来看实例变量。线程池是能够被多线程访问的,包括内部和外部,并且需要线程安全。GCD脱离他自己的方法尽可能的使用快速原子化的操作,在我的重建中我坚持使用老式的锁。我需要这个锁能够等待以及发送信号,不只是实施互斥,所以我使用了NSCondition而不是一个普通的NSLock。如果你对他不熟悉,可以理解为:NSCondition基本上就是一个锁和一个单一条件变量的封装。
    NSCondition *_lock;

    为了知道何时自旋向上的新增工作线程,我需要知道线程池里有多少线程,有多少实际在工作的,以及线程的最大数量:

    NSUInteger _threadCount; 
    NSUInteger _activeThreadCount;
    NSUInteger _threadCountLimit;
    

    最终,有一串block去执行。使用NSMutableArray,通过在末尾添加新的block以及从开头移除来模拟一个队列。

    NSMutableArray *_blocks;
    

    初始化工作很简单。初始化锁,初始化block数组,使用一个任意数量来设置线程的数量限制,这里用128:

    - (id)init {
            if((self = [super init])) {
                _lock = [[NSCondition alloc] init];
                _blocks = [[NSMutableArray alloc] init];
                _threadCountLimit = 128;
            }
            return self;
        }
    

    工作的线程在一个简单的无限循环中运行,直到blocks数组为空,状态置为等待。一旦有可获取的block,这个block会从数组中出列并执行。当我们这样做的时候,将增加活动的线程数量,那么在结束时需要减少数量。

    - (void)workerThreadLoop: (id)ignore {
    

    第一件事是获取锁,记住这必须在循环开始之前。至于理由,在循环的结束时候你将会明白这点。

    [_lock lock];
    

    无限循环:

    while(1){
    

    如果队列为空,那么锁是等待状态:

    while([_blocks count] == 0) {
     [_lock wait]; 
    }
    

    记住这个需要通过循环完成,而不是一个if语句。理由可参考:[https://en.wikipedia.org/wiki/Spurious_wakeup]
    简单来说,wait这个状态即便没有signaled也有可能return,所以为了修正这种行为,当wait return时,需要重新检验条件。
    一旦block可以获得,让丫的出列:

    dispatch_block_t block = [_blocks firstObject];
    [_blocks removeObjectAtIndex: 0];
    

    通过增加活动线程数量来表明该线程正在活动:

    _activeThreadCount++;
    

    现在是时候执行block了,但是我们必须先释放锁,否则我们做不到并发,同时我们将会有各种各样好玩的死锁:

    [_lock unlock];
    

    锁安全的放手后,执行block:

    block();
    

    block结束后,是时候减少活动线程的数量了。这必须结合锁来完成,以避免资源竞争,循环的最后:

    [_lock lock];
                _activeThreadCount--;
            }
    }
    

    现在你能看到为什么在循环的最上层要获取锁。循环中做的最后一件事是减少活动线程的数量,这需要保持锁的状态。在循环的顶层第一件事是检查block的队列。通过在循环外执行第一个锁,后续重复的事情的所有操作能够使用一个单一的锁来操作,二不是锁,解锁,再锁…

    addBlock方法:

        - (void)addBlock: (dispatch_block_t)block {
    

    这里的每件事情都需要结合锁的获取来完成

            [_lock lock];
    

    第一个任务是添加一个新的block到block队列:

            [_blocks addObject: block];
    

    如果有一个闲置的线程准备取走这个block,那接下来没什么可做的了。如果没有足够的闲置线程来执行未完成的block,而且工作线程的数量还没有到上限,那么是时候创建一个新的线程了:

    NSUInteger idleThreads = _threadCount - _activeThreadCount;
    if([_blocks count] > idleThreads && _threadCount < _threadCountLimit) {
            [NSThread detachNewThreadSelector: @selector(workerThreadLoop:)
                                     toTarget: self
                                   withObject: nil];
            _threadCount++;
    }
    

    现在一个工作线程启动的所有准备工作已经完成。 假设他们都是沉睡状态,唤醒一个

    [_lock signal];
    

    然后释放锁就完成了

     [_lock unlock];    
    } 
    

    这为我们提供了一个线程池,来产出预先设定数量的工作线程,用于为进入的block服务。现在为这个队列做基础的实现。

    队列实现
    像线程池一样,队列将使用锁来保护他的内容。和线程池不一样的地方是,他不需要做任何等待或者发信号的动作,只是基本的互斥,所以我们使用普通的NSLock:

    NSLock *_lock;
    

    像线程池一样,他维护一个挂起的block的队列,使用NSMutableArray:

    NSMutableArray *_pendingBlocks;
    

    队列需要知道这是串行的还是并发的:

    BOOL _serial;
    

    当这个值为真,它还需要跟踪是否有一个block在线程池中运行:

    BOOL _serialRunning;
    

    并发队列无论是否有任务在运行都表现的一样,所以不跟踪这些。

    全局队列作为一个全局变量来存储,底层共享的线程池也是。他们都在+initialize方法中创建:

    static MADispatchQueue *gGlobalQueue;
        static MAThreadPool *gThreadPool;
    
        + (void)initialize {
            if(self == [MADispatchQueue class]) {
                gGlobalQueue = [[MADispatchQueue alloc] initSerial: NO];
                gThreadPool = [[MAThreadPool alloc] init];
            }
        }
    

    获取全局队列方法只是返回这个变量,因为initialize方法中确保已经创建了他:

    + (MADispatchQueue *)globalQueue {
            return gGlobalQueue;
    }
    

    初始化队列由分配锁,挂起block队列以及设置_serial变量这些工作组成:

    - (id)initSerial: (BOOL)serial {
            if ((self = [super init])) {
                _lock = [[NSLock alloc] init];
                _pendingBlocks = [[NSMutableArray alloc] init];
                _serial = serial;
            }
            return self;
    }
    

    在我们接触剩余的公开API之前,有一个底层的方法需要创建,这个方法将在线程池派发一个单一的block,然后调用他自己来运行另一个block:

    - (void)dispatchOneBlock {
    

    这个方法的目的是在线程池运行东西,所以他在这里派发:

        [gThreadPool addBlock: ^{
    

    然后他抓住了队列里的第一个block。自然的,这必须结合锁来完成,以避免灾难事故:

        [_lock lock];
        dispatch_block_t block = [_pendingBlocks firstObject];
        [_pendingBlocks removeObjectAtIndex: 0];
        [_lock unlock];
    

    随着获得block以及释放锁,block能够安全得在后台线程执行

    block();
    

    如果队列是并发的,那么这就是所有要做的。如果这是串行的,还需要:

     if(_serial) {
    

    在一个串行队列,将建立额外的block,但是不能在block完成之前唤起。当一个block完成, dispatchOneBlock会查看队列中是否有其他挂起的block,如果有,他会调用自己去派发下一个block,如果没有,他会将队列的运行状态设回NO:

                    [_lock lock];
                    if([_pendingBlocks count] > 0) {
                        [self dispatchOneBlock];
                    } else {
                        _serialRunning = NO;
                    }
                    [_lock unlock];
                }
            }];
        }
    

    用这个方法来实现dispatchAsync是相当简单的。添加block到挂起的block的队列,设置状态并且视情况唤起dispatchOneBlock:

    - (void)dispatchAsync: (dispatch_block_t)block {
            [_lock lock];
            [_pendingBlocks addObject: block];
    

    如果一个串行队列是闲置状态,那么设置为运行状态并调用dispatchOneBlock来执行要做的事:

    if(_serial && !_serialRunning) {
                _serialRunning = YES;
                [self dispatchOneBlock];
    

    如果队列是并发的,那么无条件的调用dispatchOneBlock。这能确保新的block能够尽可能快的执行,尽管另一个block已经在运行中,因为在并发的情况下允许多个blocks执行:

     } else if (!_serial) {
                [self dispatchOneBlock];
            }
    

    如果一个串行已经运行,那没什么更多要做的了。dispatchOneBlock会执行完所有添加到队列的block。现在释放锁:

        [_lock unlock];
        }
    

    在dispatchSync方面,GCD当停止队列里其他block时,在调用的线程上直接运行block(如果这是串行)。我们不想尝试做到这么智能。取而代之的,我们只是包装一下dispatchAsync:,使他能够等待完成执行。
    他使用一个局部NSCondition变量,附加一个done的BOOL变量来表明什么时候block已经完成:

    - (void)dispatchSync: (dispatch_block_t)block {
            NSCondition *condition = [[NSCondition alloc] init];
            __block BOOL done = NO;
    

    然后他异步的派发block。这里调用的是传入的block,然后设置状态为完成并且让条件锁发送信号:

     [self dispatchAsync: ^{
                block();
                [condition lock];
                done = YES;
                [condition signal];
                [condition unlock];
     }];
    

    回到原来正在调用的线程,我们要做的是等待状态被设为done,然后返回。

            [condition lock];
            while (!done) {
                [condition wait];
            }
            [condition unlock];
        }
    

    到此,block的执行已经完成了,这也是MADispatchQueue的API最后一点要做的了。

    结论
    一个全局的线程池能够通过一组工作的block和一些比较智能的线程来实现。使用一个共享的全局线程池,能够创建一个提供串行/并发和同步/异步派发的基础派发队列的API。本次重建缺少了许多GCD很棒的特性,并且非常低效。不管怎样这让我们很好的了解了内部工作原理。

    相关文章

      网友评论

        本文标题:自己动手创建Dispatch Queue

        本文链接:https://www.haomeiwen.com/subject/zpxqmttx.html