
iOS GCD Analysis (Part 2)

Author: 似水流年_9ebe | Published 2021-08-11 00:08

    Preface

    In iOS GCD Analysis (Part 1) we looked at GCD's functions and queues. We did not finish the GCD source there, so this article picks up where that one left off.

    Deadlock in the synchronous function

    We know GCD offers both synchronous and asynchronous functions. What exactly is the difference between them? Let's analyze.

    We can frame the analysis around a few questions: can the call spin up a new thread, is the task callback asynchronous, and does the synchronous path block or even deadlock? With these questions in mind, let's read the source.
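
    As a quick behavioral illustration before diving into the source (a minimal sketch using only the public API; the queue label "com.example.demo" is made up for this example):

    #include <dispatch/dispatch.h>
    #include <stdio.h>

    int main(void) {
        dispatch_queue_t q = dispatch_queue_create("com.example.demo",
                DISPATCH_QUEUE_CONCURRENT);

        // dispatch_sync blocks the calling thread until the block has finished.
        dispatch_sync(q, ^{ printf("sync task\n"); });
        printf("after sync\n");   // always prints after "sync task"

        // dispatch_async returns immediately; the block runs later on a worker thread.
        dispatch_async(q, ^{ printf("async task\n"); });
        printf("after async\n");  // may print before "async task"

        dispatch_main();          // park the main thread so the async block can run
    }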

    Searching the libdispatch source for the keyword "dispatch_sync(dis" turns up the following:

    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    

    Here _dispatch_Block_invoke wraps the block into a function pointer, which is then handed to _dispatch_sync_f. Searching for "_dispatch_sync_f(dis" we find:

    static void
    _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
            uintptr_t dc_flags)
    {
        _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    

    This simply calls _dispatch_sync_f_inline; let's look at its code:

    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    

    Here dq_width == 1 means a serial queue, so a serial queue takes the _dispatch_barrier_sync_f path. Its code is:

    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    

    Next, the source of _dispatch_barrier_sync_f_inline:

    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // The more correct thing to do would be to merge the qos of the thread
        // that just acquired the barrier lock into the queue state.
        //
        // However this is too expensive for the fast path, so skip doing it.
        // The chosen tradeoff is that if an enqueue on a lower priority thread
        // contends with this fast path, this thread may receive a useless override.
        //
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    

    Here _dispatch_sync_f_slow gets called (when a deadlock occurs, this function shows up in the crash backtrace), as the figures below show:

    [Figure 1: backtrace captured at the moment of the deadlock]

    The actual error is raised one frame deeper:

    [Figure 2: the crashing frame, __DISPATCH_WAIT_FOR_QUEUE__]

    Let's look at the source of _dispatch_sync_f_slow:
    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
    
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    
        _dispatch_trace_item_push(top_dq, &dsc);
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    

    As the backtrace above shows, the error is raised inside __DISPATCH_WAIT_FOR_QUEUE__. Let's look at its source:

    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    
        // Blocks submitted to the main thread MUST run on the main thread, and
        // dispatch_async_and_wait also executes on the remote context rather than
        // the current thread.
        //
        // For both these cases we need to save the frame linkage for the sake of
        // _dispatch_async_and_wait_invoke
        _dispatch_thread_frame_save_state(&dsc->dsc_dtf);
    
        if (_dq_state_is_suspended(dq_state) ||
                _dq_state_is_base_anon(dq_state)) {
            dsc->dc_data = DISPATCH_WLH_ANON;
        } else if (_dq_state_is_base_wlh(dq_state)) {
            dsc->dc_data = (dispatch_wlh_t)dq;
        } else {
            _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
        }
    
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                    (uint8_t)_dispatch_get_basepri_override_qos_floor();
            _dispatch_thread_event_init(&dsc->dsc_event);
        }
        dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
        _dispatch_trace_runtime_event(sync_wait, dq, 0);
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
        } else if (!dsc->dsc_wlh_self_wakeup) {
            _dispatch_event_loop_wait_for_ownership(dsc);
        }
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_destroy(&dsc->dsc_event);
            // If _dispatch_sync_waiter_wake() gave this thread an override,
            // ensure that the root queue sees it.
            if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
                _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
            }
        }
    }
    

    Within it:

    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    

    This is the code that raises the error. So why does the deadlock happen?
    Let's look at the conditions for the deadlock.
    The message "dispatch_sync called on queue already owned by current thread" means exactly that: the queue targeted by the synchronous call is already owned (locked) by the current thread.
    The dsc in dsc->dsc_waiter is passed in from _dispatch_sync_f_slow, which contains this code:

    struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    

    dsc_waiter is set to _dispatch_tid_self(), which is simply the current thread's id:

    #define _dispatch_tid_self()        ((dispatch_tid)_dispatch_thread_port())
    

    Back in __DISPATCH_WAIT_FOR_QUEUE__, the line

        uint64_t dq_state = _dispatch_wait_prepare(dq);
    

    takes dq, the queue being waited on, and reads dq_state, the queue's current state. That state is then checked against the waiter's thread id by _dq_state_drain_locked_by:

    static inline bool
    _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
    {
        return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
    }
    

    _dispatch_lock_is_locked_by looks like this:

    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    

    It checks whether the queue's state matches the thread id.
    The queue state is XORed with the thread id, and the result is ANDed with DLOCK_OWNER_MASK (#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)); a result of 0 means the owner recorded in the queue state is this very thread.
    **In other words, the thread that is about to wait for the queue is the same thread that currently owns (is draining) it: the thread blocks waiting for the queue, yet the queue can only make progress on that same thread, so neither side can proceed. That is the deadlock, and libdispatch crashes deliberately to surface it.**
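
    A minimal way to reproduce this crash (a sketch; the serial queue label is made up):

    #include <dispatch/dispatch.h>
    #include <stdio.h>

    int main(void) {
        // Serial queue, i.e. dq_width == 1.
        dispatch_queue_t serial = dispatch_queue_create("com.example.serial",
                DISPATCH_QUEUE_SERIAL);

        dispatch_async(serial, ^{
            // This block is draining `serial`, so the current thread owns the queue...
            dispatch_sync(serial, ^{
                // ...so this sync wait can never be satisfied: __DISPATCH_WAIT_FOR_QUEUE__
                // sees that the waiter tid equals the drain-lock owner and crashes with
                // "dispatch_sync called on queue already owned by current thread".
                printf("never reached\n");
            });
        });

        dispatch_main();
    }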

    Task execution in the synchronous function

    On a concurrent queue, why does a synchronously submitted task still execute in place, in order? Let's analyze.
    Back in _dispatch_sync_f_inline, the non-serial path is this code:

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    

    How does this code execute? Stepping through it with breakpoints:

    [Figure: breakpoints hit while stepping through the dispatch_sync path]

    Let's look at _dispatch_sync_invoke_and_complete; its source is:

    static void
    _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        _dispatch_lane_non_barrier_complete(dq, 0);
    }
    

    The DISPATCH_TRACE_ARG(void *dc) in the signature is an optional trailing argument: the macro is defined as #define DISPATCH_TRACE_ARG(arg) , arg.
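
    A sketch of how that expands at the call site we saw earlier (illustrative only; "item" is a placeholder name):

    // #define DISPATCH_TRACE_ARG(arg) , arg
    //
    // _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(item));
    //     ...preprocesses to...
    // _dispatch_sync_invoke_and_complete(dl, ctxt, func , item);
    //
    // With tracing disabled the macro presumably expands to nothing, so the extra
    // argument simply disappears (an assumption about the non-trace build).
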
    Stepping through, execution reaches _dispatch_sync_f_slow, which then calls _dispatch_sync_function_invoke, and that brings us here:

    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        _dispatch_client_callout(ctxt, func);
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    

    Inside _dispatch_sync_function_invoke_inline, _dispatch_client_callout runs the task's callback on the calling thread, and the thread frame is popped on the way out.
    In short, with a synchronous function the task is executed right where the function is called, with only some state bookkeeping in between — which is why a synchronous task runs immediately.
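
    A small sketch that makes this observable (public API only; the label is made up): the block submitted with dispatch_sync runs on the very thread that called dispatch_sync.

    #include <dispatch/dispatch.h>
    #include <pthread.h>
    #include <stdio.h>

    int main(void) {
        dispatch_queue_t q = dispatch_queue_create("com.example.concurrent",
                DISPATCH_QUEUE_CONCURRENT);

        printf("caller thread:     %p\n", (void *)pthread_self());
        dispatch_sync(q, ^{
            // Same thread as the caller: the block is invoked in place via
            // _dispatch_client_callout, no thread hop involved.
            printf("sync block thread: %p\n", (void *)pthread_self());
        });
        return 0;
    }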

    Asynchronous function analysis (part 1)

    With the synchronous function covered, let's analyze the asynchronous one.
    For the asynchronous function, the interesting questions are how it creates threads and how the task callback ends up asynchronous.
    Searching the source for "dispatch_async(dis" we find:

    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    

    Next, the code of _dispatch_continuation_async:

    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    

    Here dc is the continuation that wraps the task and qos carries its priority; dx_push forwards to the queue's dq_push entry, and which dq_push you get depends on the kind of queue you passed in.
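
    A sketch of that indirection (my paraphrase of the macros in object_internal.h; treat the exact definition as an assumption):

    // dx_vtable(x) fetches the object's class/vtable; dx_push then jumps through it:
    // #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    //
    // So dx_push(dqu._dq, dc, qos) ends up calling whatever dq_push the queue's
    // vtable was initialized with (see the vtable instances below).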

    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
        .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
        .do_dispose     = _dispatch_lane_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_lane_invoke,
    
        .dq_activate    = _dispatch_lane_activate,
        .dq_wakeup      = _dispatch_lane_wakeup,
        .dq_push        = _dispatch_lane_concurrent_push,
    );
    

    This is the vtable for a custom concurrent queue, so dq_push resolves to _dispatch_lane_concurrent_push.
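
    For comparison, the serial-queue variant in the same table routes dq_push straight to _dispatch_lane_push (quoted from memory of queue.c, so treat the exact fields as an assumption):

    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
        .do_type        = DISPATCH_QUEUE_SERIAL_TYPE,
        .do_dispose     = _dispatch_lane_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_lane_invoke,

        .dq_activate    = _dispatch_lane_activate,
        .dq_wakeup      = _dispatch_lane_wakeup,
        .dq_push        = _dispatch_lane_push,
    );
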
    Searching for _dispatch_lane_concurrent_push we find:

    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        // <rdar://problem/24738102&24743140> reserving non barrier width
        // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
        // width equivalent), so we have to check that this thread hasn't
        // enqueued anything ahead of this call or we can break ordering
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    

    _dispatch_lane_push is reached for both concurrent and serial queues.

    if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    

    The difference lies in how the barrier (and the other conditions above) are handled: if the queue is empty and the item is neither a waiter nor a barrier, the concurrent queue takes _dispatch_continuation_redirect_push; otherwise it falls through to _dispatch_lane_push.
    The source of _dispatch_lane_push is:

    void
    _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        dispatch_wakeup_flags_t flags = 0;
        struct dispatch_object_s *prev;
    
        if (unlikely(_dispatch_object_is_waiter(dou))) {
            return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
        }
    
        dispatch_assert(!_dispatch_object_is_global(dq));
        qos = _dispatch_queue_push_qos(dq, qos);
    
        // If we are going to call dx_wakeup(), the queue must be retained before
        // the item we're pushing can be dequeued, which means:
        // - before we exchange the tail if we have to override
        // - before we set the head if we made the queue non empty.
        // Otherwise, if preempted between one of these and the call to dx_wakeup()
        // the blocks submitted to the queue may release the last reference to the
        // queue when invoked by _dispatch_lane_drain. <rdar://problem/6932776>
    
        prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
        if (unlikely(os_mpsc_push_was_empty(prev))) {
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
        } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
            // There's a race here, _dispatch_queue_need_override may read a stale
            // dq_state value.
            //
            // If it's a stale load from the same drain streak, given that
            // the max qos is monotonic, too old a read can only cause an
            // unnecessary attempt at overriding which is harmless.
            //
            // We'll assume here that a stale load from an a previous drain streak
            // never happens in practice.
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2;
        }
        os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
        if (flags) {
            return dx_wakeup(dq, qos, flags);
        }
    }
    

    Using symbolic breakpoints on _dispatch_lane_push and _dispatch_lane_push_waiter we can watch this path. dx_wakeup maps to the queue's dq_wakeup entry, which for these queues is _dispatch_lane_wakeup; setting a symbolic breakpoint on it confirms that execution enters _dispatch_lane_wakeup.
    Its code is:

    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    
    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
    

    This branch is taken when a barrier block has to be completed, i.e. when barrier functions are in play.
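
    For reference, barriers are submitted from user code roughly like this (a minimal sketch; the queue label is made up):

    #include <dispatch/dispatch.h>

    static void demo(void) {
        dispatch_queue_t q = dispatch_queue_create("com.example.rw",
                DISPATCH_QUEUE_CONCURRENT);

        dispatch_async(q, ^{ /* reader 1 */ });
        dispatch_async(q, ^{ /* reader 2 */ });

        // The barrier block waits for everything enqueued before it, then runs alone.
        // When it finishes draining, the queue is woken with the barrier-complete
        // flag, which is what the branch above handles.
        dispatch_barrier_async(q, ^{ /* exclusive writer */ });

        dispatch_async(q, ^{ /* reader 3, runs only after the barrier */ });
    }
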
    After that, _dispatch_queue_wakeup is called:

    void
    _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
    {
        dispatch_queue_t dq = dqu._dq;
        uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
        dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
    
        if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            //
            // _dispatch_lane_class_barrier_complete() is about what both regular
            // queues and sources needs to evaluate, but the former can have sync
            // handoffs to perform which _dispatch_lane_class_barrier_complete()
            // doesn't handle, only _dispatch_lane_barrier_complete() does.
            //
            // _dispatch_lane_wakeup() is the one for plain queues that calls
            // _dispatch_lane_barrier_complete(), and this is only taken for non
            // queue types.
            //
            dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
            qos = _dispatch_queue_wakeup_qos(dq, qos);
            return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
                    flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
        }
    
        if (target) {
            if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
                enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
            }
            qos = _dispatch_queue_wakeup_qos(dq, qos);
            os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
                new_state = _dq_state_merge_qos(old_state, qos);
                if (flags & DISPATCH_WAKEUP_CLEAR_ACTIVATING) {
                    // When an event is being delivered to a source because its
                    // unote was being registered before the ACTIVATING state
                    // had a chance to be cleared, we don't want to fail the wakeup
                    // which could lead to a priority inversion.
                    //
                    // Instead, these wakeups are allowed to finish the pending
                    // activation.
                    if (_dq_state_is_activating(old_state)) {
                        new_state &= ~DISPATCH_QUEUE_ACTIVATING;
                    }
                }
                if (likely(!_dq_state_is_suspended(new_state) &&
                        !_dq_state_is_enqueued(old_state) &&
                        (!_dq_state_drain_locked(old_state) ||
                        enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR))) {
                    // Always set the enqueued bit for async enqueues on all queues
                    // in the hierachy
                    new_state |= enqueue;
                }
                if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
                    new_state |= DISPATCH_QUEUE_DIRTY;
                } else if (new_state == old_state) {
                    os_atomic_rmw_loop_give_up(goto done);
                }
            });
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        } else if (qos) {
            //
            // Someone is trying to override the last work item of the queue.
            //
            os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
                // Avoid spurious override if the item was drained before we could
                // apply an override
                if (!_dq_state_drain_locked(old_state) &&
                    !_dq_state_is_enqueued(old_state)) {
                    os_atomic_rmw_loop_give_up(goto done);
                }
                new_state = _dq_state_merge_qos(old_state, qos);
                if (_dq_state_is_base_wlh(old_state) &&
                        !_dq_state_is_suspended(old_state) &&
                        /* <rdar://problem/63179930> */
                        !_dq_state_is_enqueued_on_manager(old_state)) {
    
                    // Always set the enqueued bit for async enqueues on all queues
                    // in the hierachy (rdar://62447289)
                    //
                    // Scenario:
                    // - mach channel DM
                    // - targetting TQ
                    //
                    // Thread 1:
                    // - has the lock on (TQ), uncontended sync
                    // - causes a wakeup at a low QoS on DM, causing it to have:
                    //   max_qos = UT, enqueued = 1
                    // - the enqueue of DM onto TQ hasn't happened yet.
                    //
                    // Thread 2:
                    // - an incoming IN IPC is being merged on the servicer
                    // - DM having qos=UT, enqueud=1, no further enqueue happens,
                    //   but we need an extra override and go through this code for
                    //   TQ.
                    // - this causes TQ to be "stashed", which requires the enqueued
                    //   bit set, else try_lock_wlh() will complain and the
                    //   wakeup refcounting will be off.
                    new_state |= enqueue;
                }
    
                if (new_state == old_state) {
                    os_atomic_rmw_loop_give_up(goto done);
                }
            });
    
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
    #endif // HAVE_PTHREAD_WORKQUEUE_QOS
        } else {
            goto done;
        }
    
        if (likely((old_state ^ new_state) & enqueue)) {
            dispatch_queue_t tq;
            if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
                // the rmw_loop above has no acquire barrier, as the last block
                // of a queue asyncing to that queue is not an uncommon pattern
                // and in that case the acquire would be completely useless
                //
                // so instead use depdendency ordering to read
                // the targetq pointer.
                os_atomic_thread_fence(dependency);
                tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
                        (long)new_state);
            } else {
                tq = target;
            }
            dispatch_assert(_dq_state_is_enqueued(new_state));
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
            if (_dq_state_should_override(new_state)) {
                return _dispatch_queue_wakeup_with_override(dq, new_state,
                        flags);
            }
        }
    #endif // HAVE_PTHREAD_WORKQUEUE_QOS
    done:
        if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            return _dispatch_release_2_tailcall(dq);
        }
    }
    

    With the symbolic breakpoints set, _dispatch_lane_push and _dispatch_lane_wakeup fire over and over; that is simply background work continually submitting tasks.
    When execution reaches _dispatch_lane_class_barrier_complete, searching for that function gives the following code:

    static void
    _dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
            uint64_t owned)
    {
        uint64_t old_state, new_state, enqueue;
        dispatch_queue_t tq;
    
        if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
            tq = _dispatch_mgr_q._as_dq;
            enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
        } else if (target) {
            tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
            enqueue = DISPATCH_QUEUE_ENQUEUED;
        } else {
            tq = NULL;
            enqueue = 0;
        }
    
    again:
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            if (unlikely(_dq_state_needs_ensure_ownership(old_state))) {
                _dispatch_event_loop_ensure_ownership((dispatch_wlh_t)dq);
                _dispatch_queue_move_to_contended_sync(dq->_as_dq);
                os_atomic_rmw_loop_give_up(goto again);
            }
            new_state  = _dq_state_merge_qos(old_state - owned, qos);
            new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
            if (unlikely(_dq_state_is_suspended(old_state))) {
                if (likely(_dq_state_is_base_wlh(old_state))) {
                    new_state &= ~DISPATCH_QUEUE_ENQUEUED;
                }
            } else if (enqueue) {
                if (!_dq_state_is_enqueued(old_state)) {
                    new_state |= enqueue;
                }
            } else if (unlikely(_dq_state_is_dirty(old_state))) {
                os_atomic_rmw_loop_give_up({
                    // just renew the drain lock with an acquire barrier, to see
                    // what the enqueuer that set DIRTY has done.
                    // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
                    // is already in a register
                    os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
                    flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
                    return dx_wakeup(dq, qos, flags);
                });
            } else {
                new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
            }
        });
        old_state -= owned;
        dispatch_assert(_dq_state_drain_locked_by_self(old_state));
        dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));
    
        if (_dq_state_is_enqueued(new_state)) {
            _dispatch_trace_runtime_event(sync_async_handoff, dq, 0);
        }
    
    #if DISPATCH_USE_KEVENT_WORKLOOP
        if (_dq_state_is_base_wlh(old_state)) {
            // - Only non-"du_is_direct" sources & mach channels can be enqueued
            //   on the manager.
            //
            // - Only dispatch_source_cancel_and_wait() and
            //   dispatch_source_set_*_handler() use the barrier complete codepath,
            //   none of which are used by mach channels.
            //
            // Hence no source-ish object can both be a workloop and need to use the
            // manager at the same time.
            dispatch_assert(!_dq_state_is_enqueued_on_manager(new_state));
            if (_dq_state_is_enqueued_on_target(old_state) ||
                    _dq_state_is_enqueued_on_target(new_state) ||
                    !_dq_state_in_uncontended_sync(old_state)) {
                return _dispatch_event_loop_end_ownership((dispatch_wlh_t)dq,
                        old_state, new_state, flags);
            }
            _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
            if (flags & DISPATCH_WAKEUP_CONSUME_2) {
                return _dispatch_release_2_tailcall(dq);
            }
            return;
        }
    #endif
    
        if (_dq_state_received_override(old_state)) {
            // Ensure that the root queue sees that this thread was overridden.
            _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
        }
    
        if (tq) {
            if (likely((old_state ^ new_state) & enqueue)) {
                dispatch_assert(_dq_state_is_enqueued(new_state));
                dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
                return _dispatch_queue_push_queue(tq, dq, new_state);
            }
    #if HAVE_PTHREAD_WORKQUEUE_QOS
            // <rdar://problem/27694093> when doing sync to async handoff
            // if the queue received an override we have to forecefully redrive
            // the same override so that a new stealer is enqueued because
            // the previous one may be gone already
            if (_dq_state_should_override(new_state)) {
                return _dispatch_queue_wakeup_with_override(dq, new_state, flags);
            }
    #endif
        }
        if (flags & DISPATCH_WAKEUP_CONSUME_2) {
            return _dispatch_release_2_tailcall(dq);
        }
    }
    

    os_atomic_rmw_loop2o retries here in a loop until the state transition succeeds.
    Eventually the push reaches _dispatch_root_queue_push, whose code is:

    void
    _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
    #if DISPATCH_USE_KEVENT_WORKQUEUE
        dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
        if (unlikely(ddi && ddi->ddi_can_stash)) {
            dispatch_object_t old_dou = ddi->ddi_stashed_dou;
            dispatch_priority_t rq_overcommit;
            rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    
            if (likely(!old_dou._do || rq_overcommit)) {
                dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
                dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
                ddi->ddi_stashed_rq = rq;
                ddi->ddi_stashed_dou = dou;
                ddi->ddi_stashed_qos = qos;
                _dispatch_debug("deferring item %p, rq %p, qos %d",
                        dou._do, rq, qos);
                if (rq_overcommit) {
                    ddi->ddi_can_stash = false;
                }
                if (likely(!old_dou._do)) {
                    return;
                }
                // push the previously stashed item
                qos = old_qos;
                rq = old_rq;
                dou = old_dou;
            }
        }
    #endif
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        if (_dispatch_root_queue_push_needs_override(rq, qos)) {
            return _dispatch_root_queue_push_override(rq, dou, qos);
        }
    #else
        (void)qos;
    #endif
        _dispatch_root_queue_push_inline(rq, dou, dou, 1);
    }
    

    This calls _dispatch_root_queue_push_inline:

    static inline void
    _dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
            dispatch_object_t _head, dispatch_object_t _tail, int n)
    {
        struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
        if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
            return _dispatch_root_queue_poke(dq, n, 0);
        }
    }
    

    Which in turn calls _dispatch_root_queue_poke:

    void
    _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
    {
        if (!_dispatch_queue_class_probe(dq)) {
            return;
        }
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
        if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
    #endif
        {
            if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
                _dispatch_root_queue_debug("worker thread request still pending "
                        "for global queue: %p", dq);
                return;
            }
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
        return _dispatch_root_queue_poke_slow(dq, n, floor);
    }
    

    And that calls _dispatch_root_queue_poke_slow, whose code is:

    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
        int remaining = n;
    #if !defined(_WIN32)
        int r = ENOSYS;
    #endif
    
        _dispatch_root_queues_init();
        _dispatch_debug_root_queue(dq, __func__);
        _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
        {
            _dispatch_root_queue_debug("requesting new worker thread for global "
                    "queue: %p", dq);
            r = _pthread_workqueue_addthreads(remaining,
                    _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
            (void)dispatch_assume_zero(r);
            return;
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
        dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
        if (likely(pqc->dpq_thread_mediator.do_vtable)) {
            while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
                _dispatch_root_queue_debug("signaled sleeping worker for "
                        "global queue: %p", dq);
                if (!--remaining) {
                    return;
                }
            }
        }
    
        bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        if (overcommit) {
            os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
        } else {
            if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
                _dispatch_root_queue_debug("worker thread request still pending for "
                        "global queue: %p", dq);
                return;
            }
        }
    
        int can_request, t_count;
        // seq_cst with atomic store to tail <rdar://problem/16932833>
        t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
                t_count - remaining, &t_count, acquire));
    
    #if !defined(_WIN32)
        pthread_attr_t *attr = &pqc->dpq_thread_attr;
        pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (unlikely(dq == &_dispatch_mgr_root_queue)) {
            pthr = _dispatch_mgr_root_queue_init();
        }
    #endif
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
            while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
                if (r != EAGAIN) {
                    (void)dispatch_assume_zero(r);
                }
                _dispatch_temporary_resource_shortage();
            }
        } while (--remaining);
    #else // defined(_WIN32)
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (unlikely(dq == &_dispatch_mgr_root_queue)) {
            _dispatch_mgr_root_queue_init();
        }
    #endif
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
    #if DISPATCH_DEBUG
            unsigned dwStackSize = 0;
    #else
            unsigned dwStackSize = 64 * 1024;
    #endif
            uintptr_t hThread = 0;
            while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
                if (errno != EAGAIN) {
                    (void)dispatch_assume(hThread);
                }
                _dispatch_temporary_resource_shortage();
            }
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
            if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
                (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
            }
    #endif
            CloseHandle((HANDLE)hThread);
        } while (--remaining);
    #endif // defined(_WIN32)
    #else
        (void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
    }
    

    The _dispatch_root_queues_init call here is important. Before analyzing it, let's first cover how GCD's singleton (dispatch_once) works.

    How the singleton works under the hood

    The code of _dispatch_root_queues_init is:

    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL,
                _dispatch_root_queues_init_once);
    }
    

    This is literally a dispatch_once, the same mechanism behind a singleton. Let's analyze it.
    The usual singleton pattern looks like this:

    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        // initialization that must run exactly once
    });
    

    The source of dispatch_once is:

    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    

    It calls dispatch_once_f, whose source is:

    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
        return _dispatch_once_wait(l);
    }
    

    Here val is the onceToken we passed in, i.e. the static variable.

    It is force-cast to dispatch_once_gate_t — a "gate" that is either closed (locked/done) or open.

    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    

    os_atomic_load reads the token; if the work has already been done once (DLOCK_ONCE_DONE) it simply returns and never enters again.
    If not, it moves on to

    if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
    

    this path: _dispatch_once_gate_tryenter decides whether this thread gets to enter, and _dispatch_once_callout then calls _dispatch_client_callout to execute the block.

    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    

    This uses os_atomic_cmpxchg, an atomic compare-and-swap: only one thread can successfully swap the token from DLOCK_ONCE_UNLOCKED to its own lock value, so entry is effectively guarded by a lock value derived from the thread.
    _dispatch_lock_value_for_self looks like this:

    static inline dispatch_lock
    _dispatch_lock_value_for_self(void)
    {
        return _dispatch_lock_value_from_tid(_dispatch_tid_self());
    }
    

    That is why GCD's dispatch_once — and therefore the singleton pattern built on it — is thread-safe.
    _dispatch_once_callout then calls _dispatch_once_gate_broadcast, whose code is:

    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
        dispatch_lock value_self = _dispatch_lock_value_for_self();
        uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        v = _dispatch_once_mark_quiescing(l);
    #else
        v = _dispatch_once_mark_done(l);
    #endif
        if (likely((dispatch_lock)v == value_self)) return;
        _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    
    

    which in turn calls _dispatch_once_mark_done:

    static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo)
    {
        return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
    }
    

    This stores DLOCK_ONCE_DONE into the token, so later calls never enter again.
    Inside dispatch_once_f, if the token is not yet marked DLOCK_ONCE_DONE and _dispatch_once_gate_tryenter also fails (another thread is already running the block), the thread calls _dispatch_once_wait and blocks until the gate is opened.
    Given this, _dispatch_root_queues_init guarantees that the root-queue initialization runs exactly once.
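
    Putting the pieces together, the control flow is roughly this (a simplified sketch in C11 atomics, not the real libdispatch code):

    #include <stdatomic.h>
    #include <stdint.h>

    #define ONCE_UNLOCKED 0u
    #define ONCE_DONE     (~0u)          // stand-in for DLOCK_ONCE_DONE

    typedef struct { _Atomic uint32_t gate; } once_gate_t;

    static void once_sketch(once_gate_t *l, void (*func)(void *), void *ctxt,
            uint32_t my_lock_value /* derived from the thread id */) {
        if (atomic_load_explicit(&l->gate, memory_order_acquire) == ONCE_DONE)
            return;                                   // fast path: already done
        uint32_t expected = ONCE_UNLOCKED;
        if (atomic_compare_exchange_strong(&l->gate, &expected, my_lock_value)) {
            func(ctxt);                               // we won: run the block once
            atomic_store_explicit(&l->gate, ONCE_DONE, memory_order_release);
            // the real code also wakes up any waiters here (gate broadcast)
        } else {
            while (atomic_load_explicit(&l->gate, memory_order_acquire) != ONCE_DONE)
                ;                                     // losers wait (libdispatch blocks instead of spinning)
        }
    }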

    Asynchronous function analysis (part 2)

    _dispatch_root_queues_init ends up calling _dispatch_root_queues_init_once; its source is:

    static void
    _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
    {
        _dispatch_fork_becomes_unsafe();
    #if DISPATCH_USE_INTERNAL_WORKQUEUE
        size_t i;
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
                    _dispatch_root_queues[i].dq_priority);
        }
    #else
        int wq_supported = _pthread_workqueue_supported();
        int r = ENOTSUP;
    
        if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
            DISPATCH_INTERNAL_CRASH(wq_supported,
                    "QoS Maintenance support required");
        }
    
    #if DISPATCH_USE_KEVENT_SETUP
        struct pthread_workqueue_config cfg = {
            .version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
            .flags = 0,
            .workq_cb = 0,
            .kevent_cb = 0,
            .workloop_cb = 0,
            .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
    #if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
            .queue_label_offs = dispatch_queue_offsets.dqo_label,
    #endif
        };
    #endif
    
    #pragma clang diagnostic push
    #pragma clang diagnostic ignored "-Wunreachable-code"
        if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init(_dispatch_worker_thread2,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #if DISPATCH_USE_KEVENT_WORKLOOP
        } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
            cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    (pthread_workqueue_function_workloop_t)
                    _dispatch_workloop_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #endif // DISPATCH_USE_KEVENT_WORKLOOP
    #if DISPATCH_USE_KEVENT_WORKQUEUE
        } else if (wq_supported & WORKQ_FEATURE_KEVENT) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #endif
        } else {
            DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
        }
    #pragma clang diagnostic pop
    
        if (r != 0) {
            DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
                    "Root queue initialization failed");
        }
    #endif // DISPATCH_USE_INTERNAL_WORKQUEUE
    }
    

    Next, look at the call stack of a dispatch_async task:

    [Figure: backtrace of a block submitted with dispatch_async]

    Frames 1 through 5 are the execution of the task itself — so where does it actually get invoked?
    The task ends up being driven through _dispatch_worker_thread2, i.e. through the pthread workqueue API: GCD is built on top of pthread.
    The worker is driven by the workloop registered via _pthread_workqueue_init_with_workloop, so the block is not invoked immediately; when it runs is up to the OS.
    In other words, the callback of an asynchronous submission really is asynchronous.
    Now back to _dispatch_root_queue_poke_slow:
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
        {
            _dispatch_root_queue_debug("requesting new worker thread for global "
                    "queue: %p", dq);
            r = _pthread_workqueue_addthreads(remaining,
                    _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
            (void)dispatch_assume_zero(r);
            return;
        }
    

    For a global (root) concurrent queue, _pthread_workqueue_addthreads is called to request worker threads from the kernel-managed workqueue, which then execute the work.

    For an ordinary (pthread-pool) root queue,

    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
                t_count - remaining, &t_count, acquire));
    

    this is a do/while loop whose condition is os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire) — it atomically takes `remaining` slots out of dgq_thread_pool_size, retrying if another thread changed the count in the meantime.

    For this kind of root queue, dgq_thread_pool_size starts out marked as 1, for example:

    const struct dispatch_queue_global_s _dispatch_custom_workloop_root_queue = {
        DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
        .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
        .do_ctxt = NULL,
        .dq_label = "com.apple.root.workloop-custom",
        .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
        .dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT) |
                DISPATCH_PRIORITY_SATURATED_OVERRIDE,
        .dq_serialnum = DISPATCH_QUEUE_SERIAL_NUMBER_WLF,
        .dgq_thread_pool_size = 1,
    };
    

    The width of the global concurrent (root) queue is one larger than that of an ordinary concurrent queue:

    #define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1) // global concurrent (root) queue
    #define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2) // ordinary concurrent queue
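
    For scale, DISPATCH_QUEUE_WIDTH_FULL is defined in queue_internal.h roughly as follows (quoted from memory, so treat the exact value as an assumption):

    #define DISPATCH_QUEUE_WIDTH_FULL 0x1000ull   // 4096
    // => WIDTH_POOL = 4095 (root/global queues), WIDTH_MAX = 4094 (custom concurrent queues)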
    

    The pool size of this root queue starts at 1, and dgq_thread_pool_size is updated repeatedly as worker threads are taken from and returned to the pool.
    Back in _dispatch_root_queue_poke_slow:

    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
                t_count - remaining, &t_count, acquire));
    

    In this code, remaining = can_request clamps the request: can_request comes from can_request = t_count < floor ? 0 : t_count - floor, where floor is passed in by the caller and t_count is the pool size that was just loaded atomically.
    remaining comes from the parameter n; in our trace it is 1 — one asynchronous submission asks for one new worker thread.
    So this branch

    if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
    

    keeps the request from exceeding what can actually be granted — if it does exceed it, something is off and the request is trimmed down to can_request. And if

    if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
    

    it logs that the thread pool for this root queue is full and returns without creating anything.
    So how many threads can be created at most? Let's work it out.

    do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
    #if DISPATCH_DEBUG
            unsigned dwStackSize = 0;
    #else
            unsigned dwStackSize = 64 * 1024;
    #endif
            uintptr_t hThread = 0;
            while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
                if (errno != EAGAIN) {
                    (void)dispatch_assume(hThread);
                }
                _dispatch_temporary_resource_shortage();
            }
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
            if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
                (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
            }
    #endif
            CloseHandle((HANDLE)hThread);
        } while (--remaining);
    

    A couple of things to note in this code:

    dwStackSize = 64 * 1024 is the stack size visible here (note this particular loop is the _WIN32 branch of the source shown above); whether that is what iOS actually uses still needs verification.
    os_atomic_cmpxchgv2o is what decides whether the pool is already exhausted. Let's check the upper bound on dgq_thread_pool_size:

    #ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
    #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
    #endif
    

    The 255 defined here is the maximum thread-pool count, but it does not mean that many threads can actually be spawned.
    The official documentation on thread stack sizes says:

    [Figure: excerpt from Apple's threading documentation on secondary-thread stack sizes]

    The default secondary-thread stack size is 512 KB and the minimum allowed is 16 KB. The bigger each stack, the more memory every thread costs and the fewer threads can be created.
    Take a device with 4 GB of RAM, roughly 1 GB of which belongs to kernel space.
    Working against 1 GB of address space: with 16 KB stacks that is 1024*1024/16 threads in theory; with the default 512 KB stacks it drops to about 2048 — and in practice nowhere near that many can be created.
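
    The arithmetic, spelled out (a back-of-the-envelope sketch; the real limit is decided by the kernel and the workqueue subsystem, not by this calculation):

    #include <stdio.h>

    int main(void) {
        const long addr_space_kb = 1024 * 1024;  // assume ~1 GB of address space usable for stacks
        printf("16 KB stacks : %ld threads (theoretical)\n", addr_space_kb / 16);   // 65536
        printf("512 KB stacks: %ld threads (theoretical)\n", addr_space_kb / 512);  // 2048
        // Either way, DISPATCH_WORKQ_MAX_PTHREAD_COUNT (255) and the OS cap the
        // real number far lower.
        return 0;
    }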

    Summary

    This article covered deadlock in the synchronous function, how synchronous tasks execute, the asynchronous function, and how the singleton (dispatch_once) works under the hood. Follow-up articles will cover more GCD topics. If you spot any mistakes, please point them out.
