GCD

作者: CrystalZhu | 来源:发表于2019-03-14 11:57 被阅读0次

    Grand Central Dispatch (GCD)

    转载自:https://blog.boolchow.com/2018/04/06/iOS-Concurrency-Programming/
    GCD 是基于 C 实现的一套 API,而且是开源的,如果有兴趣,可以在 这里 down 一份源码研究一下。GCD 是由系统帮我们处理多线程调度,很是方便,也是使用频率最高的。这一章节主要讲解一下 GCD 的原理和使用。

    在讲解之前,我们先有个概览,看一下 GCD 为我们提供了那些东西:

    GCD总体结构

    系统所提供的 API,完全可以满足我们日常开发需求了。下面就根据这些模块分别讲解一下。

    1. Dispatch Queue

    GCD 为我们提供了两类队列,串行队列并行队列。两者的区别是:

    • 串行队列中,按照 FIFO 的顺序执行任务,前面一个任务执行完,后面一个才开始执行。
    • 并行队列中,也是按照 FIFO 的顺序执行任务,只要前一个被拿去执行,继而后面一个就开始执行,后面的任务无需等到前面的任务执行完再开始执行。

    除此之外,还要解释一个容易混淆的概念,并发并行

    • 并发:是指单独部分可以同时执行,但是需要系统决定怎样发生。
    • 并行:两个任务互不干扰,同时执行。单核设备,系统需要通过切换上下文来实现并发;多核设备,系统可以通过并行来执行并发任务。

    最后,还有一个概念,同步异步

    • 同步 : 同步执行的任务会阻塞当前线程。
    • 异步 : 异步执行的任务不会阻塞当前线程。是否开启新的线程,由系统管理。如果当前有空闲的线程,使用当前线程执行这个异步任务;如果没有空闲的线程,而且线程数量没有达到系统最大,则开启新的线程;如果线程数量已经达到系统最大,则需要等待其他线程中任务执行完毕。
    队列

    我们使用时,一般使用这几个队列:

    • 主队列 - dispatch_get_main_queue :一个特殊的串行队列。在 GCD 中,方法主队列中的任务都是在主线程执行。当我们更新 UI 时想 dispatch 到主线程,可以使用这个队列。
      - (void)viewDidLoad {
        [super viewDidLoad];
            dispatch_async(dispatch_get_main_queue(),   ^{
              // UI 相关操作
           });
    }
    
    • 全局并行队列 - dispatch_get_global_queue : 系统提供的一个全局并行队列,我们可以通过指定参数,来获取不同优先级的队列。系统提供了四个优先级,所以也可以认为系统为我们提供了四个并行队列,分别为 :

      • DISPATCH_QUEUE_PRIORITY_HIGH
      • DISPATCH_QUEUE_PRIORITY_DEFAULT
      • DISPATCH_QUEUE_PRIORITY_LOW
      • DISPATCH_QUEUE_PRIORITY_BACKGROUND
    dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
       dispatch_async(queue, ^{
           // 相关操作
       });
    
    • 自定义队列 :你可以自己定义串行或者并行队列,来执行一些相关的任务,平时开发中也建议用自定义队列。创建自定义队列时,需要两个参数。一个是队列的名字,方便我们再调试时查找队列使用,命名方式采用的是反向 DNS 命名规则;一个是队列类型,传 NULL 或者 DISPATCH_QUEUE_SERIAL 代表串行队列,传 DISPATCH_QUEUE_CONCURRENT 代表并行队列,通常情况下,不要传 NULL,会降低可读性。
      DISPATCH_QUEUE_SERIAL_INACTIVE 代表串行不活跃队列,DISPATCH_QUEUE_CONCURRENT_INACTIVE 代表并行不活跃队列,在执行 block 任务时,需要被激活。
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch",DISPATCH_QUEUE_SERIAL);
    
    
    • 你可以使用 dispatch_queue_set_specificdispatch_queue_get_specificdispatch_get_specific 方法,为 queue 设置关联的 key 或者根据 key 找到关联对象等操作。

    可以说,系统为我们提供了 5 中不同的队列,运行在主线程中的 main queue;3 个不同优先级的 global queue; 一个优先级更低的 background queue。除此之外,开发者可以自定义一些串行和并行队列,这些自定义队列中被调度的所有 block 最终都会被放到系统全局队列和线程池中,后面会讲这部分原理。盗用一张经典图:

    gcd-queues
    同步 VS 异步

    我们大多数情况下,都是使用 dispatch_asyn() 做异步操作,因为程序本来就是顺序执行,很少用到同步操作。有时候我们会把 dispatch_syn() 当做锁来用,以达到保护的作用。

    系统维护的是一个队列,根据 FIFO 的规则,将 dispatch 到队列中的任务一一执行。有时候我们想把一些任务延后执行以下,例如 App 启动时,我想让主线程中一个耗时的工作放在后,可以尝试用一下 dispatch_asyn(),相当于把任务重新追加到了队尾。

    dispatch_async(dispatch_get_main_queue(), ^{
            // 想要延后的任务
        });
    

    通常情况下,我们使用 dispatch_asyn() 是不会造成死锁的。死锁一般出现在使用 dispatch_syn()的时候。例如:

    dispatch_sync(dispatch_get_main_queue(), ^{
       NSLog(@"dead lock");
    });
    

    想上面这样写,启动就会报错误。以下情况也如此:

    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
        dispatch_async(queue, ^{
            NSLog(@"dispatch asyn");
            dispatch_sync(queue, ^{
                NSLog(@"dispatch asyn -> dispatch syn");
            });
        });
    

    在上面的代码中,dispatch_asyn() 整个 block(称作 blcok_asyn) 当做一个任务追加到串行队列队尾,然后开始执行。在 block_asyn 内部中,又进行了 dispatch_syn(),想想要执行 block_syn。因为是串行队列,需要前一个执行完(block_asyn),再执行后面一个(block_syn);但是要执行完 block_asyn,需要执行内部的 block_syn。互相等待,形成死锁。

    现实开发中,还有更复杂的死锁场景。不过现在编译器很友好,我们能在编译执行时就检测到了。

    基本原理

    针对下面这几行代码,我们分析一下它的底层过程:

    - (void)viewDidLoad {
        [super viewDidLoad];
        dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
        dispatch_async(queue, ^{
            NSLog(@"dispatch asyn test");
        });
    }
    

    创建队列

    源码很长,但实际只有一个方法,逻辑比较清晰,如下:

    /** 开发者调用的方法 */
    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
        return _dispatch_queue_create_with_target(label, attr,
                DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
    
    /** 内部实际调用方法 */
    DISPATCH_NOINLINE
    static dispatch_queue_t
    _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
            dispatch_queue_t tq, bool legacy)
    {
        // 1.初步判断
        if (!slowpath(dqa)) {
            dqa = _dispatch_get_default_queue_attr();
        } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
            DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
        }
    
        // 2.配置队列参数
        dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
    #if !HAVE_PTHREAD_WORKQUEUE_QOS
        if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
            qos = DISPATCH_QOS_USER_INITIATED;
        }
        if (qos == DISPATCH_QOS_MAINTENANCE) {
            qos = DISPATCH_QOS_BACKGROUND;
        }
    #endif // !HAVE_PTHREAD_WORKQUEUE_QOS
    
        _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
        if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
            if (tq->do_targetq) {
                DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                        "a non-global target queue");
            }
        }
    
        if (tq && !tq->do_targetq &&
                tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
            // Handle discrepancies between attr and target queue, attributes win
            if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
                if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
                    overcommit = _dispatch_queue_attr_overcommit_enabled;
                } else {
                    overcommit = _dispatch_queue_attr_overcommit_disabled;
                }
            }
            if (qos == DISPATCH_QOS_UNSPECIFIED) {
                dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
                tq = _dispatch_get_root_queue(tq_qos,
                        overcommit == _dispatch_queue_attr_overcommit_enabled);
            } else {
                tq = NULL;
            }
        } else if (tq && !tq->do_targetq) {
            // target is a pthread or runloop root queue, setting QoS or overcommit
            // is disallowed
            if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
                DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
                        "and use this kind of target queue");
            }
            if (qos != DISPATCH_QOS_UNSPECIFIED) {
                DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
                        "and use this kind of target queue");
            }
        } else {
            if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
                 // Serial queues default to overcommit!
                overcommit = dqa->dqa_concurrent ?
                        _dispatch_queue_attr_overcommit_disabled :
                        _dispatch_queue_attr_overcommit_enabled;
            }
        }
        if (!tq) {
            tq = _dispatch_get_root_queue(
                    qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                    overcommit == _dispatch_queue_attr_overcommit_enabled);
            if (slowpath(!tq)) {
                DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
            }
        }
    
        // 3. 初始化队列
        if (legacy) {
            // if any of these attributes is specified, use non legacy classes
            if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
                legacy = false;
            }
        }
    
        const void *vtable;
        dispatch_queue_flags_t dqf = 0;
        if (legacy) {
            vtable = DISPATCH_VTABLE(queue);
        } else if (dqa->dqa_concurrent) {
            vtable = DISPATCH_VTABLE(queue_concurrent);
        } else {
            vtable = DISPATCH_VTABLE(queue_serial);
        }
        switch (dqa->dqa_autorelease_frequency) {
        case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
            dqf |= DQF_AUTORELEASE_NEVER;
            break;
        case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
            dqf |= DQF_AUTORELEASE_ALWAYS;
            break;
        }
        if (legacy) {
            dqf |= DQF_LEGACY;
        }
        if (label) {
            const char *tmp = _dispatch_strdup_if_mutable(label);
            if (tmp != label) {
                dqf |= DQF_LABEL_NEEDS_FREE;
                label = tmp;
            }
        }
    
        dispatch_queue_t dq = _dispatch_object_alloc(vtable,
                sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
        _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
                DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
                (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
    
        dq->dq_label = label;
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        dq->dq_priority = dqa->dqa_qos_and_relpri;
        if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
            dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        }
    #endif
        _dispatch_retain(tq);
        if (qos == QOS_CLASS_UNSPECIFIED) {
            // legacy way of inherithing the QoS from the target
            _dispatch_queue_priority_inherit_from_target(dq, tq);
        }
        if (!dqa->dqa_inactive) {
            _dispatch_queue_inherit_wlh_from_target(dq, tq);
        }
        dq->do_targetq = tq;
        _dispatch_object_debug(dq, "%s", __func__);
        return _dispatch_introspection_queue_create(dq);
    }
    

    根据代码生成的流程图,不想看代码直接看图,下同:

    Create_Queue

    根据流程图,这个方法的步骤如下:

    • 开发者调用 dispatch_queue_create() 方法之后,内部会调用 _dispatch_queue_create_with_target() 方法。
    • 然后进行初步判断,多数情况下,我们是不会传队列类型的,都是穿 NULL,所以这里是个 slowpath。如果传了参数,但是不是规定的队列类型,系统会认为你是个智障,并抛出错误。
    • 然后初始化一些配置项。主要是 target_queue,overcommit 项和 qos。target_queue 是依赖的目标队列,像任何队列提交的任务(block),最终都会放到目标队列中执行;支持 overcommit 时,每当想队列提交一个任务时,都会开一个新的线程处理,这样是为了避免单一线程任务太多而过载;qos 是队列优先级,之前已经说过。
    • 然后进入判断分支。普通的串行队列的目标队列,就是一个支持 overcommit 的全局队列(对应 else 分支);当前 tq 对象的引用计数为 DISPATCH_OBJECT_GLOBAL_REFCNT (永远不会释放)时,且还没有目标队列时,才可以设置 overcommit 项,而且当优先级为 DISPATCH_QOS_UNSPECIFIED 时,需要重置 tq (对应 if 分支);其他情况(else if 分支)。
    • 然后配置队列的标识,以方便在调试时找到自己的那个队列。
    • 使用 _dispatch_object_alloc 方法申请一个 dispatch_queue_t 对象空间,dq。
    • 根据传入的信息(并行 or 串行;活跃 or 非活跃)来初始化这个队列。并行队列的 width 会设置为 DISPATCH_QUEUE_WIDTH_MAX 即最大,不设限;串行的会设置为 1。
    • 将上面获得配置项,目标队列,是否支持 overcommit,优先级和 dq 绑定。
    • 返回这个队列。返回去还输出了一句信息,便于调试。

    异步执行

    这个版本异步执行的代码,因为方法拆分很多,所以显得很乱。源码如下:

    /** 开发者调用 */
    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
    
        _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
        _dispatch_continuation_async(dq, dc);
    }
    
    /** 内部调用,包一层,再深入调用 */
    DISPATCH_NOINLINE
    void
    _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
    {
        _dispatch_continuation_async2(dq, dc,
                dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
    }
    
    /** 根据 barrier 关键字区别串行还是并行,分两支 */
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
            bool barrier)
    {
        if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
            // 串行
            return _dispatch_continuation_push(dq, dc);
        }
        
        // 并行
        return _dispatch_async_f2(dq, dc);
    }
    
    /** 并行又多了一层调用,就是这个方法 */
    DISPATCH_NOINLINE
    static void
    _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
    {
        if (slowpath(dq->dq_items_tail)) {// 少路径
            return _dispatch_continuation_push(dq, dc);
        }
    
        if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {// 少路径
            return _dispatch_continuation_push(dq, dc);
        }
        // 多路径
        return _dispatch_async_f_redirect(dq, dc,
                _dispatch_continuation_override_qos(dq, dc));
    }
    
    /** 主要用来重定向 */
    DISPATCH_NOINLINE
    static void
    _dispatch_async_f_redirect(dispatch_queue_t dq,
            dispatch_object_t dou, dispatch_qos_t qos)
    {
        if (!slowpath(_dispatch_object_is_redirection(dou))) {
            dou._dc = _dispatch_async_redirect_wrap(dq, dou);
        }
        dq = dq->do_targetq;
    
        // Find the queue to redirect to
        while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
            if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
                break;
            }
            if (!dou._dc->dc_ctxt) {
                dou._dc->dc_ctxt = (void *)
                        (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
            }
            dq = dq->do_targetq;
        }
    
        // 同步异步最终都是调用的这个方法,将任务追加到队列中
        dx_push(dq, dou, qos);
    }
    
    ... 省略一些调用层级,
    
    /** 核心方法,通过 dc_flags 参数区分了是 group,还是串行,还是并行 */
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
            dispatch_invoke_flags_t flags)
    {
        dispatch_continuation_t dc = dou._dc, dc1;
        dispatch_invoke_with_autoreleasepool(flags, {
            uintptr_t dc_flags = dc->dc_flags;
            _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
            if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 并行
                dc1 = _dispatch_continuation_free_cacheonly(dc);
            } else {
                dc1 = NULL;
            }
            if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
                _dispatch_continuation_with_group_invoke(dc);
            } else { // 串行
                _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
                _dispatch_introspection_queue_item_complete(dou);
            }
            if (unlikely(dc1)) {
                _dispatch_continuation_free_to_cache_limit(dc1);
            }
        });
        _dispatch_perfmon_workitem_inc();
    }
    

    不想看代码,直接看图:

    Dispatch_Asyn

    根据流程图描述一下过程:

    • 首先开发者调用 dispatch_async() 方法,然后内部创建了一个 _dispatch_continuation_init队列,将 queue、block 这些信息和这个 dc 绑定起来。这过程中 copy 了 block。
    • 然后经过了几个层次的调用,主要为了区分并行还是串行。
    • 如果是串行(这种情况比较常见,所以是 fastpath),直接就 dx_push 了,其实就是讲任务追加到一个链表里面。
    • 如果是并行,需要做重定向。之前我们说过,放到队列中的任务,最终都会以各种形式追加到目标队列里面。在 _dispatch_async_f_redirect 方法中,重新寻找依赖目标队列,然后追加过去。
    • 经过一系列调用,我们会在 _dispatch_continuation_invoke_inline 方法里区分串行还是并行。因为这个方法会被频繁调用,所以定义成了内联函数。对于串行队列,我们使用信号量控制,执行前信号量置为 wait,执行完毕后发送 singal;对于调度组,我们会在执行完之后调用 dispatch_group_leave
    • 底层的线程池,是使用 pthread 维护的,所以最终都会使用 pthread 来处理这些任务。

    同步执行

    同步执行,相对来说比较简单,源码如下 :

    /** 开发者调用 */
    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_private_data(dq, work, 0);
        }
        dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
    }
    
    /** 内部调用 */
    DISPATCH_NOINLINE
    void
    dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
    {
        if (likely(dq->dq_width == 1)) {
            return dispatch_barrier_sync_f(dq, ctxt, func);
        }
    
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
            return _dispatch_sync_f_slow(dq, ctxt, func, 0);
        }
    
        _dispatch_introspection_sync_begin(dq);
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dq, ctxt, func, 0);
        }
        _dispatch_sync_invoke_and_complete(dq, ctxt, func);
    }
    

    同步执行,相对来说简单些,大体逻辑差不多。偷懒一下,就不画图了,直接描述:

    • 开发者使用 dispatch_sync() 方法,大多数路径,都会调用 dispatch_sync_f() 方法。
    • 如果是串行队列,则通过 dispatch_barrier_sync_f() 方法来保证原子操作。
    • 如果不是串行的(一般很少),我们使用 _dispatch_introspection_sync_begin_dispatch_sync_invoke_and_complete 来保证同步。
    dispatch_after

    dispatch_after 一般用于延后执行一些任务,可以用来代替 NSTimer,因为有时候 NSTimer 问题太多了。在后面的一章里,我会总体讲一下多线程中的问题,这里就不详细说了。一般我们这样来使用 dispatch_after :

    - (void)viewDidLoad {
        [super viewDidLoad];
        dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)),queue, ^{
            // 2.0 second execute
        });
    }
    

    在做页面过渡时,刚进入到新的页面我们并不会立即更新一些 view,为了引起用户注意,我们会过会儿再进行更新,可以中此 API 来完成。

    源码如下:

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
            void *ctxt, void *handler, bool block)
    {
        dispatch_timer_source_refs_t dt;
        dispatch_source_t ds;
        uint64_t leeway, delta;
    
        if (when == DISPATCH_TIME_FOREVER) {
    #if DISPATCH_DEBUG
            DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
    #endif
            return;
        }
    
        delta = _dispatch_timeout(when);
        if (delta == 0) {
            if (block) {
                return dispatch_async(queue, handler);
            }
            return dispatch_async_f(queue, ctxt, handler);
        }
        leeway = delta / 10; // <rdar://problem/13447496>
    
        if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
        if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
    
        // this function can and should be optimized to not use a dispatch source
        ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
        dt = ds->ds_timer_refs;
    
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        if (block) {
            _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
        } else {
            _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
        }
        // reference `ds` so that it doesn't show up as a leak
        dc->dc_data = ds;
        _dispatch_trace_continuation_push(ds->_as_dq, dc);
        os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
    
        if ((int64_t)when < 0) {
            // wall clock
            when = (dispatch_time_t)-((int64_t)when);
        } else {
            // absolute clock
            dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
            leeway = _dispatch_time_nano2mach(leeway);
        }
        dt->dt_timer.target = when;
        dt->dt_timer.interval = UINT64_MAX;
        dt->dt_timer.deadline = when + leeway;
        dispatch_activate(ds);
    }
    

    dispatch_after() 内部会调用 _dispatch_after() 方法,然后先判断延迟时间。如果为 DISPATCH_TIME_FOREVER(永远不执行),则会出现异常;如果为 0 则立即执行;否则的话会创建一个 dispatch_timer_source_refs_t 结构体指针,将上下文相关信息与之关联。然后使用 dispatch_source 相关方法,将定时器和 block 任务关联起来。定时器时间到时,取出 block 任务开始执行。

    dispatch_once

    如果我们有一段代码,在 App 生命周期内最好只初始化一次,这时候使用 dispatch_once 最好不过了。例如我们单例中经常这样用:

    + (instancetype)sharedManager {
        static BLDispatchManager *sharedInstance = nil;
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            sharedInstance = [[BLDispatchManager alloc] initPrivate];
        });
        
        return sharedInstance;
    }
    

    还有在定义 NSDateFormatter 时使用:

    - (NSString *)todayDateString {
        static NSDateFormatter *formatter = nil;
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            formatter = [NSDateFormatter new];
            formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
            formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
            formatter.dateFormat = @"yyyyMMdd";
        });
        
        return [formatter stringFromDate:[NSDate date]];
    }
    

    因为这是很常用的一个代码片段,所以被加在了 Xcode 的 code snippet 中。

    它的源代码如下:

    /** 一个结构体,里面为当前的信号量、线程端口和指向下一个节点的指针 */
    typedef struct _dispatch_once_waiter_s {
        volatile struct _dispatch_once_waiter_s *volatile dow_next;
        dispatch_thread_event_s dow_event;
        mach_port_t dow_thread;
    } *_dispatch_once_waiter_t;
    
    /** 我们调用的方法 */
    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    
    /** 实际执行的方法 */
    DISPATCH_NOINLINE
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
    #if !DISPATCH_ONCE_INLINE_FASTPATH
        if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
            return;
        }
    #endif // !DISPATCH_ONCE_INLINE_FASTPATH
        return dispatch_once_f_slow(val, ctxt, func);
    }
    
    DISPATCH_ONCE_SLOW_INLINE
    static void
    dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
    #if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
        if (_dispatch_once_gate_tryenter(l)) {
            _dispatch_client_callout(ctxt, func);
            _dispatch_once_gate_broadcast(l);
        } else {
            _dispatch_once_gate_wait(l);
        }
    #else
        _dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
        struct _dispatch_once_waiter_s dow = { };
        _dispatch_once_waiter_t tail = &dow, next, tmp;
        dispatch_thread_event_t event;
    
        if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
            dow.dow_thread = _dispatch_tid_self();
            _dispatch_client_callout(ctxt, func);
    
            next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
            while (next != tail) {
                tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
                event = &next->dow_event;
                next = tmp;
                _dispatch_thread_event_signal(event);
            }
        } else {
            _dispatch_thread_event_init(&dow.dow_event);
            next = *vval;
            for (;;) {
                if (next == DISPATCH_ONCE_DONE) {
                    break;
                }
                if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
                    dow.dow_thread = next->dow_thread;
                    dow.dow_next = next;
                    if (dow.dow_thread) {
                        pthread_priority_t pp = _dispatch_get_priority();
                        _dispatch_thread_override_start(dow.dow_thread, pp, val);
                    }
                    _dispatch_thread_event_wait(&dow.dow_event);
                    if (dow.dow_thread) {
                        _dispatch_thread_override_end(dow.dow_thread, val);
                    }
                    break;
                }
            }
            _dispatch_thread_event_destroy(&dow.dow_event);
        }
    #endif
    }
    

    不想看代码直接看图 (emmm… 根据逻辑画完图才发现,其实这个图也挺乱的,所以我将两个主分支用不同颜色标记处理):

    Dispatch_Once

    根据这个图,我来表述一下主要过程:

    • 我们调用 dispatch_once() 方法之后,内部多数情况下会调用 dispatch_once_f_slow() 方法,这个方法才是真正的执行方法。
    • os_atomic_cmpxchg(vval, NULL, tail, acquire) 这个方法,执行过程实际是这个样子
        if (*vval == NULL) {
        *vval = tail = &dow;
        return true;
    } else {
        return false
    }
    

    我们初始化的 once_token,也就是 *vval 实际是 0,所以第一次执行时是返回 true 的。if() 中的这个方法是原子操作,也就是说,如果多个线程同时调用这个方法,只有一个线程会进入 true 的分支,其他都进入 else 分支。

    • 这里先说进入 true 分支。进入之后,会执行对应的 block,也就是对应的任务。然后 next 指向 *vval, *vval 标记为 DISPATCH_ONCE_DONE,即执行的是这样一个过程:
        next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
    // 实际执行时这样的
    next = *vval;
    *vval = DISPATCH_ONCE_DONE;
    
    • 然后 tail = &dow。此时我们发现,原来的 *vval = &dow -> next = *vval,实际则是 next = &dow如果没有其他线程(或者调用)进入 else 分支,&dow 实际没有改变,即 tail == tmp。此时 while (tail != tmp) 是不会执行的,分支结束。

    • 如果有其他线程(或者调用)进入了 else 分支,那么就已经生成了一个等待响应的链表。此时进入 &dow 已经改变,成为了链表尾部,*vval 是链表头部。进入 while 循环后,开始遍历链表,依次发送信号进行唤起。

    • 然后说进入 else 分支的这些调用。进入分支后,随即进入一个死循环,直到发现 *vval 已经标记为了 DISPATCH_ONCE_DONE 才跳出循环。

    • 发现 *vval 不是 DISPATCH_ONCE_DONE 之后,会将这个节点追加到链表尾部,并调用信号量的 wait 方法,等待被唤起。

    以上为全部的执行过程。通过源码可以看出,使用的是 原子操作 + 信号量来保证 block 只会被执行多次,哪怕是在多线程情况下。

    这样一个关于 dispatch_once 递归调用会产生死锁的现象,也就很好解释了。看下面代码:

    - (void)dispatchOnceTest {
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            [self dispatchOnceTest];
        });
    }
    

    通过上面分析,在 block 执行完,并将 *vval 置为 DISPATCH_ONCE_DONE 之前,其他的调用都会进入 else 分支。第二次递归调用,信号量处于等待状态,需要等到第一个 block 执行完才能被唤起;但是第一个 block 所执行的内容就是进行第二次调用,这个任务被 wait 了,也即是说 block 永远执行不完。死锁就这样发生了。

    dispatch_apply

    有时候没有时序性依赖的时候,我们会用 dispatch_apply 来代替 for loop。例如我们下载一组图片:

    /** 使用 for loop */
    - (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
        for (NSURL *imageURL in imageURLs) {
            [self downloadImageWithURL:imageURL];
        }
    }
    
    /** dispatch_apply */
    - (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
        dispatch_queue_t downloadQueue = dispatch_queue_create("com.bool.download", DISPATCH_QUEUE_CONCURRENT);
        dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
            NSURL *imageURL = imageURLs[index];
            [self downloadImageWithURL:imageURL];
        });
    }
    

    进行替换是需要注意几个问题:

    • 任务之间没有时序性依赖,谁先执行都可以。
    • 一般在并发队列,并发执行任务时,才替换。串行队列替换没有意义。
    • 如果数组中数据很少,或者每个任务执行时间很短,替换也没有意义。强行进行并发的消耗,可能比使用 for loop 还要多,并不能得到优化。

    至于原理,就不大篇幅讲了。大概是这个样子:这个方法是同步的,会阻塞当前线程,直到所有的 block 任务都完成。如果提交到并发队列,每个任务执行顺序是不一定的。

    更多时候,我们执行下载任务,并不希望阻塞当前线程,这时我们可以使用 dispatch_group

    dispatch_group

    当处理批量异步任务时,dispatch_group 是一个很好的选择。针对上面说的下载图片的例子,我们可以这样做:

    - (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
        dispatch_group_t taskGroup = dispatch_group_create();
        dispatch_queue_t queue = dispatch_queue_create("com.bool.group", DISPATCH_QUEUE_CONCURRENT);
        for (NSURL *imageURL in imageURLs) {
            dispatch_group_enter(taskGroup);
            // 下载方法是异步的
            [self downloadImageWithURL:imageURL withQueue:queue completeHandler:^{
                dispatch_group_leave(taskGroup);
            }];
        }
        
        dispatch_group_notify(taskGroup, queue, ^{
            // all task finish
        });
        
        /** 如果使用这个方法,内部执行异步任务,会立即到 dispatch_group_notify 方法中,因为是异步,系统认为已经执行完了。所以这个方法使用不多。
         */
        dispatch_group_async(taskGroup, queue, ^{
            
        })
    }
    

    关于原理方面,和 dispatch_async() 方法类似,前面也提到。这里只说一段代码:

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc)
    {
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc);
    }
    

    这段代码中,调用了 dispatch_group_enter(dg) 方法进行标记,最终都会和 dispatch_async() 走到同样的方法里 _dispatch_continuation_invoke_inline()。在里面判断类型为 group,执行 task,执行结束后调用 dispatch_group_leave((dispatch_group_t)dou),和之前的 enter 对应。

    以上是 Dispatch Queues 内容的介绍,我们平时使用 GCD 的过程中,60% 都是使用的以上内容。

    2. Dispatch Block

    在 iOS 8 中,Apple 为我们提供了新的 API,Dispatch Block 相关。虽然之前我们可以向 dispatch 传递 block 参数,作为任务,但是这里和之前的不一样。之前经常说,使用 NSOperation 创建的任务可以 cancel,使用 GCD 不可以。但是在 iOS 8 之后,可以 cancel 任务了。

    基本使用
    • 创建一个 block 并执行。
        - (void)dispatchBlockTest {
            // 不指定优先级
            dispatch_block_t dsBlock = dispatch_block_create(0, ^{
                NSLog(@"test block");
            });
            
            // 指定优先级
            dispatch_block_t dsQosBlock = dispatch_block_create_with_qos_class(0, QOS_CLASS_USER_INITIATED, -1, ^{
                NSLog(@"test block");
            });
            
            dispatch_async(dispatch_get_main_queue(), dsBlock);
            dispatch_async(dispatch_get_main_queue(), dsQosBlock);
            
            // 直接创建并执行
            dispatch_block_perform(0, ^{
                 NSLog(@"test block");
            });
    }
    
    • 阻塞当前任务,等 block 执行完在继续执行。
        - (void)dispatchBlockTest {
        dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
        dispatch_block_t dsBlock = dispatch_block_create(0, ^{
            NSLog(@"test block");
        });
        dispatch_async(queue, dsBlock);
        // 等到 block 执行完
        dispatch_block_wait(dsBlock, DISPATCH_TIME_FOREVER);
        NSLog(@"block was finished");
    }
    
    • block 执行完后,收到通知,执行其他任务
        - (void)dispatchBlockTest {
        dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
        dispatch_block_t dsBlock = dispatch_block_create(0, ^{
            NSLog(@"test block");
        });
        dispatch_async(queue, dsBlock);
        // block 执行完收到通知
        dispatch_block_notify(dsBlock, queue, ^{
            NSLog(@"block was finished,do other thing");
        });
         NSLog(@"execute first");
    }
    
    • 对 block 进行 cancel 操作
        - (void)dispatchBlockTest {
        dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
        dispatch_block_t dsBlock1 = dispatch_block_create(0, ^{
            NSLog(@"test block1");
        });
        dispatch_block_t dsBlock2 = dispatch_block_create(0, ^{
            NSLog(@"test block2");
        });
        dispatch_async(queue, dsBlock1);
        dispatch_async(queue, dsBlock2);
        
        // 第二个 block 将会被 cancel,不执行
        dispatch_block_cancel(dsBlock2);
    }
    

    3. Dispatch Barriers

    Dispatch Barriers 可以理解为调度屏障,常用于多线程并发读写操作。例如:

    @interface ViewController ()
    @property (nonatomic, strong) dispatch_queue_t imageQueue;
    @property (nonatomic, strong) NSMutableArray *imageArray;
    @end
    
    @implementation ViewController
    
    - (void)viewDidLoad {
        [super viewDidLoad];
        
        self.imageQueue = dispatch_queue_create("com.bool.image", DISPATCH_QUEUE_CONCURRENT);
        self.imageArray = [NSMutableArray array];
    }
    
    /** 保证写入时不会有其他操作,写完之后到主线程更新 UI */
    - (void)addImage:(UIImage *)image {
        dispatch_barrier_async(self.imageQueue, ^{
            [self.imageArray addObject:image];
            dispatch_async(dispatch_get_main_queue(), ^{
                // update UI
            });
        });
    }
    
    /** 这里的 dispatch_sync 起到了 lock 的作用 */
    - (NSArray <UIImage *> *)images {
        __block NSArray *imagesArray = nil;
        dispatch_sync(self.imageQueue, ^{
            imagesArray = [self.imageArray mutableCopy];
        });
        return imagesArray;
    }
    @end
    

    转化成图可能好理解一些:

    Dispatch_Barrier

    dispatch_barrier_async() 的原理和 dispatch_async() 差不多,只不过设置的 flags 不一样:

    void
    dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        // 在 dispatch_async() 中只设置了 DISPATCH_OBJ_CONSUME_BIT
        uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
    
        _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
        _dispatch_continuation_push(dq, dc);
    }
    

    后面都是 push 到队列中,然后,获取任务时一个死循环,在从队列中获取任务一个一个执行,如果判断 flag 为 barrier,终止循环,则单独执行这个任务。它后面的任务放入一个队列,等它执行完了再开始执行。

    DISPATCH_ALWAYS_INLINE
    static dispatch_queue_wakeup_target_t
    _dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
            dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
    {
        ...
        
        for (;;) {
            ...
    first_iteration:
            dq_state = os_atomic_load(&dq->dq_state, relaxed);
            if (unlikely(_dq_state_is_suspended(dq_state))) {
                break;
            }
            if (unlikely(orig_tq != dq->do_targetq)) {
                break;
            }
    
            if (serial_drain || _dispatch_object_is_barrier(dc)) {
                if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
                    if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
                        goto out_with_no_width;
                    }
                    owned = DISPATCH_QUEUE_IN_BARRIER;
                }
                next_dc = _dispatch_queue_next(dq, dc);
                if (_dispatch_object_is_sync_waiter(dc)) {
                    owned = 0;
                    dic->dic_deferred = dc;
                    goto out_with_deferred;
                }
            } else {
                if (owned == DISPATCH_QUEUE_IN_BARRIER) {
                    // we just ran barrier work items, we have to make their
                    // effect visible to other sync work items on other threads
                    // that may start coming in after this point, hence the
                    // release barrier
                    os_atomic_xor2o(dq, dq_state, owned, release);
                    owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
                } else if (unlikely(owned == 0)) {
                    if (_dispatch_object_is_sync_waiter(dc)) {
                        // sync "readers" don't observe the limit
                        _dispatch_queue_reserve_sync_width(dq);
                    } else if (!_dispatch_queue_try_acquire_async(dq)) {
                        goto out_with_no_width;
                    }
                    owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
                } 
                
                next_dc = _dispatch_queue_next(dq, dc);
                if (_dispatch_object_is_sync_waiter(dc)) {
                    owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
                    _dispatch_sync_waiter_redirect_or_wake(dq,
                            DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
                    continue;
                }
                
                ...
        }
    

    4. Dispatch Source

    关于 dispatch_source 我们使用的少之又少,他是 BSD 系统内核功能的包装,经常用来监测某些事件发生。例如监测断点的使用和取消。[这里][https://developer.apple.com/documentation/dispatch/dispatch_source_type_constants?language=objc] 介绍了可以监测的事件:

    • DISPATCH_SOURCE_TYPE_DATA_ADD : 自定义事件
    • DISPATCH_SOURCE_TYPE_DATA_OR : 自定义事件
    • DISPATCH_SOURCE_TYPE_MACH_RECV : MACH 端口接收事件
    • DISPATCH_SOURCE_TYPE_MACH_SEND : MACH 端口发送事件
    • DISPATCH_SOURCE_TYPE_PROC : 进程相关事件
    • DISPATCH_SOURCE_TYPE_READ : 文件读取事件
    • DISPATCH_SOURCE_TYPE_SIGNAL : 信号相关事件
    • DISPATCH_SOURCE_TYPE_TIMER : 定时器相关事件
    • DISPATCH_SOURCE_TYPE_VNODE : 文件属性修改事件
    • DISPATCH_SOURCE_TYPE_WRITE : 文件写入事件
    • DISPATCH_SOURCE_TYPE_MEMORYPRESSURE : 内存压力事件

    例如我们可以通过下面代码,来监测断点的使用和取消:

    @interface ViewController ()
    @property (nonatomic, strong) dispatch_source_t signalSource;
    @property (nonatomic, assign) dispatch_once_t signalOnceToken;
    @end
    
    @implementation ViewController
    
    - (void)viewDidLoad {
        dispatch_once(&_signalOnceToken, ^{
            dispatch_queue_t queue = dispatch_get_main_queue();
            self.signalSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGSTOP, 0, queue);
            
            if (self.signalSource) {
                dispatch_source_set_event_handler(self.signalSource, ^{
                    // 点击一下断点,再取消断点,便会执行这里。
                    NSLog(@"debug test");
                });
                dispatch_resume(self.signalSource);
            }
        });
    }
    

    还有 diapatch_after() 就是依赖 dispatch_source() 来实现的。我们可以自己实现一个类似的定时器:

    - (void)customTimer {
        dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
        dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 5.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 5);
        dispatch_source_set_event_handler(timerSource, ^{
            NSLog(@"dispatch source timer");
        });
        
        self.signalSource = timerSource;
        dispatch_resume(self.signalSource);
    }
    
    基本原理

    使用 dispatch_source 时,大致过程是这样的:我们创建一个 source,然后加到队列中,并调用 dispatch_resume() 方法,便会从队列中唤起 source,执行对应的 block。下面是一个详细的流程图,我们结合这张图来说一下:

    Dispatch_Source
    • 创建一个 source 对象,过程和创建 queue 类似,所以后面一些操作,和操作 queue 很类似。
    dispatch_source_t
    dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
            unsigned long mask, dispatch_queue_t dq)
    {
        dispatch_source_refs_t dr;
        dispatch_source_t ds;
    
        dr = dux_create(dst, handle, mask)._dr;
        if (unlikely(!dr)) {
            return DISPATCH_BAD_INPUT;
        }
        
        // 申请内存空间
        ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
                sizeof(struct dispatch_source_s));
        // 初始化一个队列,然后配置参数,完全被当做一个 queue 来处理
        _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
                DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
        ds->dq_label = "source";
        ds->do_ref_cnt++; // the reference the manager queue holds
        ds->ds_refs = dr;
        dr->du_owner_wref = _dispatch_ptr2wref(ds);
    
        if (slowpath(!dq)) {
            dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
        } else {
            _dispatch_retain((dispatch_queue_t _Nonnull)dq);
        }
        ds->do_targetq = dq;
        if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
            _dispatch_source_set_interval(ds, handle);
        }
        _dispatch_object_debug(ds, "%s", __func__);
        return ds;
    }
    
    • 设置 event_handler。从源码中看出,用的是 dispatch_continuation_t 进行绑定,和之前绑定 queue 一样,将 block copy 了一份。后面执行的时候,拿出来用。然后将这个任务 push 到队列里。

      void
      dispatch_source_set_event_handler(dispatch_source_t ds,
      dispatch_block_t handler)
      {
      dispatch_continuation_t dc;
      // 这里实际就是在初始化 dispatch_continuation_t
      dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
      // 经过一顿操作,将任务 push 到队列中。
      _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
      }

    • 调用 resume 方法,执行 source。一般新创建的都是暂停状态,这里判断是暂停状态,就开始唤起。

        void
    dispatch_resume(dispatch_object_t dou)
    {
        DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
        if (dx_vtable(dou._do)->do_suspend) {
            dx_vtable(dou._do)->do_resume(dou._do, false);
        }
    }
    
    • 最后一步,是最核心的异步,唤起任务开始执行。之前的 queue 最终也是走到这样类似的一步,可以看返回类型都是 dispatch_queue_wakeup_target_t,基本是沿着 queue 的逻辑一路 copy 过来。这个方法,经过一系列判断,保证所有的 source 都会在正确的队列上面执行;如果队列和任务不对应,那么就返回正确的队列,重新派发让任务在正确的队列上执行。
        DISPATCH_ALWAYS_INLINE
    static inline dispatch_queue_wakeup_target_t
    _dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
            dispatch_invoke_flags_t flags, uint64_t *owned)
    {
        dispatch_source_t ds = dou._ds;
        dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
        // 获取当前 queue
        dispatch_queue_t dq = _dispatch_queue_get_current();
        dispatch_source_refs_t dr = ds->ds_refs;
        dispatch_queue_flags_t dqf;
    
        ...
        
        // timer 事件处理
        if (dr->du_is_timer &&
                os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
            if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
                // timer has to be configured on the kevent queue
                if (dq != dkq) {
                    return dkq;
                }
                _dispatch_source_timer_configure(ds);
            }
        }
    
        // 是否安装 source
        if (!ds->ds_is_installed) {
            // The source needs to be installed on the kevent queue.
            if (dq != dkq) {
                return dkq;
            }
            _dispatch_source_install(ds, _dispatch_get_wlh(),
                    _dispatch_get_basepri());
        }
    
        // 是否暂停,因为之前判断过,一般不可能走到这里
        if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
            // Source suspended by an item drained from the source queue.
            return ds->do_targetq;
        }
    
        // 是否在
        if (_dispatch_source_get_registration_handler(dr)) {
            // The source has been registered and the registration handler needs
            // to be delivered on the target queue.
            if (dq != ds->do_targetq) {
                return ds->do_targetq;
            }
            // clears ds_registration_handler
            _dispatch_source_registration_callout(ds, dq, flags);
        }
    
        ...
            
        if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
                os_atomic_load2o(ds, ds_pending_data, relaxed)) {
            // 有些 source 还有未完成的数据,需要通过目标队列上的回调进行传送;有些 source 则需要切换到管理队列上去。
            if (dq == ds->do_targetq) {
                _dispatch_source_latch_and_call(ds, dq, flags);
                dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
                prevent_starvation = dq->do_targetq ||
                        !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
                if (prevent_starvation &&
                        os_atomic_load2o(ds, ds_pending_data, relaxed)) {
                    retq = ds->do_targetq;
                }
            } else {
                return ds->do_targetq;
            }
        }
    
        if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
            // 已经被取消的 source 需要从管理队列中卸载。卸载完成后,取消的 handler 需要交付到目标队列。
            if (!(dqf & DSF_DELETED)) {
                if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
                    // timers can cheat if not armed because there's nothing left
                    // to do on the manager queue and unregistration can happen
                    // on the regular target queue
                } else if (dq != dkq) {
                    return dkq;
                }
                _dispatch_source_refs_unregister(ds, 0);
                dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
                if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
                    if (!(dqf & DSF_ARMED)) {
                        goto unregister_event;
                    }
                    // we need to wait for the EV_DELETE
                    return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
                }
            }
            if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
                    _dispatch_source_get_cancel_handler(dr) ||
                    _dispatch_source_get_registration_handler(dr))) {
                retq = ds->do_targetq;
            } else {
                _dispatch_source_cancel_callout(ds, dq, flags);
                dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
            }
            prevent_starvation = false;
        }
    
        if (_dispatch_unote_needs_rearm(dr) &&
                !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
            // 需要在管理队列进行 rearm 的
            if (dq != dkq) {
                return dkq;
            }
            if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
                // 如果我们可以直接注销,不需要 resume
                goto unregister_event;
            }
            if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
                // 如果 source 已经暂停,不需要在管理队列 rearm
                return ds->do_targetq;
            }
            if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
                return ds->do_targetq;
            }
            if (unlikely(!_dispatch_source_refs_resume(ds))) {
                goto unregister_event;
            }
            if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
                _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
            }
        }
        return retq;
    }
    

    5. Dispatch I/O

    我们可以使用 Dispatch I/O 快速读取一些文件,例如这样 :

    - (void)readFile {
        NSString *filePath = @"/.../青花瓷.m";
        dispatch_queue_t queue = dispatch_queue_create("com.bool.readfile", DISPATCH_QUEUE_SERIAL);
        dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY,0);
        dispatch_io_t fileChannel = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
            close(fd);
        });
        
        NSMutableData *fileData = [NSMutableData new];
        dispatch_io_set_low_water(fileChannel, SIZE_MAX);
        dispatch_io_read(fileChannel, 0, SIZE_MAX, queue, ^(bool done, dispatch_data_t  _Nullable data, int error) {
            if (error == 0 && dispatch_data_get_size(data) > 0) {
                [fileData appendData:(NSData *)data];
            }
            
            if (done) {
                NSString *str = [[NSString alloc] initWithData:fileData encoding:NSUTF8StringEncoding];
                NSLog(@"read file completed, string is :\n %@",str);
            }
        });
    }
    

    输出结果:

    ConcurrencyTest[41479:5357296] read file completed, string is :
     天青色等烟雨 而我在等你
    月色被打捞起 晕开了结局
    

    如果读取大文件,我们可以进行切片读取,将文件分割多个片,放在异步线程中并发执行,这样会比较快一些。

    关于源码,简单看了一下,调度逻辑和之前的任务类似。然后读写操作,是调用的一些底层接口实现,这里就偷懒一下不详细说了。使用 Dispatch I/O,多数情况下是为了并发读取一个大文件,提高读取速度。

    6. Other

    上面已经讲了概览图中的大部分东西,还有一些未讲述,这里简单描述一下:

    • dispatch_object。GCD 用 C 函数实现的对象,不能通过集成 dispatch 类实现,也不能用 alloc 方法初始化。GCD 针对 dispatch_object 提供了一些接口,我们使用这些接口可以处理一些内存事件、取消和暂停操作、定义上下文和处理日志相关工作。dispatch_object 必须要手动管理内存,不遵循垃圾回收机制。

    • dispatch_time。在 GCD 中使用的时间对象,可以创建自定义时间,也可以使用 DISPATCH_TIME_NOWDISPATCH_TIME_FOREVER 这两个系统给出的时间。

    以上为 GCD 相关知识,这次使用的源码版本为最新版本 —— 912.30.4.tar.gz,和之前看的版本代码差距很大,因为代码量的增加,新版本代码比较乱,不过基本原理还是差不多的。曾经我一度认为,最上面的是最新版本…

    相关文章

      网友评论

          本文标题:GCD

          本文链接:https://www.haomeiwen.com/subject/kvdqmqtx.html