美文网首页一些收藏
GCD底层分析(三):栅栏、信号量、调度组以及source

GCD底层分析(三):栅栏、信号量、调度组以及source

作者: HotPotCat | 来源:发表于2021-08-14 23:27 被阅读0次

    一、栅栏函数

    CPU的乱序执行能力让我们对多线程的安全保障的努力变得异常困难。因此要保证线程安全,阻止CPU换序是必需的。遗憾的是,现在并不存在可移植的阻止换序的方法。通常情况下是调用CPU提供的一条指令,这条指令常常被称为barrier。一条barrier指令会阻止CPU将该指令之前的指令交换到barrier之后,反之亦然。换句话说,barrier指令的作用类似于一个拦水坝,阻止换序穿透这个大坝。

    栅栏函数最直接的作用:控制任务执行顺序,导致同步效果。
    有两个函数:

    • dispatch_barrier_async:前面的任务执行完毕才会执行barrier中的逻辑,以及barrier后加入队列的任务。
    • dispatch_barrier_sync:作用相同,但是会堵塞线程,影响后面的任务执行 。

    ⚠️:栅栏函数只能控制同一队列并发,相当于针对队列而言。

    1.1 应用

    1.1.1 dispatch_barrier_async 与 dispatch_barrier_sync 效果

    有如下案例:

    - (void)test {
        dispatch_queue_t concurrentQueue = dispatch_queue_create("HotpotCat", DISPATCH_QUEUE_CONCURRENT);
        dispatch_async(concurrentQueue, ^{
            sleep(3);
            NSLog(@"1");
        });
        dispatch_async(concurrentQueue, ^{
            NSLog(@"2");
        });
        dispatch_barrier_async(concurrentQueue, ^{
            NSLog(@"3:%@",[NSThread currentThread]);
        });
        dispatch_async(concurrentQueue, ^{
            NSLog(@"4");
        });
        NSLog(@"5");
    }
    

    分析:barrier阻塞的是自己以及concurrentQueue队列中在它后面加入的任务。由于这里使用的是异步函数所以任务125顺序不定,34之前。
    输出:

    GCDDemo[49708:5622304] 5
    GCDDemo[49708:5622437] 2
    GCDDemo[49708:5622434] 1
    GCDDemo[49708:5622434] 3:<NSThread: 0x600003439040>{number = 6, name = (null)}
    GCDDemo[49708:5622434] 4
    

    如果将dispatch_barrier_async改为dispatch_barrier_sync同步函数,则任务5会被阻塞。12(顺序不定)在3之前执行,45(顺序不定)在之后。

    1.1.2 栅栏函数存在的问题

    1.1.2.1 栅栏函数与全局队列

    concurrentQueue改为全局队列:

    dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
    dispatch_async(concurrentQueue, ^{
        NSLog(@"1");
    });
    dispatch_async(concurrentQueue, ^{
        NSLog(@"2");
    });
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"3:%@",[NSThread currentThread]);
    });
    dispatch_async(concurrentQueue, ^{
        NSLog(@"4");
    });
    NSLog(@"5");
    

    输出:

    GCDDemo[49872:5632760] 5
    GCDDemo[49872:5632979] 1
    GCDDemo[49872:5633673] 2
    GCDDemo[49872:5633675] 4
    GCDDemo[49872:5633674] 3:<NSThread: 0x600001160240>{number = 10, name = (null)}
    

    这个时候栅栏函数无论同步还是异步都无效了(有可能系统调度刚好符合预期)。
    这也就意味着全局并发队列不允许使用栅栏函数,一定是自定义队列才能使用。

    1.1.2.1 栅栏函数与不同队列

    将任务24放入另外一个队列:

    dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
    dispatch_queue_t concurrentQueue2 = dispatch_queue_create("Cat", DISPATCH_QUEUE_CONCURRENT);
    dispatch_async(concurrentQueue, ^{
        sleep(3);
        NSLog(@"1");
    });
    dispatch_async(concurrentQueue2, ^{
        NSLog(@"2");
    });
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"3:%@",[NSThread currentThread]);
    });
    dispatch_async(concurrentQueue2, ^{
        NSLog(@"4");
    });
    NSLog(@"5");
    

    输出:

    GCDDemo[49981:5639766] 5
    GCDDemo[49981:5640003] 2
    GCDDemo[49981:5639998] 4
    GCDDemo[49981:5639997] 1
    GCDDemo[49981:5639998] 3:<NSThread: 0x600003761500>{number = 5, name = (null)}
    

    这个时候concurrentQueue2中的任务先执行了,它并不受栅栏函数的影响。那么说明 栅栏函数只对同一个队列中的任务起作用

    1.1.3 栅栏函数作为锁使用

    有如下代码:

    NSMutableArray *array = [NSMutableArray array];
    dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
    for (int i = 0; i < 1000; i++) {
        dispatch_async(concurrentQueue, ^{
            [array addObject:@(i)];
        });
    }
    
    • 多个线程同时操作array
    • addObject的时候有可能存在同一时间对同一块内存空间写入数据。
      比如写第3个数据的时候,当前数组中数据是(1、2)这个时候有2个线程同时写入数据就存在了(1、2、3)(1、2、4)`这个时候数据就发生了混乱造成了错误。

    在运行的时候由于线程不安全(可变数组线程不安全),发生了写入错误直接报错:


    image.png

    将数组添加元素的操作放入dispatch_barrier_async中:

    NSMutableArray *array = [NSMutableArray array];
    dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
    for (int i = 0; i < 1000; i++) {
        dispatch_async(concurrentQueue, ^{
            dispatch_barrier_async(concurrentQueue , ^{
                [array addObject:@(i)];
            });
        });
    }
    

    这样就没问题了,加入栅栏函数写入数据的时候相当于加了锁。

    1.2 原理分析

    根据1.1中的案例有3个问题:

    • 1.为什么栅栏函数能起作用?
    • 2.为什么全局队列无效?
    • 3.为什么任务必须在同一队列才有效?

    1.2.1 dispatch_barrier_sync

    dispatch_barrier_sync源码如下:

    void
    dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    

    直接调用_dispatch_barrier_sync_f

    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    

    仍然是对_dispatch_barrier_sync_f_inline的调用:

    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
    
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            //死锁走这里的逻辑,同步栅栏函数也走这里
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    

    栅栏函数这个时候走的也是_dispatch_sync_f_slow逻辑:

    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
        ......
        _dispatch_trace_item_push(top_dq, &dsc);
        //死锁报错
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    

    断点调试走的是_dispatch_sync_complete_recurse

    static void
    _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
            uintptr_t dc_flags)
    {
        bool barrier = (dc_flags & DC_FLAG_BARRIER);
        do {
            if (dq == stop_dq) return;
            if (barrier) {
                //唤醒执行
                //_dispatch_lane_wakeup
                dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
            } else {
                //已经执行完成没有栅栏函数
                _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
            }
            dq = dq->do_targetq;
            barrier = (dq->dq_width == 1);
        } while (unlikely(dq->do_targetq));
    }
    
    • 这里进行了递归调用,循环条件是dq->do_targetq也就是 仅对当前队列有效
    • 唤醒执行栅栏前任务执行_dispatch_lane_wakeup逻辑。
    • 当栅栏前的任务执行完毕走_dispatch_lane_non_barrier_complete逻辑。这也就是为什么栅栏起作用的原因。

    dx_wakeup在全局队列是_dispatch_root_queue_wakeup,在自定义并行队列是_dispatch_lane_wakeup

    1.2.1.1 _dispatch_lane_wakeup

    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            //barrier完成了就走这里的逻辑,barrier之前的任务执行完毕。
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        //走这里
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    
    • 在栅栏函数执行完毕后才走_dispatch_lane_barrier_complete_dispatch_lane_non_barrier_complete中的逻辑就汇合了。
    • 没有执行完毕的时候执行_dispatch_queue_wakeup

    _dispatch_queue_wakeup源码如下:

    void
    _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
    {
    ......
    
        if (likely((old_state ^ new_state) & enqueue)) {
    ......
            //_dispatch_queue_push_queue 断点断不住,走这里。
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
    ......
    }
    

    最终走的是_dispatch_queue_push_queue逻辑:

    static inline void
    _dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
            uint64_t dq_state)
    {
    #if DISPATCH_USE_KEVENT_WORKLOOP
        if (likely(_dq_state_is_base_wlh(dq_state))) {
            _dispatch_trace_runtime_event(worker_request, dq._dq, 1);
            return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,
                    DISPATCH_EVENT_LOOP_CONSUME_2);
        }
    #endif // DISPATCH_USE_KEVENT_WORKLOOP
        _dispatch_trace_item_push(tq, dq);
        //_dispatch_lane_concurrent_push
        return dx_push(tq, dq, _dq_state_max_qos(dq_state));
    }
    

    内部是对_dispatch_lane_concurrent_push的调用:

    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    

    这里直接调用_dispatch_lane_push

    void
    _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
    ......
        if (flags) {
            //栅栏函数走这里。
            //#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
            //dx_wakeup 对应 dq_wakeup 自定义全局队列对应 _dispatch_lane_wakeup,全局队列对应 _dispatch_root_queue_wakeup
            return dx_wakeup(dq, qos, flags);
        }
    }
    

    又调用回了_dispatch_lane_wakeup,相当于一直扫描。

    1.2.1.2 _dispatch_lane_non_barrier_complete

    static void
    _dispatch_lane_non_barrier_complete(dispatch_lane_t dq,
            dispatch_wakeup_flags_t flags)
    {
        ......
        _dispatch_lane_non_barrier_complete_finish(dq, flags, old_state, new_state);
    }
    

    其中是对_dispatch_lane_non_barrier_complete_finish的调用。

    DISPATCH_ALWAYS_INLINE
    static void
    _dispatch_lane_non_barrier_complete_finish(dispatch_lane_t dq,
            dispatch_wakeup_flags_t flags, uint64_t old_state, uint64_t new_state)
    {
        if (_dq_state_received_override(old_state)) {
            // Ensure that the root queue sees that this thread was overridden.
            _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
        }
    
        if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
            if (_dq_state_is_dirty(old_state)) {
            //走_dispatch_lane_barrier_complete逻辑
            return _dispatch_lane_barrier_complete(dq, 0, flags);
        }
    
        if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
            if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
                _dispatch_retain_2(dq);
            }
            dispatch_assert(!_dq_state_is_base_wlh(new_state));
            _dispatch_trace_item_push(dq->do_targetq, dq);
            return dx_push(dq->do_targetq, dq, _dq_state_max_qos(new_state));
        }
    
        if (flags & DISPATCH_WAKEUP_CONSUME_2) {
            _dispatch_release_2_tailcall(dq);
        }
    }
    

    走的是_dispatch_lane_barrier_complete逻辑:

    DISPATCH_NOINLINE
    static void
    _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
        dispatch_lane_t dq = dqu._dl;
    
        if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
            struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
            //串行队列
            if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
                if (_dispatch_object_is_waiter(dc)) {
                    //栅栏中的任务逻辑
                    return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
                }
            } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
                return _dispatch_lane_drain_non_barriers(dq, dc, flags);
            }
    
            if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
                _dispatch_retain_2(dq);
                flags |= DISPATCH_WAKEUP_CONSUME_2;
            }
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
    
        uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
                dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
        //执行栅栏后续的代码
        return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
    }
    
    • _dispatch_lane_drain_barrier_waiter执行栅栏函数中的任务。
    • _dispatch_lane_class_barrier_complete执行栅栏函数后续的代码。

    调用_dispatch_lane_drain_barrier_waiter执行栅栏函数中的任务:

    static void
    _dispatch_lane_drain_barrier_waiter(dispatch_lane_t dq,
            struct dispatch_object_s *dc, dispatch_wakeup_flags_t flags,
            uint64_t enqueued_bits)
    {
        ......
        return _dispatch_barrier_waiter_redirect_or_wake(dq, dc, flags,
                old_state, new_state);
    }
    

    直接调用_dispatch_barrier_waiter_redirect_or_wake

    static void
    _dispatch_barrier_waiter_redirect_or_wake(dispatch_queue_class_t dqu,
            dispatch_object_t dc, dispatch_wakeup_flags_t flags,
            uint64_t old_state, uint64_t new_state)
    {
        ......
        return _dispatch_waiter_wake(dsc, wlh, old_state, new_state);
    }
    

    调用_dispatch_waiter_wake

    static void
    _dispatch_waiter_wake(dispatch_sync_context_t dsc, dispatch_wlh_t wlh,
            uint64_t old_state, uint64_t new_state)
    {
        dispatch_wlh_t waiter_wlh = dsc->dc_data;
    
        if ((_dq_state_is_base_wlh(old_state) && !dsc->dsc_from_async) ||
                _dq_state_is_base_wlh(new_state) ||
                waiter_wlh != DISPATCH_WLH_ANON) {
            _dispatch_event_loop_wake_owner(dsc, wlh, old_state, new_state);
        }
        if (unlikely(waiter_wlh == DISPATCH_WLH_ANON)) {
            //走这里
            _dispatch_waiter_wake_wlh_anon(dsc);
        }
    }
    

    调用_dispatch_waiter_wake_wlh_anon:

    static void
    _dispatch_waiter_wake_wlh_anon(dispatch_sync_context_t dsc)
    {
        if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
            _dispatch_wqthread_override_start(dsc->dsc_waiter,
                    dsc->dsc_override_qos);
        }
        //执行
        _dispatch_thread_event_signal(&dsc->dsc_event);
    }
    

    其中是对线程发送信号。

    对于_dispatch_root_queue_wakeup而言:

    void
    _dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
            DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
    {
        if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
            DISPATCH_INTERNAL_CRASH(dq->dq_priority,
                    "Don't try to wake up or override a root queue");
        }
        if (flags & DISPATCH_WAKEUP_CONSUME_2) {
            return _dispatch_release_2_tailcall(dq);
        }
    }
    

    内部没有对barrier的处理,所以全局队列栅栏函数无效。

    因为全局队列不仅有你的任务,还有其它系统的任务。如果加barrier不仅影响你自己的任务还会影响系统的任务。对于全局队列而言栅栏函数就是个普通的异步函数。

    整个流程如下:


    dispatch_barrier_sync逻辑

    1.2.2 dispatch_barrier_async

    dispatch_barrier_async源码如下:

    void
    dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc_flags);
    }
    

    调用的是_dispatch_continuation_async

    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    

    调用了dx_push,对应的自定义队列是_dispatch_lane_concurrent_push。全局队列是_dispatch_root_queue_push

    _dispatch_lane_concurrent_push:

    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    

    断点跟踪走的是_dispatch_lane_push

    DISPATCH_NOINLINE
    void
    _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        dispatch_wakeup_flags_t flags = 0;
        struct dispatch_object_s *prev;
    
        if (unlikely(_dispatch_object_is_waiter(dou))) {
            return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
        }
    
        dispatch_assert(!_dispatch_object_is_global(dq));
        qos = _dispatch_queue_push_qos(dq, qos);
    
        prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
        if (unlikely(os_mpsc_push_was_empty(prev))) {
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
        } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2;
        }
        os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
        if (flags) {
            //栅栏函数走这里。
            //#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
            //dx_wakeup 对应 dq_wakeup 自定义全局队列对应 _dispatch_lane_wakeup,全局队列对应 _dispatch_root_queue_wakeup
            return dx_wakeup(dq, qos, flags);
        }
    }
    

    栅栏函数走_dispatch_lane_wakeup逻辑:

    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        //走这里
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    

    继续断点走_dispatch_queue_wakeup逻辑:

    void
    _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
    {
        ......
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        ......
            //loop  _dispatch_lane_wakeup  //_dq_state_merge_qos
            return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
                    flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
        }
    
        if (target) {
        ......
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        } else if (qos) {
        ......
        if (likely((old_state ^ new_state) & enqueue)) {
            ...... //_dispatch_queue_push_queue断点断不住,断它内部断点
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
            if (_dq_state_should_override(new_state)) {
                return _dispatch_queue_wakeup_with_override(dq, new_state,
                        flags);
            }
        }
    #endif // HAVE_PTHREAD_WORKQUEUE_QOS
    done:
        if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            return _dispatch_release_2_tailcall(dq);
        }
    }
    

    这里断点走了_dispatch_queue_push_queue逻辑(_dispatch_queue_push_queue本身断不住,断它内部断点):

    static inline void
    _dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
            uint64_t dq_state)
    {
    #if DISPATCH_USE_KEVENT_WORKLOOP
        if (likely(_dq_state_is_base_wlh(dq_state))) {
            _dispatch_trace_runtime_event(worker_request, dq._dq, 1);
            return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,
                    DISPATCH_EVENT_LOOP_CONSUME_2);
        }
    #endif // DISPATCH_USE_KEVENT_WORKLOOP
        _dispatch_trace_item_push(tq, dq);
        //_dispatch_lane_concurrent_push
        return dx_push(tq, dq, _dq_state_max_qos(dq_state));
    }
    

    内部走的是_dispatch_lane_concurrent_push逻辑,这里又继续走了_dispatch_lane_push的逻辑了,在这里就造成了循环等待。当队列中任务执行完毕后_dispatch_lane_wakeup中就走_dispatch_lane_barrier_complete逻辑了。

    可以通过barrier前面的任务加延迟去验证。直接断点_dispatch_lane_barrier_complete,当前面的任务执行完毕后就进入_dispatch_lane_barrier_complete断点了。

    _dispatch_lane_barrier_complete源码如下:

    static void
    _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
    ......
        return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
    }
    

    走了_dispatch_lane_class_barrier_complete逻辑:

    static void
    _dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
            uint64_t owned)
    {
    ......
    again:
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
        ......
            } else if (unlikely(_dq_state_is_dirty(old_state))) {
        ......
                    flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
                    //自定义并行队列 _dispatch_lane_wakeup
                    return dx_wakeup(dq, qos, flags);
                });
            } else {
                new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
            }
        });
    ......
    }
    

    调用走的是_dispatch_lane_wakeup

    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            //barrier完成了就走这里的逻辑,barrier之前的任务执行完毕。
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        //走这里
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    

    这个时候就又走到了_dispatch_lane_barrier_complete

    DISPATCH_WAKEUP_BARRIER_COMPLETE状态是在_dispatch_lane_resume中进行变更的:

    image.png

    _dispatch_root_queue_push内部并没有对barrier的处理,与全局队列逻辑一致。所以barrier函数传递全局队列无效。

    整个过程如下:


    dispatch_barrier_async逻辑

    二、信号量(dispatch_semaphore_t

    相关函数:

    • dispatch_semaphore_create:创建信号量
    • dispatch_semaphore_wait:信号量等待
    • dispatch_semaphore_signal:信号量释放

    信号量有两个效果:同步作为锁控制GCD最大并发数

    二元信号量是最简单的一种锁,只有两种状态:占用与非占用。适合只能被唯一一个线程独占访问资源。当二元信号量处于非占用状态时,第一个试图获取该二元信号量的线程会获得该锁,并将二元信号置为占用状态,此后其他的所有视图获取该二元信号量的线程将会等待,直到该锁被释放。

    对于允许多个线程并发访问的资源,多元信号量简称信号量,它是一个很好的选择。一个初始值为 N 的信号量允许 N 个线程并发访问。线程访问资源的时候首先获取信号量,进行如下操作:

    • 将信号量的值减1
    • 如果信号量的值小于0,则进入等待状态,否则继续执行。

    访问完资源之后,线程释放信号量,进行如下操作:

    • 将信号量的值+1
    • 如果信号量的值< 1,唤醒一个等待中的线程。

    2.1 应用

        dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
        dispatch_semaphore_t sem = dispatch_semaphore_create(1);
        dispatch_queue_t queue1 = dispatch_queue_create("HotpotCat", NULL);
     
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            NSLog(@"1 start");
            NSLog(@"1 end");
            dispatch_semaphore_signal(sem);
        });
        
        dispatch_async(queue1, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            NSLog(@"2 start");
            NSLog(@"2 end");
            dispatch_semaphore_signal(sem);
        });
        
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            NSLog(@"3 start");
            NSLog(@"3 end");
            dispatch_semaphore_signal(sem);
        });
        
        dispatch_async(queue1, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            NSLog(@"4 start");
            NSLog(@"4 end");
            dispatch_semaphore_signal(sem);
        });
    

    对于上面的例子输出:

    1 start
    1 end
    2 start
    2 end
    3 start
    3 end
    4 start
    4 end
    

    这个时候信号量初始化的是1,全局队列与自定义串行队列中的任务按顺序依次执行。
    当将信号量改为2后输出:

    1 start
    2 start
    2 end
    1 end
    3 start
    4 start
    3 end
    4 end
    

    这个时候1、2先执行无序,3、4后执行无序。这样就控制了GCD任务的最大并发数。

    修改代码如下:

    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(0);
    
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        NSLog(@"1 start");
        NSLog(@"1 end");
    });
    
    dispatch_async(queue, ^{
        sleep(2);
        NSLog(@"2 start");
        NSLog(@"2 end");
        dispatch_semaphore_signal(sem);
    });
    

    信号量初始值修改为0,在任务1wait,在任务2signal,这个时候输出如下:

    2 start
    2 end
    1 start
    1 end
    

    任务2比任务1先执行了。由于信号量初始化为0wait函数后面任务就执行不了一直等待;等到signal执行后发送信号wait就可以执行了。这样就达到了控制流程。任务2中的信号控制了任务1的执行。

    2.2 源码分析

    2.2.1 dispatch_semaphore_create

    /*
     * @param dsema
     * The semaphore. The result of passing NULL in this parameter is undefined.
    */
    
    dispatch_semaphore_t
    dispatch_semaphore_create(intptr_t value)
    {
        dispatch_semaphore_t dsema;
    
        // If the internal value is negative, then the absolute of the value is
        // equal to the number of waiting threads. Therefore it is bogus to
        // initialize the semaphore with a negative value.
        if (value < 0) { //>=0 才有用,否则直接返回
            return DISPATCH_BAD_INPUT;// 0
        }
    
        dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
                sizeof(struct dispatch_semaphore_s));
        dsema->do_next = DISPATCH_OBJECT_LISTLESS;
        dsema->do_targetq = _dispatch_get_default_queue(false);
        dsema->dsema_value = value;
        _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        dsema->dsema_orig = value;
        return dsema;
    }
    
    • value < 0的时候无效,只有>= 0才有效,才会执行后续流程。

    2.2.2 dispatch_semaphore_wait

    intptr_t
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        //--
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) { //>=0 返回
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    
    • --value大于等于0直接返回0。执行dispatch_semaphore_wait后续的代码。
    • 否则执行_dispatch_semaphore_wait_slow(相当于do-while循环)。

    _dispatch_semaphore_wait_slow
    当信号量为0的时候调用wait后(< 0)就走_dispatch_semaphore_wait_slow逻辑了:

    DISPATCH_NOINLINE
    static intptr_t
    _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
            dispatch_time_t timeout)
    {
        long orig;
    
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        //超时直接break
        switch (timeout) {
        default:
            if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
                break;
            }
            // Fall through and try to undo what the fast path did to
            // dsema->dsema_value
        //NOW的情况下进行超时处理
        case DISPATCH_TIME_NOW:
            orig = dsema->dsema_value;
            while (orig < 0) {
                if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                        &orig, relaxed)) {
                    return _DSEMA4_TIMEOUT();
                }
            }
            // Another thread called semaphore_signal().
            // Fall through and drain the wakeup.
        //FOREVER则进入wait逻辑。
        case DISPATCH_TIME_FOREVER:
            _dispatch_sema4_wait(&dsema->dsema_sema);
            break;
        }
        return 0;
    }
    
    • 当值为timeout的时候直接break
    • 当值为DISPATCH_TIME_NOW的时候循环调用_DSEMA4_TIMEOUT()
    #define _DSEMA4_TIMEOUT() KERN_OPERATION_TIMED_OUT
    
    • 当值为DISPATCH_TIME_FOREVER的时候调用_dispatch_sema4_wait

    _dispatch_sema4_wait

    //    void
    //    _dispatch_sema4_wait(_dispatch_sema4_t *sema)
    //    {
    //        int ret = 0;
    //        do {
    //            ret = sem_wait(sema);
    //        } while (ret == -1 && errno == EINTR);
    //        DISPATCH_SEMAPHORE_VERIFY_RET(ret);
    //    }
    
    void
    _dispatch_sema4_wait(_dispatch_sema4_t *sema)
    {
        kern_return_t kr;
        do {
            kr = semaphore_wait(*sema);
        } while (kr == KERN_ABORTED);
        DISPATCH_SEMAPHORE_VERIFY_KR(kr);
    }
    
    • semaphore_wait并没有搜到实现,这是pthread内核封装的实现。
    • _dispatch_sema4_wait本质上是一个do-while循环,相当于在这里直接卡住执行不到后面的逻辑了。相当于:
    dispatch_async(queue, ^{
    //  dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        do {
            //循环
        } while (signal <= 0);
        NSLog(@"1 start");
        NSLog(@"1 end");
    });
    

    结论:value >= 0的时候执行后续的代码,否则do-while循环卡住后续逻辑

    2.2.3 dispatch_semaphore_signal

    /*!
     * @function dispatch_semaphore_signal
     *
     * @abstract
     * Signal (increment) a semaphore.
     *
     * @discussion
     * Increment the counting semaphore. If the previous value was less than zero,
     * this function wakes a waiting thread before returning.
     *
     * @param dsema The counting semaphore.
     * The result of passing NULL in this parameter is undefined.
     *
     * @result
     * This function returns non-zero if a thread is woken. Otherwise, zero is
     * returned.
     */
    intptr_t
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        //++操作
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {
            return 0;
        }
        //++ 后还 < 0,则表示做wait操作(--)过多。报错。
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        //发送信号量逻辑,恢复wait等待的操作。
        return _dispatch_semaphore_signal_slow(dsema);
    }
    
    • os_atomic_inc2o执行++后值大于0直接返回能够执行。
    • 只有<= 0的时候才执行后续流程,调用_dispatch_semaphore_signal_slow进行异常处理。
    • 注释说明了当值< 0的时候在return之前唤醒一个等待线程。

    _dispatch_semaphore_signal_slow

    intptr_t
    _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
    {
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        _dispatch_sema4_signal(&dsema->dsema_sema, 1);
        return 1;
    }
    

    直接调用_dispatch_sema4_signal

    _dispatch_sema4_signal

    #define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
            DISPATCH_VERIFY_MIG(x); \
            if (unlikely((x) == KERN_INVALID_NAME)) { \
                DISPATCH_CLIENT_CRASH((x), \
                    "Use-after-free of dispatch_semaphore_t or dispatch_group_t"); \
            } else if (unlikely(x)) { \
                DISPATCH_INTERNAL_CRASH((x), "mach semaphore API failure"); \
            } \
        } while (0)
    
    //经过调试走的是这个逻辑
    void
    _dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
    {
        do {
            kern_return_t kr = semaphore_signal(*sema);//+1
            DISPATCH_SEMAPHORE_VERIFY_KR(kr);// == -1 报错
        } while (--count);//do-while(0) 只执行一次
    }
    

    相当于内部做了+1操作。这也是当信号量初始值为0的时候dispatch_semaphore_signal执行完毕后dispatch_semaphore_wait能够执行的原因。

    小结:

    • dispatch_semaphore_wait进行--操作,减完是负值进入do-while循环,阻塞后续流程
    • dispatch_semaphore_signal进行++操作,加完值不大于0进入后续报错流程
    • semaphore_signalsemaphore_wait才是信号量能控制最大并发数的根本原因,否则dispatch_semaphore_signaldispatch_semaphore_signal都是判断后直接返回,相当于什么都没做

    semaphore_signal & semaphore_wait

    三、调度组

    最直接的作用: 控制任务执行顺序
    相关API:

    • dispatch_group_create 创建组
    • dispatch_group_async 进组任务 (与dispatch_group_enterdispatch_group_leave搭配使用效果相同)
      • dispatch_group_enter 进组
      • dispatch_group_leave 出组
    • dispatch_group_notify 进组任务执行完毕通知
    • dispatch_group_wait 进组任务执行等待时间

    3.1 应用

    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_group_async(group, queue, ^{
        sleep(3);
        NSLog(@"1");
    });
    
    dispatch_group_async(group, queue1, ^{
        sleep(2);
        NSLog(@"2");
    });
    
    dispatch_group_async(group, queue1, ^{
        sleep(1);
        NSLog(@"3");
    });
    
    dispatch_group_async(group, queue, ^{
        NSLog(@"4");
    });
    
    dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
        NSLog(@"5");
    });
    

    有如上案例,任务5永远在任务1、2、3、4之后执行。

    当然也可以使用enterleave配合dispatch_async使用:

    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    
    //先 enter 再 leave
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        sleep(3);
        NSLog(@"1");
        dispatch_group_leave(group);
    });
    
    dispatch_group_enter(group);
    dispatch_async(queue1, ^{
        sleep(2);
        NSLog(@"2");
        dispatch_group_leave(group);
    });
    
    dispatch_group_enter(group);
    dispatch_async(queue1, ^{
        sleep(1);
        NSLog(@"3");
        dispatch_group_leave(group);
    });
    
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        NSLog(@"4");
        dispatch_group_leave(group);
    });
    
    dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
        NSLog(@"5");
    });
    

    效果相同,需要注意的是dispatch_group_enter要比dispatch_group_leave先调用,并且必须成对出现,否则会崩溃。当然两种形式也可以混着用。

    3.2 源码分析

    根据上面的分析有3个问题:

    • 1.dispatch_group_enter为什么要比dispatch_group_leave先调用,否则崩溃?
    • 2.能够实现同步的原理是什么?
    • 3.dispatch_group_async为什么等价于dispatch_group_enter + dispatch_group_leave?

    之前的版本调度组是封装了信号量,目前新版本的是调度组自己写了一套逻辑。

    3.2.1 dispatch_group_create

    dispatch_group_t
    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    
    //creat & enter 写在一起的写法,信号量标记位1
    dispatch_group_t
    _dispatch_group_create_and_enter(void)
    {
        return _dispatch_group_create_with_count(1);
    }
    

    是对_dispatch_group_create_with_count的调用:

    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    

    调用_dispatch_object_alloc创建group,与信号量写法相似

    3.2.2 dispatch_group_enter

    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        //0-- -> -1,与信号量不同的是没有wait
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    
    • 0--变为-1,与信号量不同的是没有wait操作。

    3.2.3 dispatch_group_leave

    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        //-1++ -> 0
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        //#define DISPATCH_GROUP_VALUE_MASK       0x00000000fffffffcULL
        // old_state & DISPATCH_GROUP_VALUE_MASK 是一个很大的值
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
        //-1 & DISPATCH_GROUP_VALUE_MASK == DISPATCH_GROUP_VALUE_1,old_value = -1
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {//old_value == -1
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            //调用 _dispatch_group_wake,唤醒 dispatch_group_notify
            return _dispatch_group_wake(dg, old_state, true);
        }
        //old_value 为0的情况下直接报错,也就是先leave的情况下直接报错
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    
    • -1++变为0,当old_value == -1的时候调用_dispatch_group_wake唤醒dispatch_group_notify
    • 既然old_value == -1的时候才唤醒,那么多次enter只有最后一次leave的时候才能唤醒。
    • old_value == 0的时候直接报错,这也就是为什么先调用leave直接发生了crash

    3.2.4 dispatch_group_notify

    void
    dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dsn = _dispatch_continuation_alloc();
        _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
        _dispatch_group_notify(dg, dq, dsn);
    }
    

    调用_dispatch_group_notify

    static inline void
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
    
        dsn->dc_data = dq;
        _dispatch_retain(dq);
    
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) {//循环判断 old_state == 0 的时候 wake
                    os_atomic_rmw_loop_give_up({
                        return _dispatch_group_wake(dg, new_state, false);
                    });
                }
            });
        }
    }
    
    • old_state == 0的时候调用_dispatch_group_wake,也就是调用blockcallout。与leave调用了同一个方法。

    为什么两个地方都调用了?
    因为在leave的时候dispatch_group_notify可能已经执行过了,任务已经保存在了group中,leave的时候本身尝试调用一次。
    当然leave中也可能是一个延时任务,当调用leave的时候notify可能还没有执行,就导致notify任务还不存在。所以需要在notify中也调用。

    _dispatch_group_wake

    static void
    _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
    {
        uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
        if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
            dispatch_continuation_t dc, next_dc, tail;
    
            // Snapshot before anything is notified/woken <rdar://problem/8554546>
            dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
            do {
                dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
                next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
                //异步回调,执行block callout
                _dispatch_continuation_async(dsn_queue, dc,
                        _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
                _dispatch_release(dsn_queue);
            } while ((dc = next_dc));
    
            refs++;
        }
    
        if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
            _dispatch_wake_by_address(&dg->dg_gen);
        }
    
        if (refs) _dispatch_release_n(dg, refs);
    }
    
    • 调用_dispatch_continuation_async相当于调用的是dispatch_async执行notify的任务。
    • 任务先保存在在group中,唤醒notify的时候才将任务加入队列。

    3.2.5 dispatch_group_async

    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
           //标记 DC_FLAG_GROUP_ASYNC
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    

    调用_dispatch_continuation_group_async

    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        //调用enter
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        //dispatch_async
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    
    • 内部先调用dispatch_group_enter,在这里就等待wakeup的调用了
    • 再调用_dispatch_continuation_async,相当于dispatch_async

    那么leave在什么时候调用呢?
    肯定要在callout执行完毕后调用。_dispatch_continuation_async的调用以全局队列为例调用_dispatch_root_queue_push,最终会调用到_dispatch_continuation_invoke_inline

    image.png
    在这里就进行了逻辑区分,有group的情况下(dispatch_group_async的时候dc_flags进行了标记)调用的是_dispatch_continuation_with_group_invoke
    static inline void
    _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
    {
        struct dispatch_object_s *dou = dc->dc_data;
        unsigned long type = dx_type(dou);
        if (type == DISPATCH_GROUP_TYPE) {
            //callout
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_trace_item_complete(dc);
            //leave
            dispatch_group_leave((dispatch_group_t)dou);
        } else {
            DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
        }
    }
    
    • callout后调用了dispatch_group_leave

    dispatch_group_async 底层是对 dispatch_group_enter + dispatch_group_leave 的封装

    • dispatch_group_async中先进行dispatch_group_enter,然后执行dispatch_async
    • 在回调中先_dispatch_client_callout然后dispatch_group_leave

    四、Dispatch Source

    在任一线程上调用它的的一个函数 dispatch_source_merge_data 后,会执行 dispatch source 事先定义好的句柄(可以把句柄简单理解为一个 block ) 这个过程叫 用户事件(Custom event)。是 dispatch source 支持处理的一种事件。

    句柄是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有很密切的关系。比如:实例句柄(HINSTANCE),位图句柄(HBITMAP),设备表述句柄(HDC),图标句柄(HICON)等。这当中还有一个通用的句柄,就是HANDLE

    Dispatch Source有两点:

    • CPU 负荷非常小,尽量不占用资源 。
    • 联结的优势。
    • dispatch source不受runloop的影响,底层封装的是pthread

    相关API

    • dispatch_source_create 创建源
    • dispatch_source_set_event_handler 设置源事件回调
    • dispatch_source_merge_data 源事件设置数据
    • dispatch_source_get_data 获取源事件数据
    • dispatch_resume 继续
    • dispatch_suspend 挂起

    4.1 应用

    dispatch_source_t
    dispatch_source_create(dispatch_source_type_t type,
        uintptr_t handle,
        uintptr_t mask,
        dispatch_queue_t _Nullable queue);
    
    • typedispatch 源可处理的事件。比如:DISPATCH_SOURCE_TYPE_TIMERDISPATCH_SOURCE_TYPE_DATA_ADD
      • DISPATCH_SOURCE_TYPE_DATA_ADD: 将所有触发结果相加,最后统一执行响应。间隔的时间越长,则每次触发都会响应;如果间隔的时间很短,则会将触发后的结果相加后统一触发。也就是利用CPU空闲时间进行回调。
    • handle:可以理解为句柄、索引或id,如果要监听进程,需要传入进程的ID
    • mask:可以理解为描述,具体要监听什么。
    • queue:处理handle的队列。

    有如下一个进度条的案例:

    self.completed = 0;
    self.queue = dispatch_queue_create("HotpotCat", NULL);
    self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
    //设置句柄
    dispatch_source_set_event_handler(self.source, ^{
        NSLog(@"%@",[NSThread currentThread]);
        NSUInteger value = dispatch_source_get_data(self.source);
        self.completed += value;
        double progress = self.completed / 100.0;
        NSLog(@"progress: %.2f",progress);
        self.progressView.progress = progress;
    });
    self.isRunning = YES;
    //创建后默认是挂起状态
    dispatch_resume(self.source);
    

    创建了一个ADD类型的source,在handle获取进度增量并更新进度条。由于创建后source处于挂起状态,需要先恢复。

    可以在按钮的点击事件中进行任务的挂起和恢复:

    if (self.isRunning) {
        dispatch_suspend(self.source);
        dispatch_suspend(self.queue);
        NSLog(@"pause");
        self.isRunning = NO;
        [sender setTitle:@"pause" forState:UIControlStateNormal];
    } else {
        dispatch_resume(self.source);
        dispatch_resume(self.queue);
        NSLog(@"running");
        self.isRunning = YES;
        [sender setTitle:@"running" forState:UIControlStateNormal];
    }
    

    任务的执行是一个简单的循环:

    for (NSInteger i = 0; i < 100; i++) {
        dispatch_async(self.queue, ^{
            NSLog(@"merge");
            //加不加 sleep 影响 handler 的执行次数。
            sleep(1);
            dispatch_source_merge_data(self.source, 1);//+1
        });
    }
    
    • 在循环中调用dispatch_source_merge_data触发回调。当queue挂起后后续任务就不再执行了。
    • 在不加sleep的情况下handler的回调是小于100次的,任务会被合并。

    4.2 源码解析

    4.2.1 dispatch_source_create

    dispatch_source_t
    dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
            uintptr_t mask, dispatch_queue_t dq)
    {
        dispatch_source_refs_t dr;
        dispatch_source_t ds;
        //add对应 _dispatch_source_data_create timer对应 _dispatch_source_timer_create
        dr = dux_create(dst, handle, mask)._dr;
        if (unlikely(!dr)) {
            return DISPATCH_BAD_INPUT;
        }
        //创建队列
        ds = _dispatch_queue_alloc(source,
                dux_type(dr)->dst_strict ? DSF_STRICT : DQF_MUTABLE, 1,
                DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER)._ds;
        ds->dq_label = "source";
        ds->ds_refs = dr;
        dr->du_owner_wref = _dispatch_ptr2wref(ds);
    
        //没有传队列,获取root_queues
        if (unlikely(!dq)) {
            dq = _dispatch_get_default_queue(true);
        } else {
            _dispatch_retain((dispatch_queue_t _Nonnull)dq);
        }
        //目标队列为传进来的dq
        ds->do_targetq = dq;
        //是timer 并且设置了interval则调用dispatch_source_set_timer进行设置
        //也就是说type为timer的时候即使不设置timer也会默认设置。这里时间间隔设置为了handle
        if (dr->du_is_timer && (dr->du_timer_flags & DISPATCH_TIMER_INTERVAL)) {
            dispatch_source_set_timer(ds, DISPATCH_TIME_NOW, handle, UINT64_MAX);
        }
        _dispatch_object_debug(ds, "%s", __func__);
        //返回自己创建的source,source本身也是队列。
        return ds;
    }
    
    • 根据type创建对应的队列。add对应_dispatch_source_data_createtimer对应_dispatch_source_timer_create
    • 如果创建的时候没有传处理handle的队列,会默认获取root_queues中的队列。
    • 设置目标队列为传进来的队列。
    • 如果typeDISPATCH_SOURCE_TYPE_INTERVAL(应该是私有的)则主动调用一次dispatch_source_set_timer
    • 返回自己创建的sourcesource本身也是队列。

    _dispatch_source_data_create

    static dispatch_unote_t
    _dispatch_source_data_create(dispatch_source_type_t dst, uintptr_t handle,
            uintptr_t mask)
    {
        if (handle || mask) {
            return DISPATCH_UNOTE_NULL;
        }
    
        // bypass _dispatch_unote_create() because this is always "direct"
        // even when EV_UDATA_SPECIFIC is 0
        dispatch_unote_class_t du = _dispatch_calloc(1u, dst->dst_size);
        du->du_type = dst;
        du->du_filter = dst->dst_filter;
        du->du_is_direct = true;
        return (dispatch_unote_t){ ._du = du };
    }
    

    直接调用_dispatch_calloc创建返回。

    _dispatch_source_timer_create

    static dispatch_unote_t
    _dispatch_source_timer_create(dispatch_source_type_t dst,
            uintptr_t handle, uintptr_t mask)
    {
        dispatch_timer_source_refs_t dt;
        ......
        //创建
        dt = _dispatch_calloc(1u, dst->dst_size);
        dt->du_type = dst;
        dt->du_filter = dst->dst_filter;
        dt->du_is_timer = true;
        dt->du_timer_flags |= (uint8_t)(mask | dst->dst_timer_flags);
        dt->du_ident = _dispatch_timer_unote_idx(dt);
        dt->dt_timer.target = UINT64_MAX;
        dt->dt_timer.deadline = UINT64_MAX;
        dt->dt_timer.interval = UINT64_MAX;
        dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
        dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
        return (dispatch_unote_t){ ._dt = dt };
    }
    

    内部时间给的默认值是最大值。

    4.2.2 dispatch_source_set_event_handler

    void
    dispatch_source_set_event_handler(dispatch_source_t ds,
            dispatch_block_t handler)
    {
        _dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
    }
    

    调用_dispatch_source_set_handler传递的类型为DS_EVENT_HANDLER

    DISPATCH_NOINLINE
    static void
    _dispatch_source_set_handler(dispatch_source_t ds, void *func,
            uintptr_t kind, bool is_block)
    {
        dispatch_continuation_t dc;
        //创建dc存储handler
        dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
        //挂起
        if (_dispatch_lane_try_inactive_suspend(ds)) {
            //替换
            _dispatch_source_handler_replace(ds, kind, dc);
            //恢复
            return _dispatch_lane_resume(ds, DISPATCH_RESUME);
        }
    ......
    }
    
    • 创建_dispatch_source_handler_alloc存储handler,内部会进行标记非DS_EVENT_HANDLER会标记为DC_FLAG_CONSUME
    • _dispatch_lane_try_inactive_suspend挂起队列。
    • _dispatch_source_handler_replace替换handler
    static inline void
    _dispatch_source_handler_replace(dispatch_source_t ds, uintptr_t kind,
            dispatch_continuation_t dc)
    {
        //handler目标回调为空释放handler
        if (!dc->dc_func) {
            _dispatch_continuation_free(dc);
            dc = NULL;
        } else if (dc->dc_flags & DC_FLAG_FETCH_CONTEXT) {
            dc->dc_ctxt = ds->do_ctxt;
        }
        //保存
        dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
        if (dc) _dispatch_source_handler_dispose(dc);
    }
    
    • _dispatch_lane_resume恢复队列,调用队列对应的awake
      image.png
      先调用_dispatch_lane_resume_activate(这也就是set后立马调用的原因):
    static void
    _dispatch_lane_resume_activate(dispatch_lane_t dq)
    {
        if (dx_vtable(dq)->dq_activate) {
            dx_vtable(dq)->dq_activate(dq);
        }
    
        _dispatch_lane_resume(dq, DISPATCH_ACTIVATION_DONE);
    }
    

    再调用_dispatch_lane_resume

    4.2.3 dispatch_source_merge_data

    void
    dispatch_source_merge_data(dispatch_source_t ds, uintptr_t val)
    {
        dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
        dispatch_source_refs_t dr = ds->ds_refs;
    
        if (unlikely(dqf & (DSF_CANCELED | DQF_RELEASED))) {
            return;
        }
        //根据类型存值
        switch (dr->du_filter) {
        case DISPATCH_EVFILT_CUSTOM_ADD:
            //有累加
            os_atomic_add2o(dr, ds_pending_data, val, relaxed);
            break;
        case DISPATCH_EVFILT_CUSTOM_OR:
            os_atomic_or2o(dr, ds_pending_data, val, relaxed);
            break;
        case DISPATCH_EVFILT_CUSTOM_REPLACE:
            os_atomic_store2o(dr, ds_pending_data, val, relaxed);
            break;
        default:
            DISPATCH_CLIENT_CRASH(dr->du_filter, "Invalid source type");
        }
        //唤醒执行回调
        dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
    }
    
    • 根据类型对值进行处理,处理完之后唤醒队列执行。

    对于主线程会执行_dispatch_main_queue_wakeup,其中会取到dispatch_queue获取到dc,最后进行handler的调用。

    4.2.4 dispatch_source_get_data

    uintptr_t
    dispatch_source_get_data(dispatch_source_t ds)
    {
        dispatch_source_refs_t dr = ds->ds_refs;
    #if DISPATCH_USE_MEMORYSTATUS
        if (dr->du_vmpressure_override) {
            return NOTE_VM_PRESSURE;
        }
    #if TARGET_OS_SIMULATOR
        if (dr->du_memorypressure_override) {
            return NOTE_MEMORYSTATUS_PRESSURE_WARN;
        }
    #endif
    #endif // DISPATCH_USE_MEMORYSTATUS
        //获取数据
        uint64_t value = os_atomic_load2o(dr, ds_data, relaxed);
        return (unsigned long)(dr->du_has_extended_status ?
                DISPATCH_SOURCE_GET_DATA(value) : value);
    }
    

    merge_data相反,一个存一个取。

    4.2.5 dispatch_resume

    void
    dispatch_resume(dispatch_object_t dou)
    {
        DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
        if (unlikely(_dispatch_object_is_global(dou) ||
                _dispatch_object_is_root_or_base_queue(dou))) {
            return;
        }
        if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
            _dispatch_lane_resume(dou._dl, DISPATCH_RESUME);
        }
    }
    

    经过调试走的是_dispatch_lane_resume逻辑,与_dispatch_source_set_handler中调用的一致。awake队列。

    4.2.6 dispatch_suspend

    void
    dispatch_suspend(dispatch_object_t dou)
    {
        DISPATCH_OBJECT_TFB(_dispatch_objc_suspend, dou);
        if (unlikely(_dispatch_object_is_global(dou) ||
                _dispatch_object_is_root_or_base_queue(dou))) {
            return;
        }
        if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
            return _dispatch_lane_suspend(dou._dl);
        }
    }
    

    调用_dispatch_lane_suspend挂起队列。

    4.2.7 dispatch_source_cancel

    dispatch_source_cancel(dispatch_source_t ds)
    {
        _dispatch_object_debug(ds, "%s", __func__);
    
        _dispatch_retain_2(ds);
    
        if (_dispatch_queue_atomic_flags_set_orig(ds, DSF_CANCELED) & DSF_CANCELED){
            _dispatch_release_2_tailcall(ds);
        } else {
            //_dispatch_workloop_wakeup
            dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
        }
    }
    
    • 调用_dispatch_workloop_wakeup
      image.png
    • cancel内部会对状态进行判断,如果是挂起状态会报错。所以需要在运行状态下取消。
    • 调用_dispatch_release_2_tailcall进行释放操作。

    4.2.8 dispatch_source_set_timer

    void
    dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
            uint64_t interval, uint64_t leeway)
    {
        dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
        dispatch_timer_config_t dtc;
    
        if (unlikely(!dt->du_is_timer)) {
            DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
        }
        //根据type配置timer和interval
        if (dt->du_timer_flags & DISPATCH_TIMER_INTERVAL) {
            dtc = _dispatch_interval_config_create(start, interval, leeway, dt);
        } else {
            dtc = _dispatch_timer_config_create(start, interval, leeway, dt);
        }
        if (_dispatch_timer_flags_to_clock(dt->du_timer_flags) != dtc->dtc_clock &&
                dt->du_filter == DISPATCH_EVFILT_TIMER_WITH_CLOCK) {
            DISPATCH_CLIENT_CRASH(0, "Attempting to modify timer clock");
        }
        //跟踪配置
        _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
        dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
        if (dtc) free(dtc);
        //唤醒
        dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
    }
    

    4.2.9 dispatch_source_set_registration_handler

    void
    dispatch_source_set_registration_handler(dispatch_source_t ds,
            dispatch_block_t handler)
    {
        _dispatch_source_set_handler(ds, handler, DS_REGISTN_HANDLER, true);
    }
    

    也是直接调用的_dispatch_source_set_handler,参数是DS_REGISTN_HANDLER

    4.2.10 dispatch_source_set_cancel_handler

    void
    dispatch_source_set_cancel_handler(dispatch_source_t ds,
            dispatch_block_t handler)
    {
        _dispatch_source_set_handler(ds, handler, DS_CANCEL_HANDLER, true);
    }
    
    • 直接调用的_dispatch_source_set_handler,参数是DS_CANCEL_HANDLER
    • 会根据DS_REGISTN_HANDLER、DS_CANCEL_HANDLER、DS_EVENT_HANDLER进行handler的获取和释放,因为这三者可能同时存在。

    那么就有个问题设置timer类型后我们没有主动调用dispatch_source_merge_data,那么它是在什么时机调用的呢?在回调中bt:

        frame #2: 0x000000010b6a29c8 libdispatch.dylib`_dispatch_client_callout + 8
        frame #3: 0x000000010b6a5316 libdispatch.dylib`_dispatch_continuation_pop + 557
        frame #4: 0x000000010b6b8e8b libdispatch.dylib`_dispatch_source_invoke + 2205
        frame #5: 0x000000010b6b4508 libdispatch.dylib`_dispatch_root_queue_drain + 351
        frame #6: 0x000000010b6b4e6d libdispatch.dylib`_dispatch_worker_thread2 + 135
        frame #7: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
        frame #8: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15
    

    搜索_dispatch_source_invoke只找到了:

    DISPATCH_VTABLE_INSTANCE(source,
        .do_type        = DISPATCH_SOURCE_KEVENT_TYPE,
        .do_dispose     = _dispatch_source_dispose,
        .do_debug       = _dispatch_source_debug,
        .do_invoke      = _dispatch_source_invoke,
    
        .dq_activate    = _dispatch_source_activate,
        .dq_wakeup      = _dispatch_source_wakeup,
        .dq_push        = _dispatch_lane_push,
    );
    

    也就是调用的sourcedo_invoke,调用逻辑为_dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> dx_invoke

    void
    _dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
            dispatch_invoke_flags_t flags)
    {
        _dispatch_queue_class_invoke(ds, dic, flags,
                DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
    
    #if DISPATCH_EVENT_BACKEND_KEVENT
        if (flags & DISPATCH_INVOKE_WORKLOOP_DRAIN) {
            dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh();
            dispatch_timer_heap_t dth = dwl->dwl_timer_heap;
            if (dth && dth[0].dth_dirty_bits) {
                //调用
                _dispatch_event_loop_drain_timers(dwl->dwl_timer_heap,
                        DISPATCH_TIMER_WLH_COUNT);
            }
        }
    #endif // DISPATCH_EVENT_BACKEND_KEVENT
    }
    

    4.3 Dispatch Source 封装 Timer

    目标是封装一个类似NSTimer的工具。

    void
    dispatch_source_set_timer(dispatch_source_t source,
        dispatch_time_t start,
        uint64_t interval,
        uint64_t leeway);
    
    • source:事件源。
    • start:控制计时器第一次触发的时刻。
      • 参数类型是 dispatch_time_topaque类型),不能直接操作它。需要 dispatch_timedispatch_walltime 函数来创建。
      • 常量 DISPATCH_TIME_NOWDISPATCH_TIME_FOREVER 很常用。
      • 当使用dispatch_time 或者 DISPATCH_TIME_NOW 时,系统会使用默认时钟来进行计时。然而当系统休眠的时候,默认时钟是不走的,也就会导致计时器停止。使用 dispatch_walltime 可以让计时器按照真实时间间隔进行计时。
    • interval:回调间隔时间。
    • leeway:计时器触发的精准程度,就算指定为0系统也无法保证完全精确的触发时间,只是会尽可能满足这个需求。

    首先实现一个最简单的封装:

    - (instancetype)initTimerWithTimeInterval:(NSTimeInterval)interval queue:(dispatch_queue_t)queue leeway:(NSTimeInterval)leeway repeats:(BOOL)repeats handler:(dispatch_block_t)handler {    
        if (self == [super init]) {
            self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
            dispatch_source_set_timer(self.timer, dispatch_walltime(NULL, 0), interval * NSEC_PER_SEC, leeway * NSEC_PER_SEC);
            //解决与handler互相持有
            __weak typeof(self) weakSelf = self;
    
            //事件回调,这个函数在执行完之后 block 会立马执行一遍。后面隔一定时间间隔再执行一次。
            dispatch_source_set_event_handler(self.timer, ^{
                if (handler) {
                    handler();
                }
                if (!repeats) {
                    //repeats 为 NO 执行一次后取消
                    [weakSelf cancel];
                }
            });
        }
        return self;
    }
    

    这样就满足了最基本的要求,由于handler的调用在设置和恢复后会立马调用,所以需要过滤需改handler实现如下:

    //忽略 handler 设置完马上回调
    if (weakSelf.isAutoFirstCallback) {
        @synchronized(weakSelf) {
            weakSelf.isAutoFirstCallback = NO;
        }
        return;
    }
    //忽略挂起恢复后的立马回调
    if (!weakSelf.resumeCallbackEnable && weakSelf.isResumeCallback) {
        @synchronized(weakSelf) {
            weakSelf.isResumeCallback = NO;
        }
        return;
    }
    
    if (handler) {
        handler();
    }
    
    if (!repeats) {
        //repeats 为 NO 执行一次后取消
        [weakSelf cancel];
    }
    

    为了更灵活对注册以及取消source逻辑也进行暴露:

    dispatch_source_set_registration_handler(self.timer, ^{
        if (weakSelf.startBlock) {
            weakSelf.startBlock();
        }
    });
    //取消回调
    dispatch_source_set_cancel_handler(self.timer, ^{
        if (weakSelf.cancelBlock) {
            weakSelf.cancelBlock();
        }
    });
    

    由于source本身提供了挂起和恢复的功能,同样对其封装。并且需要进行释放操作,所以提供cancel功能:

    - (void)start {
        //为了与isResumeCallback区分开
        @synchronized(self) {
            if (!self.isStarted && self.timerStatus == HPTimerSuspend) {
                self.isStarted = YES;
                self.timerStatus = HPTimerResume;
                dispatch_resume(self.timer);
            }
        }
    }
    
    - (void)suspend {
        //挂起,挂起的时候不能设置timer为nil
        @synchronized(self) {
            if (self.timerStatus == HPTimerResume) {
                self.timerStatus = HPTimerSuspend;
                dispatch_suspend(self.timer);
            }
        }
    }
    
    - (void)resume {
        //恢复
        @synchronized(self) {
            if (self.timerStatus == HPTimerSuspend) {
                self.isResumeCallback = YES;
                self.timerStatus = HPTimerResume;
                dispatch_resume(self.timer);
            }
        }
    }
    
    - (void)cancel {
        //取消
        @synchronized(self) {
            if (self.timerStatus != HPTimerCanceled) {
                //先恢复再取消
                if (self.timerStatus == HPTimerSuspend) {
                    [self resume];
                }
                self.timerStatus = HPTimerCanceled;
                dispatch_source_cancel(self.timer);
                _timer = nil;
            }
        }
    }
    
    - (void)dealloc {
        [self cancel];
    }
    
    • dealloc中主动进行cancel调用方可以不必在自己的dealloc中调用。

    这样再暴露一些简单接口就可以直接调用了(调用方需要持有timer):

    self.timer = [HPTimer scheduledTimerWithTimeInterval:3 handler:^{
        NSLog(@"timer 回调");
    }];
    

    完整代码详见 HPTimer

    具体的更多介绍和用法可以参考:
    iOS多线程——Dispatch Source
    timer 注意事项

    五、延迟函数(dispatch_after)

    void
    dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
            dispatch_block_t work)
    {
        _dispatch_after(when, queue, NULL, work, true);
    }
    

    直接调用_dispatch_after

    static inline void
    _dispatch_after(dispatch_time_t when, dispatch_queue_t dq,
            void *ctxt, void *handler, bool block)
    {
        dispatch_timer_source_refs_t dt;
        dispatch_source_t ds;
        uint64_t leeway, delta;
        //FOREVER 直接返回什么也不做
        if (when == DISPATCH_TIME_FOREVER) {
    #if DISPATCH_DEBUG
            DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
    #endif
            return;
        }
    
        delta = _dispatch_timeout(when);
        if (delta == 0) {
            if (block) {
                //时间为0直接执行handler
                return dispatch_async(dq, handler);
            }
            return dispatch_async_f(dq, ctxt, handler);
        }
        //精度 = 间隔 / 10
        leeway = delta / 10; // <rdar://problem/13447496>
        //<1 毫秒 的时候设置最小值为1毫秒
        if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
        //大于60s的时候设置为60s,也就是  1ms <= leeway <= 1min
        if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
    
        // this function can and should be optimized to not use a dispatch source
        //创建 type 为 after 的 source
        ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, dq);
        dt = ds->ds_timer_refs;
    
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        if (block) {
            //包装handler
            _dispatch_continuation_init(dc, dq, handler, 0, 0);
        } else {
            _dispatch_continuation_init_f(dc, dq, ctxt, handler, 0, 0);
        }
        // reference `ds` so that it doesn't show up as a leak
        dc->dc_data = ds;
        _dispatch_trace_item_push(dq, dc);
        //存储handler
        os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
        dispatch_clock_t clock;
        uint64_t target;
        _dispatch_time_to_clock_and_value(when, false, &clock, &target);
        if (clock != DISPATCH_CLOCK_WALL) {
            leeway = _dispatch_time_nano2mach(leeway);
        }
        dt->du_timer_flags |= _dispatch_timer_flags_from_clock(clock);
        dt->dt_timer.target = target;
        dt->dt_timer.interval = UINT64_MAX;
        dt->dt_timer.deadline = target + leeway;
        dispatch_activate(ds);
    }
    
    • 延时时间设置为DISPATCH_TIME_FOREVER直接返回什么也不做。
    • 延时时间为0直接调用dispatch_async执行handler
    • 精度:1ms <= leeway <= 1min要在这个范围,否则会修正。
    • 创建_dispatch_source_type_after类型的source
    • 包装存储handler
    • 调用_dispatch_time_to_clock_and_value进行target设置。

    本质上 dispatch_after 也是对 source的封装。

    时间单位

    #define NSEC_PER_SEC 1000000000ull      1秒 = 10亿纳秒              
    #define NSEC_PER_MSEC 1000000ull       1毫秒  =   100万纳秒 
    #define USEC_PER_SEC 1000000ull        1秒  = 100万微秒   
    #define NSEC_PER_USEC 1000ull           1微秒   = 1000 纳秒
    

    1s = 1000ms = 100万us = 10亿ns
    1ms = 1000us
    1us = 1000ns

    相关文章

      网友评论

        本文标题:GCD底层分析(三):栅栏、信号量、调度组以及source

        本文链接:https://www.haomeiwen.com/subject/yfthbltx.html