
Exploring iOS Internals: GCD Principles

Author: MrHardy | Published 2021-08-10 16:51

    Today we explore the differences between the synchronous and asynchronous dispatch functions, looking at three questions (a quick sketch follows the list):

    • Whether a new thread gets spawned
    • Whether the task's callback runs synchronously or asynchronously
    • Whether a deadlock can occur
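
    A minimal sketch of what we are about to verify against the source (the method name is my own; the output order and worker thread are what you would typically observe):

    - (void)syncVsAsyncDemo {
        dispatch_queue_t queue = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);

        NSLog(@"1 - caller thread: %@", [NSThread currentThread]);
        dispatch_async(queue, ^{
            // typically runs later, on a worker thread spawned or reused by GCD
            NSLog(@"async block thread: %@", [NSThread currentThread]);
        });
        NSLog(@"2 - dispatch_async returned immediately");

        dispatch_sync(queue, ^{
            // runs on the caller's thread; dispatch_sync waits for it to finish
            NSLog(@"3 - sync block thread: %@", [NSThread currentThread]);
        });
        NSLog(@"4 - dispatch_sync returned only after its block completed");
    }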

    Synchronous Functions

    dispatch_sync
    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    
    _dispatch_sync_f
    static void
    _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
            uintptr_t dc_flags)
    {
        _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    _dispatch_sync_f_inline
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
    // serial queue: dq_width == 1
        if (likely(dq->dq_width == 1)) {
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    
    _dispatch_barrier_sync_f
    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    _dispatch_barrier_sync_f_inline
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // The more correct thing to do would be to merge the qos of the thread
        // that just acquired the barrier lock into the queue state.
        //
        // However this is too expensive for the fast path, so skip doing it.
        // The chosen tradeoff is that if an enqueue on a lower priority thread
        // contends with this fast path, this thread may receive a useless override.
        //
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    
    _dispatch_sync_f_slow
    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
    
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    
        _dispatch_trace_item_push(top_dq, &dsc);
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    
    __DISPATCH_WAIT_FOR_QUEUE__
    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
    // this is where the deadlock is detected
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    
        // Blocks submitted to the main thread MUST run on the main thread, and
        // dispatch_async_and_wait also executes on the remote context rather than
        // the current thread.
        //
        // For both these cases we need to save the frame linkage for the sake of
        // _dispatch_async_and_wait_invoke
        _dispatch_thread_frame_save_state(&dsc->dsc_dtf);
    
        if (_dq_state_is_suspended(dq_state) ||
                _dq_state_is_base_anon(dq_state)) {
            dsc->dc_data = DISPATCH_WLH_ANON;
        } else if (_dq_state_is_base_wlh(dq_state)) {
            dsc->dc_data = (dispatch_wlh_t)dq;
        } else {
            _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
        }
    
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                    (uint8_t)_dispatch_get_basepri_override_qos_floor();
            _dispatch_thread_event_init(&dsc->dsc_event);
        }
        dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
        _dispatch_trace_runtime_event(sync_wait, dq, 0);
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
        } else if (!dsc->dsc_wlh_self_wakeup) {
            _dispatch_event_loop_wait_for_ownership(dsc);
        }
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_destroy(&dsc->dsc_event);
            // If _dispatch_sync_waiter_wake() gave this thread an override,
            // ensure that the root queue sees it.
            if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
                _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
            }
        }
    }
    
    __DISPATCH_WAIT_FOR_QUEUE__ (the deadlock check, excerpted)
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    }
    
    _dq_state_drain_locked_by
    static inline bool
    _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
    {
        return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
    }
    
    _dispatch_lock_is_locked_by
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
            //#define DLOCK_OWNER_MASK          ((dispatch_lock)0xfffffffc)
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
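
    The check above XORs the waiter's tid into the lock value stored in the queue state; if they match under DLOCK_OWNER_MASK, the queue is already being drained by the current thread and DISPATCH_CLIENT_CRASH fires. The classic way to trigger it (the queue label is arbitrary):

    dispatch_queue_t serial = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);
    dispatch_async(serial, ^{
        // this block is draining `serial`, so the current thread owns its drain lock
        dispatch_sync(serial, ^{
            // never reached: dsc_waiter equals the lock owner, so we crash with
            // "dispatch_sync called on queue already owned by current thread"
            NSLog(@"unreachable");
        });
    });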
    
    _dispatch_sync_invoke_and_complete
    static void
    _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    //#define DISPATCH_TRACE_ARG(arg) , arg
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        _dispatch_lane_non_barrier_complete(dq, 0);
    }
    
    _dispatch_sync_function_invoke
    static void
    _dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    }
    
    _dispatch_sync_function_invoke_inline
    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        _dispatch_client_callout(ctxt, func); // callout: the submitted block/function is invoked here, on the current thread
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    

    Asynchronous Functions

    dispatch_async
    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    
    _dispatch_continuation_async
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        (void)dc_flags;
    #endif
    //#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
        return dx_push(dqu._dq, dc, qos);
    }
    
    _dispatch_lane_concurrent_push
    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        // <rdar://problem/24738102&24743140> reserving non barrier width
        // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
        // width equivalent), so we have to check that this thread hasn't
        // enqueued anything ahead of this call or we can break ordering
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos); // both paths ultimately come down to _dispatch_lane_push
    }
    
    _dispatch_lane_push
    void
    _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        dispatch_wakeup_flags_t flags = 0;
        struct dispatch_object_s *prev;
    
        if (unlikely(_dispatch_object_is_waiter(dou))) {
            return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
        }
    
        dispatch_assert(!_dispatch_object_is_global(dq));
        qos = _dispatch_queue_push_qos(dq, qos);
            // If we are going to call dx_wakeup(), the queue must be retained before
        // the item we're pushing can be dequeued, which means:
        // - before we exchange the tail if we have to override
        // - before we set the head if we made the queue non empty.
        // Otherwise, if preempted between one of these and the call to dx_wakeup()
        // the blocks submitted to the queue may release the last reference to the
        // queue when invoked by _dispatch_lane_drain. <rdar://problem/6932776>
    
        prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
        if (unlikely(os_mpsc_push_was_empty(prev))) {
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
        } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
            // There's a race here, _dispatch_queue_need_override may read a stale
            // dq_state value.
            //
            // If it's a stale load from the same drain streak, given that
            // the max qos is monotonic, too old a read can only cause an
            // unnecessary attempt at overriding which is harmless.
            //
            // We'll assume here that a stale load from an a previous drain streak
            // never happens in practice.
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2;
        }
        os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
        if (flags) {
                    //#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
            return dx_wakeup(dq, qos, flags);
        }
    }
    
    _dispatch_lane_wakeup
    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    
    _dispatch_queue_wakeup
    void
    _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
    {
        dispatch_queue_t dq = dqu._dq;
        uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
        dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
    
        if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
        // ... (remainder of the function omitted)
    }
    
    _dispatch_root_queue_push
    void
    _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
    #if DISPATCH_USE_KEVENT_WORKQUEUE
        dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
        if (unlikely(ddi && ddi->ddi_can_stash)) {
            dispatch_object_t old_dou = ddi->ddi_stashed_dou;
            dispatch_priority_t rq_overcommit;
            rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    
            if (likely(!old_dou._do || rq_overcommit)) {
                dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
                dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
                ddi->ddi_stashed_rq = rq;
                ddi->ddi_stashed_dou = dou;
                ddi->ddi_stashed_qos = qos;
                _dispatch_debug("deferring item %p, rq %p, qos %d",
                        dou._do, rq, qos);
                if (rq_overcommit) {
                    ddi->ddi_can_stash = false;
                }
                if (likely(!old_dou._do)) {
                    return;
                }
                // push the previously stashed item
                qos = old_qos;
                rq = old_rq;
                dou = old_dou;
            }
        }
    #endif
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        if (_dispatch_root_queue_push_needs_override(rq, qos)) {
            return _dispatch_root_queue_push_override(rq, dou, qos);
        }
    #else
        (void)qos;
    #endif
        _dispatch_root_queue_push_inline(rq, dou, dou, 1);
    }
    
    _dispatch_root_queue_push_inline
    static inline void
    _dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
            dispatch_object_t _head, dispatch_object_t _tail, int n)
    {
        struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
        if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
            return _dispatch_root_queue_poke(dq, n, 0);
        }
    }
    
    _dispatch_root_queue_poke
    void
    _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
    {
        if (!_dispatch_queue_class_probe(dq)) {
            return;
        }
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
        if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
    #endif
        {
            if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
                _dispatch_root_queue_debug("worker thread request still pending "
                        "for global queue: %p", dq);
                return;
            }
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
        return _dispatch_root_queue_poke_slow(dq, n, floor);
    }
    

    To be continued...


    Singleton (dispatch_once)

    _dispatch_root_queues_init
    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL,
                _dispatch_root_queues_init_once);
    }
    
    dispatch_once
    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
        return _dispatch_once_wait(l);
    }
    
    _dispatch_once_gate_tryenter
    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    
    _dispatch_once_callout
    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_client_callout(ctxt, func);
        _dispatch_once_gate_broadcast(l);//发送广播
    }
    
    _dispatch_once_gate_broadcast
    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
        dispatch_lock value_self = _dispatch_lock_value_for_self();
        uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        v = _dispatch_once_mark_quiescing(l); // mark the gate as quiescing
    #else
        v = _dispatch_once_mark_done(l); // mark the gate as done (DLOCK_ONCE_DONE)
    #endif
        if (likely((dispatch_lock)v == value_self)) return;
        _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    
    _dispatch_once_mark_done
    static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo)
    {
        return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
    }
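
    This is exactly the machinery behind the familiar singleton pattern: the first caller wins _dispatch_once_gate_tryenter and performs the callout, while later callers either return early on DLOCK_ONCE_DONE or block in _dispatch_once_wait until the broadcast. A minimal sketch (the class name is mine):

    + (instancetype)sharedManager {
        static MyManager *shared = nil;
        static dispatch_once_t onceToken;        // backs the dgo_once word seen above
        dispatch_once(&onceToken, ^{
            shared = [[MyManager alloc] init];   // the callout, executed exactly once
        });
        return shared;                           // later calls hit the DLOCK_ONCE_DONE fast path
    }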
    

    Here, the "singleton" simply means that _dispatch_root_queues_init_once is guaranteed to execute exactly once.

    _dispatch_root_queues_init_once
    static void
    _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
    {
        // ... (body omitted)
    }
    

    Now look at _dispatch_root_queue_poke_slow: for root queues backed by a pthread pool, the number of worker threads that can still be requested is bounded by dgq_thread_pool_size (excerpt below).

    //  .dgq_thread_pool_size = 1
        t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } // ... (excerpt; the loop condition and the thread-creation code that follows are omitted)
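
    A rough way to observe the pool limit from user code is to saturate a global queue with blocking work and count the distinct worker threads that appear; the exact count depends on QoS and system load, so treat this only as an experiment:

    dispatch_queue_t global = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
    NSMutableSet *threads = [NSMutableSet set];
    for (int i = 0; i < 200; i++) {
        dispatch_async(global, ^{
            @synchronized (threads) { [threads addObject:[NSThread currentThread]]; }
            [NSThread sleepForTimeInterval:5];   // keep each worker busy so new work needs new threads
        });
    }
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC), dispatch_get_main_queue(), ^{
        NSLog(@"distinct worker threads so far: %lu", (unsigned long)threads.count);
    });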
    

    Barrier Functions

    dispatch_barrier_sync
    void
    dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    
    _dispatch_barrier_sync_f
    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    _dispatch_barrier_sync_f_inline
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // The more correct thing to do would be to merge the qos of the thread
        // that just acquired the barrier lock into the queue state.
        //
        // However this is too expensive for the fast path, so skip doing it.
        // The chosen tradeoff is that if an enqueue on a lower priority thread
        // contends with this fast path, this thread may receive a useless override.
        //
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    
    _dispatch_sync_recurse
    static void
    _dispatch_sync_recurse(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
        dispatch_queue_t tq = dq->do_targetq;
    
        do {
            if (likely(tq->dq_width == 1)) {
                if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
                    return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq,
                            DC_FLAG_BARRIER);
                }
            } else {
                dispatch_queue_concurrent_t dl = upcast(tq)._dl;
                if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
                    return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq, 0);
                }
            }
            tq = tq->do_targetq;
        } while (unlikely(tq->do_targetq));
    
        _dispatch_introspection_sync_begin(dq);
        _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags)));
    }
    
    _dispatch_sync_f_slow
    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
    
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    
        _dispatch_trace_item_push(top_dq, &dsc);
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    
    _dispatch_sync_invoke_and_complete_recurse
    static void
    _dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
            void *ctxt, dispatch_function_t func, uintptr_t dc_flags
            DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
    }
    
    
    _dispatch_sync_complete_recurse
    static void
    _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
            uintptr_t dc_flags)
    {
        bool barrier = (dc_flags & DC_FLAG_BARRIER);
        do {
            if (dq == stop_dq) return;
            if (barrier) {
                dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
            } else {
                _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
            }
            dq = dq->do_targetq;
            barrier = (dq->dq_width == 1);
        } while (unlikely(dq->do_targetq));
    }
    
    _dispatch_root_queue_wakeup
    // global concurrent (root) queue
    void 
    _dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
            DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
    {
        if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
            DISPATCH_INTERNAL_CRASH(dq->dq_priority,
                    "Don't try to wake up or override a root queue");
        }
        if (flags & DISPATCH_WAKEUP_CONSUME_2) {
            return _dispatch_release_2_tailcall(dq);
        }
    }
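
    The root-queue wakeup above shows why barrier semantics are only honored on a concurrent queue you create yourself: on a global (root) concurrent queue the barrier flag is effectively ignored and the call degenerates into a plain async/sync. A typical reader/writer sketch (the queue label and the cache property are mine):

    dispatch_queue_t rwQueue = dispatch_queue_create("com.example.rw", DISPATCH_QUEUE_CONCURRENT);

    // reads may run concurrently
    dispatch_async(rwQueue, ^{ NSLog(@"read A: %@", self.cache); });
    dispatch_async(rwQueue, ^{ NSLog(@"read B: %@", self.cache); });

    // the barrier waits for the reads above, runs alone, then lets later work proceed
    dispatch_barrier_async(rwQueue, ^{ self.cache = @{ @"key" : @"value" }; });

    // had rwQueue been dispatch_get_global_queue(), the barrier above would have
    // behaved like an ordinary dispatch_async
    dispatch_async(rwQueue, ^{ NSLog(@"read C: %@", self.cache); });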
    
    _dispatch_lane_wakeup
    void
    _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
              // barrier completed
            return _dispatch_lane_barrier_complete(dqu, qos, flags);
        }
        if (_dispatch_queue_class_probe(dqu)) {
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
        return _dispatch_queue_wakeup(dqu, qos, flags, target);
    }
    
    _dispatch_lane_barrier_complete
    static void
    _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags)
    {
        dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
        dispatch_lane_t dq = dqu._dl;
    
        if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
            struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
            if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
                if (_dispatch_object_is_waiter(dc)) {
                    return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
                }
            } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
                return _dispatch_lane_drain_non_barriers(dq, dc, flags);
            }
    
            if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
                _dispatch_retain_2(dq);
                flags |= DISPATCH_WAKEUP_CONSUME_2;
            }
            target = DISPATCH_QUEUE_WAKEUP_TARGET;
        }
    
        uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
                dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
        return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
    }
    
    _dispatch_lane_drain_non_barriers
    static void
    _dispatch_lane_drain_non_barriers(dispatch_lane_t dq,
            struct dispatch_object_s *dc, dispatch_wakeup_flags_t flags)
    {
        size_t owned_width = dq->dq_width;
        struct dispatch_object_s *next_dc;
    
        // see _dispatch_lane_drain, go in non barrier mode, and drain items
    
        os_atomic_and2o(dq, dq_state, ~DISPATCH_QUEUE_IN_BARRIER, release);
    
        do {
            if (likely(owned_width)) {
                owned_width--;
            } else if (_dispatch_object_is_waiter(dc)) {
                // sync "readers" don't observe the limit
                _dispatch_queue_reserve_sync_width(dq);
            } else if (!_dispatch_queue_try_acquire_async(dq)) {
                // no width left
                break;
            }
            next_dc = _dispatch_queue_pop_head(dq, dc);
            if (_dispatch_object_is_waiter(dc)) {
                _dispatch_non_barrier_waiter_redirect_or_wake(dq, dc);
            } else {
                _dispatch_continuation_redirect_push(dq, dc,
                        _dispatch_queue_max_qos(dq));
            }
    drain_again:
            dc = next_dc;
        } while (dc && !_dispatch_object_is_barrier(dc));
    
        uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();
        uint64_t owned = owned_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    
        if (dc) {
            owned = _dispatch_queue_adjust_owned(dq, owned, dc);
        }
    
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
            new_state  = old_state - owned;
            new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
            new_state &= ~DISPATCH_QUEUE_DIRTY;
    
            // similar to _dispatch_lane_non_barrier_complete():
            // if by the time we get here all redirected non barrier syncs are
            // done and returned their width to the queue, we may be the last
            // chance for the next item to run/be re-driven.
            if (unlikely(dc)) {
                new_state |= DISPATCH_QUEUE_DIRTY;
                new_state = _dispatch_lane_non_barrier_complete_try_lock(dq,
                        old_state, new_state, owner_self);
            } else if (unlikely(_dq_state_is_dirty(old_state))) {
                os_atomic_rmw_loop_give_up({
                    os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
                    next_dc = os_atomic_load2o(dq, dq_items_head, relaxed);
                    goto drain_again;
                });
            }
        });
    
        old_state -= owned;
        _dispatch_lane_non_barrier_complete_finish(dq, flags, old_state, new_state);
    }
    

    Semaphores

    dispatch_semaphore_signal
    intptr_t
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {
            return 0;
        }
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        return _dispatch_semaphore_signal_slow(dsema);
    }
    
    dispatch_semaphore_wait
    intptr_t
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) {
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    
    _dispatch_semaphore_wait_slow
    static intptr_t
    _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
            dispatch_time_t timeout)
    {
        long orig;
    
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        switch (timeout) {
        default:
            if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
                break;
            }
            // Fall through and try to undo what the fast path did to
            // dsema->dsema_value
        case DISPATCH_TIME_NOW:
            orig = dsema->dsema_value;
            while (orig < 0) {
                if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                        &orig, relaxed)) {
                    return _DSEMA4_TIMEOUT();
                }
            }
            // Another thread called semaphore_signal().
            // Fall through and drain the wakeup.
        case DISPATCH_TIME_FOREVER:
            _dispatch_sema4_wait(&dsema->dsema_sema);
            break;
        }
        return 0;
    }
    
    _dispatch_sema4_wait
    void
    _dispatch_sema4_wait(_dispatch_sema4_t *sema)
    {
        int ret = 0;
        do {
            ret = sem_wait(sema);
        } while (ret == -1 && errno == EINTR);
        DISPATCH_SEMAPHORE_VERIFY_RET(ret);
    }
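
    In short: wait decrements dsema_value and parks the thread in sem_wait once the value goes negative; signal increments it and wakes one waiter via the slow path. A common use is capping concurrency (the limit of 2 is arbitrary):

    dispatch_semaphore_t sema = dispatch_semaphore_create(2);    // at most 2 tasks in flight
    dispatch_queue_t queue = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);

    for (int i = 0; i < 10; i++) {
        dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);    // dsema_value--, blocks when it drops below 0
        dispatch_async(queue, ^{
            NSLog(@"task %d running", i);
            dispatch_semaphore_signal(sema);                     // dsema_value++, wakes one waiter
        });
    }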
    

    Dispatch Groups

    dispatch_group_create
    dispatch_group_t
    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    
    _dispatch_group_create_with_count
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    
    dispatch_group_enter
    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    
    dispatch_group_leave
    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            return _dispatch_group_wake(dg, old_state, true);
        }
    
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    
    _dispatch_group_notify
    static inline void
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
    
        dsn->dc_data = dq;
        _dispatch_retain(dq);
    
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) {
                    os_atomic_rmw_loop_give_up({ // sync/async block callout: the group value is already 0, wake the notify blocks now
                        return _dispatch_group_wake(dg, new_state, false);
                    });
                }
            });
        }
    }
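
    Putting enter / leave / notify together: each enter lowers the group value by one interval, each leave raises it back, and once the value returns to zero the queued notify continuations are woken. A sketch with a manual enter/leave pair around an asynchronous callback (the fetch method is hypothetical):

    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);

    dispatch_group_enter(group);                 // dg_bits -= DISPATCH_GROUP_VALUE_INTERVAL
    [self fetchSomethingWithCompletion:^{
        dispatch_group_leave(group);             // dg_state += INTERVAL; the last leave triggers the wake
    }];

    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        NSLog(@"second task");
        dispatch_group_leave(group);
    });

    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"all tasks done");                // fires once the group value is back to 0
    });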
    
    dispatch_group_async
    void
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    
    _dispatch_continuation_group_async
    
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
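
    As the two functions above show, dispatch_group_async is essentially an enter issued before the async push, with the matching leave issued after the block has been invoked. Reusing the group and queue from the sketch above, the two forms are therefore interchangeable:

    // form 1
    dispatch_group_async(group, queue, ^{
        NSLog(@"work");
    });

    // form 2: what dispatch_group_async does under the hood
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        NSLog(@"work");
        dispatch_group_leave(group);
    });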
    
