美文网首页
GCD分析(中)

GCD分析(中)

作者: 浅墨入画 | 来源:发表于2021-09-12 22:18 被阅读0次

    同步函数死锁

    死锁现象
    • 主线程因为同步函数的原因等着先执行任务
    • 主队列等着主线程的任务执行完毕再执行自己的任务
    • 主队列主线程相互等待会造成死锁
    同步函数和异步函数的区别
    • 能否开辟线程
    • 任务的回调是否具备异步性或同步性
    同步串行死锁底层源码分析
    • 全局搜索dispatch_sync(dis
    // Block-based entry point for synchronous submission.
    // Forwards `work` together with its invoke function to _dispatch_sync_f,
    // tagging the call with DC_FLAG_BLOCK.
    DISPATCH_NOINLINE
    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        // When _dispatch_block_has_private_data(work) is true the block is
        // routed to a dedicated helper instead of the plain path below.
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    #endif // __BLOCKS__
    
    • 查看_dispatch_sync_f方法
    • 查看_dispatch_sync_f_inline方法
    // Inline core of the synchronous submit path. In order, it picks:
    //   1. the barrier variant when dq_width == 1,
    //   2. a client crash when the queue is not a lane type,
    //   3. the slow path when sync width cannot be reserved,
    //   4. the recursive path when the target queue itself has a target,
    //   5. otherwise a direct invoke-and-complete.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {
            // Width-1 queue: hand off to the barrier variant. This is the
            // call to follow when tracing the sync deadlock.
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        // Two-level target chain: take the recursive variant.
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    
    • 查看_dispatch_barrier_sync_f方法
    • 查看_dispatch_barrier_sync_f_inline方法
    // Inline core of the barrier-sync path. Captures the calling thread's tid,
    // tries to acquire the queue as a barrier for that tid, and falls back to
    // _dispatch_sync_f_slow when the acquire fails.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        // tid of the calling thread; passed to the barrier acquire below.
        dispatch_tid tid = _dispatch_tid_self();
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            // Acquire failed: go to _dispatch_sync_f_slow. This is the path on
            // which the deadlock crash is reported.
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        // Two-level target chain: take the recursive variant, still as a barrier.
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    
    死锁时报错

    死锁真正报错的方法是__DISPATCH_WAIT_FOR_QUEUE__

    image.png
    • 查看_dispatch_sync_f_slow方法
    • 查看__DISPATCH_WAIT_FOR_QUEUE__
    DISPATCH_NOINLINE
    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        // 查看死锁条件
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            // 死锁产生在这里
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    
        // Blocks submitted to the main thread MUST run on the main thread, and
        // dispatch_async_and_wait also executes on the remote context rather than
        // the current thread.
        //
        // For both these cases we need to save the frame linkage for the sake of
        // _dispatch_async_and_wait_invoke
        _dispatch_thread_frame_save_state(&dsc->dsc_dtf);
    
        if (_dq_state_is_suspended(dq_state) ||
                _dq_state_is_base_anon(dq_state)) {
            dsc->dc_data = DISPATCH_WLH_ANON;
        } else if (_dq_state_is_base_wlh(dq_state)) {
            dsc->dc_data = (dispatch_wlh_t)dq;
        } else {
            _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
        }
    ...... //省略代码
    

    里面的日志和我们崩溃的死锁最后执行__DISPATCH_WAIT_FOR_QUEUE__的截图日志是一样的,意味着死锁就在这里,你调用的队列被当前线程持有

    • 产生死锁的dsc_waiter是从外面传进来的,返回去查看

      image.png
    • 查看_dispatch_tid_self()tid

    // The current thread's tid is the value of _dispatch_thread_port(), cast to dispatch_tid.
    #define _dispatch_tid_self()        ((dispatch_tid)_dispatch_thread_port())
    
    • 由死锁判断条件if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))),去查看dq_state状态,调用的是_dq_state_drain_locked_by方法
    // Reports whether the queue state `dq_state` is drain-locked by `tid`.
    // The state is cast to dispatch_lock and checked with
    // _dispatch_lock_is_locked_by.
    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
    {
        return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
    }
    
    • 查看_dispatch_lock_is_locked_by方法
    // Returns true when `lock_value` and `tid` agree on every bit selected by
    // DLOCK_OWNER_MASK: XOR leaves set bits only where the two differ, and the
    // mask keeps only the owner bits of that difference.
    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    

    当lock_value与tid相同时,lock_value ^ tid等于0,0 & DLOCK_OWNER_MASK的结果也等于0,该函数返回true
    也就是说,dq_state中记录的持有者与dsc->dsc_waiter这两个值相同时,就会发生死锁

    同步函数任务同步

    同步函数全局并发队列底层源码分析

    // Inline core of the synchronous submit path, quoted here for the
    // global-concurrent-queue case. Queues with dq_width == 1 leave through
    // the barrier variant; all other queues continue below.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        // Only lane-type queues support dispatch_sync; anything else crashes.
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        // Two-level target chain: take the recursive variant.
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        // No comma after `func`: DISPATCH_TRACE_ARG supplies the separator.
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    
    • _dispatch_sync_invoke_and_complete方法传参以func开头,为什么这么写?我们进去探索
    // Runs `func(ctxt)` synchronously for `dq`, then completes the trace item
    // and the non-barrier completion for the lane, in that order.
    // The `dc` parameter is introduced through DISPATCH_TRACE_ARG, which is why
    // no comma follows `func` in the parameter list.
    DISPATCH_NOINLINE
    static void
    _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        _dispatch_lane_non_barrier_complete(dq, 0);
    }
    
    • 查看DISPATCH_TRACE_ARG方法
    // The leading comma lives inside the macro, so `arg` behaves as an optional
    // trailing argument. NOTE(review): the article states that it is
    // controlled by a condition; presumably another definition of this macro
    // expands to nothing — that definition is not shown here.
    #define DISPATCH_TRACE_ARG(arg) , arg
    
    • 同步函数全局并发队列会执行到_dispatch_sync_f_inline方法中的哪个方法?我们不得而知,这里就添加符号断点来探索
      image.png
    image.png
    • 下面探索执行到_dispatch_sync_f_slow方法中的哪个方法?依然是添加符号断点来探索

      image.png
    • 跳过断点继续执行,发现刚才添加的两个符号断点并没有执行到

    image.png
    • 那么应该就是执行到_dispatch_sync_f_slow方法中的_dispatch_sync_function_invoke,我们重新添加符号断点发现确实执行到这里。
    // Non-inlined wrapper around _dispatch_sync_function_invoke_inline; it
    // forwards its three arguments unchanged.
    DISPATCH_NOINLINE
    static void
    _dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    }
    
    • 查看_dispatch_sync_function_invoke_inline方法,执行的是_dispatch_client_callout
    // Invokes the client function on the calling thread: pushes a thread frame
    // for `dq`, calls `func(ctxt)` through _dispatch_client_callout, bumps the
    // perfmon work-item counter, then pops the frame.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        // The client's work runs here.
        _dispatch_client_callout(ctxt, func);
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    
    • bt查看堆栈信息
    image.png

    异步函数分析上

    异步函数并发队列底层源码分析

    • 查看dispatch_async方法
    #ifdef __BLOCKS__
    // Block-based entry point for asynchronous submission.
    // Allocates a continuation, initializes it from `work` with
    // DC_FLAG_CONSUME, and submits it with the QoS returned by the init call.
    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        // Note: the flags passed on are read back from the continuation
        // (dc->dc_flags), not the local dc_flags.
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    #endif
    
    • 查看_dispatch_continuation_async方法
    • 查看dx_push方法
    // Dispatches a push through the dq_push entry of x's vtable.
    #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    
    • 查看dq_push方法

      image.png
    • 查看_dispatch_lane_concurrent_push方法

    // dq_push implementation for concurrent lanes. Takes the redirect fast path
    // only when all four conditions below hold; otherwise falls back to the
    // regular _dispatch_lane_push.
    DISPATCH_NOINLINE
    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        // <rdar://problem/24738102&24743140> reserving non barrier width
        // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
        // width equivalent), so we have to check that this thread hasn't
        // enqueued anything ahead of this call or we can break ordering
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                // Barrier items are excluded from the redirect path.
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    
    • 查看_dispatch_lane_push方法,可以添加符号断点查看执行流程
    // Enqueues `dou` on lane `dq` and wakes the queue when required.
    // Waiter objects are diverted to _dispatch_lane_push_waiter. Everything
    // else is appended to dq_items, and dx_wakeup is called only if `flags`
    // ended up non-zero.
    DISPATCH_NOINLINE
    void
    _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        dispatch_wakeup_flags_t flags = 0;
        struct dispatch_object_s *prev;
    
        if (unlikely(_dispatch_object_is_waiter(dou))) {
            return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
        }
    
        // This path is not expected to be used with a global queue.
        dispatch_assert(!_dispatch_object_is_global(dq));
        qos = _dispatch_queue_push_qos(dq, qos);
    
        // Publish the new tail first; `prev` is the previous tail.
        prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
        if (unlikely(os_mpsc_push_was_empty(prev))) {
            // Queue was empty: retain dq and request a wakeup that also marks
            // the queue dirty.
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
        } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
            // Queue was not empty but needs a QoS override: wake it as well.
            _dispatch_retain_2_unsafe(dq);
            flags = DISPATCH_WAKEUP_CONSUME_2;
        }
        // Link the previous tail to the new item.
        os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
        if (flags) {
            return dx_wakeup(dq, qos, flags);
        }
    }
    
    • 查看dx_wakeup方法
    // Dispatches a wakeup through the dq_wakeup entry of x's vtable.
    #define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
    
    • 查看dq_wakeup方法,执行的是_dispatch_lane_wakeup方法
    // Vtable for concurrent queues (subclass of lane). For this type,
    // dx_wakeup resolves to _dispatch_lane_wakeup and dx_push resolves to
    // _dispatch_lane_concurrent_push.
    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
        .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
        .do_dispose     = _dispatch_lane_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_lane_invoke,
    
        .dq_activate    = _dispatch_lane_activate,
        .dq_wakeup      = _dispatch_lane_wakeup,
        .dq_push        = _dispatch_lane_concurrent_push,
    );
    
    • 查看_dispatch_lane_wakeup方法
    • 查看_dispatch_queue_wakeup方法
    DISPATCH_NOINLINE
    void
    _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
    {
        dispatch_queue_t dq = dqu._dq;
        uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
        dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
    
        if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
    
        if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
            dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
            qos = _dispatch_queue_wakeup_qos(dq, qos);
            // 执行到_dispatch_lane_class_barrier_complete方法
            return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
                    flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
        }
    ...... //省略代码
    
    • 查看_dispatch_lane_class_barrier_complete方法
    DISPATCH_NOINLINE
    static void
    _dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
            dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
            uint64_t owned)
    {
        uint64_t old_state, new_state, enqueue;
        dispatch_queue_t tq;
    
        if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
            tq = _dispatch_mgr_q._as_dq;
            enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
        } else if (target) {
            tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
            enqueue = DISPATCH_QUEUE_ENQUEUED;
        } else {
            tq = NULL;
            enqueue = 0;
        }
    
    again:
        // os_atomic_rmw_loop2o产生递归调用
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            if (unlikely(_dq_state_needs_ensure_ownership(old_state))) {
                _dispatch_event_loop_ensure_ownership((dispatch_wlh_t)dq);
                _dispatch_queue_move_to_contended_sync(dq->_as_dq);
                os_atomic_rmw_loop_give_up(goto again);
            }
    ...... // 省略代码
    
    if (_dq_state_received_override(old_state)) {
            // Ensure that the root queue sees that this thread was overridden.
            _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
        }
    
        if (tq) {
            if (likely((old_state ^ new_state) & enqueue)) {
                dispatch_assert(_dq_state_is_enqueued(new_state));
                dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
                // 这里最终会执行到_dispatch_root_queue_push
                return _dispatch_queue_push_queue(tq, dq, new_state);
            }
    #if HAVE_PTHREAD_WORKQUEUE_QOS
            if (_dq_state_should_override(new_state)) {
                return _dispatch_queue_wakeup_with_override(dq, new_state, flags);
            }
    #endif
    ...... // 省略代码
    
    • 查看_dispatch_root_queue_push方法
    • 查看_dispatch_root_queue_push_inline方法
    // Pushes the item list [_head, _tail] onto root queue `dq` and pokes the
    // queue with `n` when os_mpsc_push_list returns true.
    // NOTE(review): a true result presumably means the list was empty before
    // the push — confirm against os_mpsc_push_list.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
            dispatch_object_t _head, dispatch_object_t _tail, int n)
    {
        struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
        if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
            return _dispatch_root_queue_poke(dq, n, 0);
        }
    }
    
    • 查看_dispatch_root_queue_poke方法
    • 查看_dispatch_root_queue_poke_slow方法
    DISPATCH_NOINLINE
    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
        int remaining = n;
    #if !defined(_WIN32)
        int r = ENOSYS;
    #endif
         //  单例底层方法
        _dispatch_root_queues_init();
        _dispatch_debug_root_queue(dq, __func__);
        _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    ...... //省略代码
    

    _dispatch_root_queue_poke_slow方法中调用了_dispatch_root_queues_init,异步函数分析先暂停到这里,下面先分析单例底层原理......

    单例底层原理

    • 查看_dispatch_root_queues_init方法
    // Root-queue initialization, guarded by dispatch_once_f on
    // _dispatch_root_queues_pred. The one-time work is
    // _dispatch_root_queues_init_once, called with a NULL context.
    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL,
                _dispatch_root_queues_init_once);
    }
    
    GCD单例写法
    // Typical client-side usage: a static token plus a block to run once.
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        // one-time initialization goes here
    });
    
    • 查看dispatch_once方法
    #ifdef __BLOCKS__
    // Block-based entry point: forwards the token, the block (as context) and
    // the block's invoke function to dispatch_once_f.
    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {  
        // `val` is the once-token that dispatch_once_f inspects to decide
        // whether the block still has to run.
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    #endif
    
    • 查看dispatch_once_f方法
    // Function-based once primitive. Treats the token as a once-gate and:
    //   - returns immediately if the gate already reads DLOCK_ONCE_DONE
    //     (only compiled in under the #if condition below),
    //   - runs the callout if this caller manages to enter the gate,
    //   - otherwise goes to _dispatch_once_wait.
    DISPATCH_NOINLINE
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        // Already initialized: nothing to do.
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        // Try to take the gate; the caller that succeeds runs the callout.
        if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
        // Gate is held by someone else: wait in _dispatch_once_wait
        // (per the article, until the gate reads DLOCK_ONCE_DONE).
        return _dispatch_once_wait(l);
    }
    
    • _dispatch_once_gate_tryenter上锁
    // Attempts to take the once-gate and returns the result of the atomic
    // compare-exchange on dgo_once.
    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        // Atomic operation: replace DLOCK_ONCE_UNLOCKED with the calling
        // thread's lock value, which is how the gate gets locked.
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    
    • 查看_dispatch_once_callout方法,回调block
    // Runs the client's one-time function `func(ctxt)`, then broadcasts on the
    // gate (which marks it done / unlocks it).
    DISPATCH_NOINLINE
    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_client_callout(ctxt, func);
        _dispatch_once_gate_broadcast(l);
    }
    
    • _dispatch_once_gate_broadcast方法解锁
    // Releases the once-gate after the callout has run. Marks the gate
    // (quiescing or done, depending on configuration) and, if the value that
    // was in the gate is not this thread's own lock value, takes the slow
    // broadcast path.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
        dispatch_lock value_self = _dispatch_lock_value_for_self();
        uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        v = _dispatch_once_mark_quiescing(l);
    #else
        // Mark the gate done so the initializer is not run again next time.
        v = _dispatch_once_mark_done(l);
    #endif
        // Previous gate value is exactly our own lock value: return early.
        // NOTE(review): presumably this means no other thread is waiting — confirm.
        if (likely((dispatch_lock)v == value_self)) return;
        _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    
    • _dispatch_once_mark_done设置DLOCK_ONCE_DONE
    // Atomically stores DLOCK_ONCE_DONE into the gate (release ordering) and
    // returns the result of the exchange.
    DISPATCH_ALWAYS_INLINE
    static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo)
    {
        return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
    }
    
    • 查看_dispatch_once_wait,等待检测DLOCK_ONCE_DONE
    void
    _dispatch_once_wait(dispatch_once_gate_t dgo)
    {
        dispatch_lock self = _dispatch_lock_value_for_self();
        uintptr_t old_v, new_v;
    #if HAVE_UL_UNFAIR_LOCK || HAVE_FUTEX
        dispatch_lock *lock = &dgo->dgo_gate.dgl_lock;
    #endif
        uint32_t timeout = 1;
    ...... //省略代码
    

    异步函数分析下

    继续上面异步函数分析,上面我们探索到_dispatch_root_queues_init -> _dispatch_root_queues_init_once方法

    static void
    _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
    {
        _dispatch_fork_becomes_unsafe();
    #if DISPATCH_USE_INTERNAL_WORKQUEUE
        size_t i;
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
                    _dispatch_root_queues[i].dq_priority);
        }
    #else
        int wq_supported = _pthread_workqueue_supported();
        int r = ENOTSUP;
    
        if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
            DISPATCH_INTERNAL_CRASH(wq_supported,
                    "QoS Maintenance support required");
        }
    
    #if DISPATCH_USE_KEVENT_SETUP
        struct pthread_workqueue_config cfg = {
            .version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
            .flags = 0,
            .workq_cb = 0,
            .kevent_cb = 0,
            .workloop_cb = 0,
            .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
    #if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
            .queue_label_offs = dispatch_queue_offsets.dqo_label,
    #endif
        };
    #endif
    
    #pragma clang diagnostic push
    #pragma clang diagnostic ignored "-Wunreachable-code"
        if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
    #if DISPATCH_USE_KEVENT_SETUP
            // 执行_dispatch_worker_thread2
            cfg.workq_cb = _dispatch_worker_thread2;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init(_dispatch_worker_thread2,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #if DISPATCH_USE_KEVENT_WORKLOOP
        } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
    #if DISPATCH_USE_KEVENT_SETUP
            // 执行_dispatch_worker_thread2
            cfg.workq_cb = _dispatch_worker_thread2;
            cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
            cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
    r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            // 执行_dispatch_worker_thread2
            r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    (pthread_workqueue_function_workloop_t)
                    _dispatch_workloop_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    ......
    
    image.png

    _dispatch_root_queues_init_once中只执行一次的任务被包装在_dispatch_worker_thread2里面,而它最终是交给pthread的API去处理的,也就是说GCD封装了pthread
    这里通过_pthread_workqueue_init_with_workloop注册的工作循环并不是即时调用的,而是由当前OS管控、在合适的时机调起。

    • 回到_dispatch_root_queue_poke_slow方法
    DISPATCH_NOINLINE
    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
            //默认传1 全局并发队列创建1个线程
        int remaining = n;
    #if !defined(_WIN32)
        int r = ENOSYS;
    #endif
    
        _dispatch_root_queues_init();
        _dispatch_debug_root_queue(dq, __func__);
        //runtime相关处理
        _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
        {
            //全局GLOBAL队列 创建线程
            _dispatch_root_queue_debug("requesting new worker thread for global "
                    "queue: %p", dq);
            // 创造线程执行
            r = _pthread_workqueue_addthreads(remaining,
                    _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
            (void)dispatch_assume_zero(r);
            return;
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    
    ...... //省略代码
        
        //如果是普通的 进入do while循环
        //remaining空余的数量
        int can_request, t_count;
        // seq_cst with atomic store to tail <rdar://problem/16932833>
        t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
                //请求数量大于可用数量时,把请求数量缩减为can_request(这里只打印调试日志,并不会抛异常)
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
                //remaining为0说明线程池已满,打印调试日志后直接return(并不会抛异常)
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
                t_count - remaining, &t_count, acquire));
        //dgq_thread_pool_size 标记为1。
    
    ...... //省略代码
    
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
    #if DISPATCH_DEBUG
            unsigned dwStackSize = 0;
    #else           
            //到底开多大 
                    unsigned dwStackSize = 64 * 1024;
    #endif
    ...... //省略代码
    
    • dgq_thread_pool_size标记为1
    // Static definition of the manager root queue
    // ("com.apple.root.libdispatch-manager"). Its width is
    // DISPATCH_QUEUE_WIDTH_POOL and its thread pool size starts at 1.
    struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
        DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
        .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
        .do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
        .dq_label = "com.apple.root.libdispatch-manager",
        .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
        .dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
                DISPATCH_PRIORITY_SATURATED_OVERRIDE,
        .dq_serialnum = 3,
        // Initial value of dgq_thread_pool_size for this queue.
        .dgq_thread_pool_size = 1,
    };
    
    • DISPATCH_QUEUE_WIDTH_POOL
    // Queue width constants.
    //   FULL = 0x1000
    //   POOL = FULL - 1 = 0xfff
    //   MAX  = FULL - 2 = 0xffe
    // So POOL is exactly one larger than MAX.
    #define DISPATCH_QUEUE_WIDTH_FULL_BIT       0x0020000000000000ull
    #define DISPATCH_QUEUE_WIDTH_FULL           0x1000ull
    #define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
    #define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2)
    // True for widths strictly between 1 and DISPATCH_QUEUE_WIDTH_POOL.
    #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
            ({ uint16_t _width = (width); \
            _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
    
    • 全局队列的宽度(DISPATCH_QUEUE_WIDTH_POOL)比自定义并发队列的最大宽度(DISPATCH_QUEUE_WIDTH_MAX)大1

      image.png
    • 通过os_atomic_inc2o自增加++

    (void)os_atomic_inc2o(dq, dgq_thread_pool_size, release);
    
    • DISPATCH_WORKQ_MAX_PTHREAD_COUNT线程池最大数量
    // Defaults to 255 unless DISPATCH_WORKQ_MAX_PTHREAD_COUNT is already
    // defined. The article describes it as the maximum thread count of the
    // thread pool.
    #ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
    #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
    #endif
    

    unsigned dwStackSize = 64 * 1024;
    按每个线程栈占用16KB估算,1GB内存理论上最多可开辟的线程数量 = 1GB / 16KB = (1024*1024KB) / 16KB = 1024*64

    线程编程指南

    image.png

    异步函数调用流程_dispatch_worker_thread2 ->_dispatch_root_queue_drain->_dispatch_async_redirect_invoke->_dispatch_continuation_pop->_dispatch_client_callout->_dispatch_call_block_and_release

    相关文章

      网友评论

          本文标题:GCD分析(中)

          本文链接:https://www.haomeiwen.com/subject/bmuawltx.html