美文网首页
GCD-源码分析

GCD-源码分析

作者: 浪的出名 | 来源:发表于2020-11-10 10:58 被阅读0次

    GCD源码分析

    dispatch_queue_create

    • dispatch_queue_create队列创建方法
    // Public entry point for creating a queue. Forwards label and attr to
    // _dispatch_lane_create_with_target, passing DISPATCH_TARGET_QUEUE_DEFAULT
    // as the target queue and true as the last argument.
    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
        return _dispatch_lane_create_with_target(label, attr,
                DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
    
    • _dispatch_lane_create_with_target分析
        return _dispatch_trace_queue_create(dq)._dq;
    
    • 上面方法的代码还是比较长的,我们直接看它的返回值_dispatch_trace_queue_create(dq)._dq;_dispatch_trace_queue_create内部调用了_dispatch_introspection_queue_create方法
    ispatch_queue_class_t
    _dispatch_introspection_queue_create(dispatch_queue_t dq)
    {
        dispatch_queue_introspection_context_t dqic;
        size_t sz = sizeof(struct dispatch_queue_introspection_context_s);
    
        if (!_dispatch_introspection.debug_queue_inversions) {
            sz = offsetof(struct dispatch_queue_introspection_context_s,
                    __dqic_no_queue_inversion);
        }
        dqic = _dispatch_calloc(1, sz);
        dqic->dqic_queue._dq = dq;
        if (_dispatch_introspection.debug_queue_inversions) {
            LIST_INIT(&dqic->dqic_order_top_head);
            LIST_INIT(&dqic->dqic_order_bottom_head);
        }
        dq->do_finalizer = dqic;
    
        _dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
        LIST_INSERT_HEAD(&_dispatch_introspection.queues, dqic, dqic_list);
        _dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);
    
        DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
        if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
            _dispatch_introspection_queue_create_hook(dq);
        }
        return upcast(dq)._dqu;
    }
    
    • 该方法是对dq的一些处理,回到开始的回调方法,发现他返回的是_dq,也就是说返回的就是dq,后面只是对dq的一些处理完善
    • 再回到_dispatch_lane_create_with_target,找到dq的alloc和init
    // Excerpt: allocate the lane with the chosen vtable, then initialize it.
    // The width passed to _dispatch_queue_init is DISPATCH_QUEUE_WIDTH_MAX for
    // a concurrent queue and 1 otherwise.
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
                sizeof(struct dispatch_lane_s)); // alloc
        _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
                DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
                (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
    
    • 通过参数vtable来确定队列的类型,DISPATCH_VTABLE通过一系列的宏来确定vtable的类型
        // Excerpt: pick the vtable (queue class) from the parsed attributes.
            const void *vtable;
        dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
        if (dqai.dqai_concurrent) {// concurrent queue
            // OS_dispatch_queue_concurrent_class
            vtable = DISPATCH_VTABLE(queue_concurrent);
        } else {
                    // OS_dispatch_queue_serial_class
            vtable = DISPATCH_VTABLE(queue_serial);
        }
    
    // Macro chain behind DISPATCH_VTABLE: under USE_OBJC,
    // DISPATCH_VTABLE(name) expands to (&OS_dispatch_<name>_class).
    // NOTE: excerpt only - the matching #else/#endif is not shown here.
    #define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
    #define DISPATCH_OBJC_CLASS(name)   (&DISPATCH_CLASS_SYMBOL(name))
    #if USE_OBJC
    #define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class
    
    队列的类型
    • 查看dqai的赋值dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    // Converts a dispatch_queue_attr_t into a dispatch_queue_attr_info_t.
    // Excerpt: the remainder of the body is elided by the "......" line.
    dispatch_queue_attr_info_t
    _dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
    {
        dispatch_queue_attr_info_t dqai = { };
    
        // NULL attribute: return the zero-initialized info, so dqai_concurrent
        // stays unset and the queue ends up serial by default.
        if (!dqa) return dqai;
    
    #if DISPATCH_VARIANT_STATIC
        if (dqa == &_dispatch_queue_attr_concurrent) { // the concurrent attribute: mark the info as concurrent
            dqai.dqai_concurrent = true;
            return dqai;
        }
    #endif
            ......
    }
    
    • 总结,我们在调用dispatch_queue_create创建队列,会传递两个参数,通过这两个参数来确定队列的名字,和队列串行或并行属性,然后alloc,init出一个队列对象,根据传递参数的不同,他们的类型也不一样。

    dispatch_async

    • 异步函数dispatch_async
    // Asynchronously submits a block to a queue: allocates a continuation,
    // packages the block into it, then pushes the continuation onto dq.
    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
        // Work-item wrapper: the continuation receives and stores the block
        // as a function + context pair.
        // Stores the block.
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    
    • 任务包装方法_dispatch_continuation_init,copy任务,封装func
    // Initializes a continuation from a block: copies the block, selects the
    // function used to invoke it, and delegates to
    // _dispatch_continuation_init_f. Returns the dispatch_qos_t produced by
    // the init routine it delegates to.
    DISPATCH_ALWAYS_INLINE
    static inline dispatch_qos_t
    _dispatch_continuation_init(dispatch_continuation_t dc,
            dispatch_queue_class_t dqu, dispatch_block_t work,
            dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
        // The copied block becomes the continuation's context.
        void *ctxt = _dispatch_Block_copy(work);
    
        dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            dc->dc_flags = dc_flags;
            dc->dc_ctxt = ctxt;
            // will initialize all fields but requires dc_flags & dc_ctxt to be set
            return _dispatch_continuation_init_slow(dc, dqu, flags);
        }
    
        dispatch_function_t func = _dispatch_Block_invoke(work);
        // With DC_FLAG_CONSUME the block is invoked through
        // _dispatch_call_block_and_release instead of its own invoke pointer.
        if (dc_flags & DC_FLAG_CONSUME) {
            func = _dispatch_call_block_and_release;
        }
        return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
    }
    
    • _dispatch_continuation_init_f方法将封装好的func和任务赋值
    // Stores the flags, function and context into the continuation, then sets
    // its voucher and priority. Returns the value of
    // _dispatch_continuation_priority_set.
    DISPATCH_ALWAYS_INLINE
    static inline dispatch_qos_t
    _dispatch_continuation_init_f(dispatch_continuation_t dc,
            dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
            dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
        pthread_priority_t pp = 0;
        dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
        dc->dc_func = f;
        dc->dc_ctxt = ctxt;
        // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
        // should not be propagated, only taken from the handler if it has one
        if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
            pp = _dispatch_priority_propagate();
        }
        _dispatch_continuation_voucher_set(dc, flags);
        return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
    }
    
    • 继续执行_dispatch_continuation_async
    // Pushes an initialized continuation onto the queue via dx_push. In
    // DISPATCH_INTROSPECTION builds the push is traced first, unless
    // DC_FLAG_NO_INTROSPECTION is set in dc_flags.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        // dc_flags is only consulted in introspection builds.
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    
    • 全局搜索dx_push,搜到#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    • 继续搜索dq_push
      image.png
    • 发现很多结果,因为我们是自定义并发队列,所以走的是_dispatch_lane_concurrent_push方法
    • _dispatch_lane_concurrent_push方法里面会有dx_push的递归,最终会调用到_dispatch_root_queue_push方法
    // Push routine for concurrent lanes. When the queue has no tail item, the
    // object is neither a waiter nor a barrier, and async width can be
    // acquired, the item is redirected through
    // _dispatch_continuation_redirect_push; otherwise it is enqueued with
    // _dispatch_lane_push.
    DISPATCH_NOINLINE
    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        // <rdar://problem/24738102&24743140> reserving non barrier width
        // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
        // width equivalent), so we have to check that this thread hasn't
        // enqueued anything ahead of this call or we can break ordering
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    
    • 继续往下执行_dispatch_root_queue_push_inline-->_dispatch_root_queue_poke-->_dispatch_root_queue_poke_slow
    • _dispatch_root_queue_poke_slow里面就有_dispatch_root_queues_init()的初始化和关于线程的开辟等
    ......
    _dispatch_root_queues_init();
    ......
    // Excerpt: spawn worker threads until `remaining` counts down to zero.
    // Each pthread_create is retried until it returns 0; errors other than
    // EAGAIN are reported through dispatch_assume_zero before retrying.
    do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
            while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {// create the thread
                if (r != EAGAIN) {
                    (void)dispatch_assume_zero(r);
                }
                _dispatch_temporary_resource_shortage();
            }
        } while (--remaining);
    ......
    
    • _dispatch_root_queues_init方法
    // Runs _dispatch_root_queues_init_once through dispatch_once_f, guarded by
    // the _dispatch_root_queues_pred predicate, with a NULL context.
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL,
                _dispatch_root_queues_init_once);
    }
    
    • _dispatch_root_queues_init_once方法里面就有cfg.workq_cb = _dispatch_worker_thread2;,而在我们执行block中断点的堆栈中得到下面的流程
    * thread #9, queue = 'com.apple.root.default-qos', stop reason = breakpoint 3.1
      * frame #0: 0x00000001073a1cd7 GCD-Basic`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x00000001073a4048) at ViewController.m:22:9
        frame #1: 0x00000001076bfdd4 libdispatch.dylib`_dispatch_call_block_and_release + 12
        frame #2: 0x00000001076c0d48 libdispatch.dylib`_dispatch_client_callout + 8
        frame #3: 0x00000001076c31ef libdispatch.dylib`_dispatch_queue_override_invoke + 1022
        frame #4: 0x00000001076d228c libdispatch.dylib`_dispatch_root_queue_drain + 351
        frame #5: 0x00000001076d2b96 libdispatch.dylib`_dispatch_worker_thread2 + 132
        frame #6: 0x00007fff524636b6 libsystem_pthread.dylib`_pthread_wqthread + 220
        frame #7: 0x00007fff52462827 libsystem_pthread.dylib`start_wqthread + 15
    
    • 可以看到就是在这里注册了block的调用

    dispatch_sync

    • dispatch_sync同步函数的底层实现
    // Synchronously submits a block to a queue. Blocks carrying private data
    // go through _dispatch_sync_block_with_privdata; all others are passed to
    // _dispatch_sync_f with the block as context and its invoke function.
    DISPATCH_NOINLINE
    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    
    • _dispatch_sync_f方法里面调用了_dispatch_sync_f_inline
    // Core of dispatch_sync. Width-1 queues are handled as barrier syncs;
    // other queues try to reserve sync width and invoke the function inline,
    // falling back to the slow (waiting) path when the reservation fails.
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {// serial queue (width 1)
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {// reservation failed: slow path (waits; self-deadlock is detected there)
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
    
        // The target queue itself has a target: recurse down the hierarchy.
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    

    同步串行的处理

    • 如果传过来的是串行队列,就会执行_dispatch_barrier_sync_f方法,往下继续调用_dispatch_barrier_sync_f_inline
    // Barrier variant of the sync fast path (used for serial queues). Tries to
    // acquire the barrier for the current thread and invokes the function
    // inline; falls back to _dispatch_sync_f_slow when the acquire fails.
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();// id of the current thread
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // The more correct thing to do would be to merge the qos of the thread
        // that just acquired the barrier lock into the queue state.
        //
        // However this is too expensive for the fast path, so skip doing it.
        // The chosen tradeoff is that if an enqueue on a lower priority thread
        // contends with this fast path, this thread may receive a useless override.
        //
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {// barrier acquire failed: slow path, where a self-deadlock is also detected
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);// introspection hook before the synchronous invoke
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    
    • _dispatch_lane_barrier_sync_invoke_and_complete方法,任务处理完成后会通知系统继续往下执行,因为同步会堵塞当前线程。
    // Runs the sync work item on the calling thread, then releases the barrier.
    // If more items are queued, the queue is concurrent, or any bit of
    // fail_unlock_mask is set, completion is handed to
    // _dispatch_lane_barrier_complete; otherwise the state is updated
    // directly with an atomic read-modify-write loop.
    static void
    _dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
            void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);// run the work item
        _dispatch_trace_item_complete(dc);// trace: the item has completed
        if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
            return _dispatch_lane_barrier_complete(dq, 0, 0);
        }
    
        // Presence of any of these bits requires more work that only
        // _dispatch_*_barrier_complete() handles properly
        //
        // Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
        // checking the role is sloppy, but is a super fast check, and neither of
        // these bits should be set if the lock was never contended/discovered.
        const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
                DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
                DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
                DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
        uint64_t old_state, new_state;
    
            // The sync work item is done: release the queue so execution can proceed
        // similar to _dispatch_queue_drain_try_unlock
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
            new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
            new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
            if (unlikely(old_state & fail_unlock_mask)) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_lane_barrier_complete(dq, 0, 0);
                });
            }
        });
        if (_dq_state_is_base_wlh(old_state)) {
            _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
        }
    }
    
    • 任务的处理_dispatch_sync_function_invoke_inline
    // Invokes func(ctxt) on the calling thread through
    // _dispatch_client_callout, bracketed by a thread-frame push/pop for dq.
    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        _dispatch_client_callout(ctxt, func);
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    

    同步死锁

    • _dispatch_sync_f_slow
    // Slow path of dispatch_sync: builds an on-stack sync context describing
    // the waiting thread, waits for the queue in __DISPATCH_WAIT_FOR_QUEUE__,
    // then either finishes (the work already ran on another thread) or invokes
    // the work on the calling thread.
    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        // A queue without a target queue: invoke directly, nothing to wait on.
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
    
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    
        _dispatch_trace_item_push(top_dq, &dsc);// trace hook; the actual enqueue is the dx_push inside __DISPATCH_WAIT_FOR_QUEUE__
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    
    • 将任务添加到队列之后会执行__DISPATCH_WAIT_FOR_QUEUE__
    // Enqueues the sync waiter on dq and blocks the calling thread until it is
    // woken or given ownership. Crashes first if the queue's drain lock is
    // already held by the waiting thread itself (the dispatch_sync
    // self-deadlock case).
    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        // Waiting on a queue that this same thread is draining can never
        // finish, so it is reported as a client crash instead of hanging.
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
    
        // Blocks submitted to the main thread MUST run on the main thread, and
        // dispatch_async_and_wait also executes on the remote context rather than
        // the current thread.
        //
        // For both these cases we need to save the frame linkage for the sake of
        // _dispatch_async_and_wait_invoke
        _dispatch_thread_frame_save_state(&dsc->dsc_dtf);
    
        // Choose the value stored in dc_data, which selects the wait
        // mechanism used below.
        if (_dq_state_is_suspended(dq_state) ||
                _dq_state_is_base_anon(dq_state)) {
            dsc->dc_data = DISPATCH_WLH_ANON;
        } else if (_dq_state_is_base_wlh(dq_state)) {
            dsc->dc_data = (dispatch_wlh_t)dq;
        } else {
            _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
        }
    
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                    (uint8_t)_dispatch_get_basepri_override_qos_floor();
            _dispatch_thread_event_init(&dsc->dsc_event);
        }
        // Enqueue the waiter on the queue.
        dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
        _dispatch_trace_runtime_event(sync_wait, dq, 0);
        // Block until woken: thread event in the ANON case, otherwise wait
        // for ownership through the event loop.
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
        } else {
            _dispatch_event_loop_wait_for_ownership(dsc);
        }
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_destroy(&dsc->dsc_event);
            // If _dispatch_sync_waiter_wake() gave this thread an override,
            // ensure that the root queue sees it.
            if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
                _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
            }
        }
    }
    
    • _dq_state_drain_locked_by-->_dispatch_lock_is_locked_by
    // Returns true when the owner bits of lock_value (those selected by
    // DLOCK_OWNER_MASK) are identical to the corresponding bits of tid.
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        // XOR leaves a zero bit wherever the two values agree, so the masked
        // result is 0 exactly when all owner bits match.
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    
    • 如果lock_value中记录的持有者和tid相同(即队列已经被当前线程持有),就会执行DISPATCH_CLIENT_CRASH((uintptr_t)dq_state, "dispatch_sync called on queue " "already owned by current thread");,也就是我们所说的死锁

    同步并发的处理

    • 如果不是串行,且没有死锁的时候,就会执行
    // Non-barrier sync completion: runs the work item on the calling thread,
    // traces its completion, then calls _dispatch_lane_non_barrier_complete.
    static void
    _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);// run the work item
        _dispatch_trace_item_complete(dc);
        _dispatch_lane_non_barrier_complete(dq, 0);
    }
    
    • 和上文同步串行的处理差不多,任务处理完后通知系统继续往下执行。

    单例

    • 一般我们创建单例的代码如下
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            
        });
    
    • dispatch_once的底层实现
    // Block-based front end of dispatch_once_f: the block itself is passed as
    // the context and its invoke function as the function to call.
    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    
    • dispatch_once_f的底层实现
    // Runs func(ctxt) at most once per predicate. The caller that wins the
    // gate runs the callout; every other caller waits in _dispatch_once_wait.
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        // The predicate storage is reinterpreted as a once-gate.
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        // Already completed: nothing to do.
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        // Only the caller that successfully enters the gate runs the callout.
        if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
        return _dispatch_once_wait(l);
    }
    
    • 外面传入两个参数,一个静态onceToken保证唯一性,还有一个block,将onceToken包装成一个dispatch_once_gate_t l变量,用来记录单例的状态,如果已经是DLOCK_ONCE_DONE就表示已经执行过一次了,就会直接返回,正常情况下是会走_dispatch_once_callout。进入_dispatch_once_callout之前还有个if判断_dispatch_once_gate_tryenter(保证执行一次)。
    // Attempts to take the once-gate: atomically swaps dgo_once from
    // DLOCK_ONCE_UNLOCKED to the calling thread's lock value. Returns the
    // result of the compare-and-exchange, i.e. true for the caller that wins.
    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    
    • _dispatch_once_callout
    // Runs the once function, then broadcasts on the gate so the predicate is
    // marked and any waiters are released.
    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_client_callout(ctxt, func);// run the client's block/function
        _dispatch_once_gate_broadcast(l);
    }
    
    • _dispatch_once_gate_broadcast会将l的状态改变,保证只执行一次
    // Marks the once-gate as done (or quiescing, depending on the build
    // configuration). If the value returned by the marking call is not the
    // calling thread's own lock value, takes the slow broadcast path.
    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
        dispatch_lock value_self = _dispatch_lock_value_for_self();
        uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        v = _dispatch_once_mark_quiescing(l);
    #else
        v = _dispatch_once_mark_done(l);
    #endif
        // Value unchanged since this thread entered the gate: presumably no
        // waiter registered, so nothing to wake - TODO confirm against
        // _dispatch_once_wait.
        if (likely((dispatch_lock)v == value_self)) return;
        _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    

    dispatch_semaphore_t信号量

    • dispatch_semaphore_t信号量为1可以当锁使用,可以控制GCD的最大并发量,信号量的使用需要dispatch_semaphore_wait和dispatch_semaphore_signal配合使用。

    dispatch_semaphore_signal

    • dispatch_semaphore_signal的底层实现
    // Increments the semaphore value. Returns 0 on the fast path (new value
    // > 0); otherwise returns the result of _dispatch_semaphore_signal_slow.
    long
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        // Atomic increment with release ordering; value is the new count.
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {
            return 0;
        }
        // The increment produced LONG_MIN: treated as unbalanced signalling.
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        return _dispatch_semaphore_signal_slow(dsema);
    }
    
    • os_atomic_inc2o其实就是一个信号量+1的操作,如果+1之后value > 0,说明没有线程在等待,直接返回0;否则说明有线程正阻塞在dispatch_semaphore_wait上,会调用_dispatch_semaphore_signal_slow去唤醒等待的线程。
    // Macro chain behind os_atomic_inc2o: increment field f of *p by 1 with
    // memory order m. _os_atomic_c11_op performs a C11 atomic fetch-op and
    // then re-applies the operator, so the expression yields the NEW value.
    #define os_atomic_inc2o(p, f, m) \
            os_atomic_add2o(p, f, 1, m)   
    #define os_atomic_add2o(p, f, v, m) \
            os_atomic_add(&(p)->f, (v), m)
    #define os_atomic_add(p, v, m) \
            _os_atomic_c11_op((p), (v), m, add, +)
    #define _os_atomic_c11_op(p, v, m, o, op) \
            ({ _os_atomic_basetypeof(p) _v = (v), _r = \
            atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
            memory_order_##m); (__typeof__(_r))(_r op _v); })
    // For dispatch_semaphore_signal this amounts to:
    //   _r = atomic_fetch_add_explicit(&dsema->dsema_value, 1, memory_order_release);
    //   result = _r + 1;
    
    

    dispatch_semaphore_wait

    • dispatch_semaphore_wait的底层实现
    // Decrements the semaphore value. Returns 0 on the fast path (new value
    // >= 0); otherwise returns the result of _dispatch_semaphore_wait_slow,
    // which is given the timeout.
    long
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        // Atomic decrement with acquire ordering; value is the new count.
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) {
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    
    • os_atomic_dec2o其实就是一个信号量-1的操作,如果-1之后value >= 0,直接返回0,继续往下执行,否则就会执行_dispatch_semaphore_wait_slow等待,直到信号量被唤醒或者超时

    dispatch_group

    dispatch_group_create

    • 调度组的创建dispatch_group_create,然后会调用_dispatch_group_create_with_count方法
    // Allocates a group object and performs its initial setup.
    // n: initial count; when non-zero, dg_bits is stored as
    //    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL and do_ref_cnt is set to 1.
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    
    // Public entry point: creates a group with an initial count of 0.
    dispatch_group_t
    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    

    dispatch_group_enter

    • dispatch_group_enter源码实现如下,os_atomic_sub_orig2o对dg_bits进行-1操作(减去DISPATCH_GROUP_VALUE_INTERVAL)
    // Records one more outstanding item in the group by subtracting
    // DISPATCH_GROUP_VALUE_INTERVAL from dg_bits. Retains the group when the
    // previous value was 0, and crashes when it was already at
    // DISPATCH_GROUP_VALUE_MAX.
    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    

    dispatch_group_leave

    • dispatch_group_leave源码实现如下,os_atomic_add_orig2o对dg_state进行+1操作(加上DISPATCH_GROUP_VALUE_INTERVAL),如果+1之前的值就已经是0,说明之前没有调用dispatch_group_enter,这里就会crash,当然这里核心在_dispatch_group_wake
    // Marks one outstanding item of the group as finished by adding
    // DISPATCH_GROUP_VALUE_INTERVAL to dg_state. When the value before the add
    // was DISPATCH_GROUP_VALUE_1 (the last outstanding item), clears the
    // notification/waiter bits and wakes the group. Crashes when the value
    // before the add was 0, i.e. there was no matching enter.
    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
            // old_state held the pre-add value; bring it up to the post-add
            // state before using it as the compare-exchange expectation.
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            return _dispatch_group_wake(dg, old_state, true);
        }
    
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    
    • _dispatch_group_wake方法,重点研究_dispatch_continuation_async
    // Wakes a group: submits every pending notify continuation to its target
    // queue, wakes threads waiting on the group, and drops the references
    // counted in refs.
    // dg_state:      state snapshot whose HAS_NOTIFS / HAS_WAITERS bits decide
    //                what gets woken.
    // needs_release: when true, one extra reference on dg is released.
    DISPATCH_NOINLINE
    static void
    _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
    {
        uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
        if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
            dispatch_continuation_t dc, next_dc, tail;
    
            // Snapshot before anything is notified/woken <rdar://problem/8554546>
            dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
            do {
                // dc_data holds the queue the notify block was registered
                // for (set and retained in _dispatch_group_notify).
                dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
                next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
                _dispatch_continuation_async(dsn_queue, dc,
                        _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
                _dispatch_release(dsn_queue);
            } while ((dc = next_dc));
    
            refs++;
        }
    
        if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
            _dispatch_wake_by_address(&dg->dg_gen);
        }
    
        if (refs) _dispatch_release_n(dg, refs);
    }
    
    • _dispatch_continuation_async方法,发现和上文异步函数的执行流程很相似
    // Pushes an initialized continuation onto the queue via dx_push. In
    // DISPATCH_INTROSPECTION builds the push is traced first, unless
    // DC_FLAG_NO_INTROSPECTION is set in dc_flags.
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        // dc_flags is only consulted in introspection builds.
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    

    dispatch_group_async

    • dispatch_group_async的效果相当于dispatch_group_enter+dispatch_group_leave
    // Submits a block to dq and associates it with group dg: allocates a
    // continuation flagged DC_FLAG_GROUP_ASYNC, packages the block into it,
    // then hands it to _dispatch_continuation_group_async.
    void
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    
    // Enters the group on behalf of the continuation, stores the group in
    // dc_data, then pushes the continuation onto dq.
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    
    • 通过在block断点处打印bt,观察进入到_dispatch_continuation_with_group_invoke里面找到了dispatch_group_leave的执行
    frame #1: 0x000000010a1f5dd4 libdispatch.dylib`_dispatch_call_block_and_release + 12
        frame #2: 0x000000010a1f6d48 libdispatch.dylib`_dispatch_client_callout + 8
        frame #3: 0x000000010a1f979a libdispatch.dylib`_dispatch_continuation_pop + 776
        frame #4: 0x000000010a1f8ac5 libdispatch.dylib`_dispatch_async_redirect_invoke + 849
        frame #5: 0x000000010a20828c libdispatch.dylib`_dispatch_root_queue_drain + 351
        frame #6: 0x000000010a208b96 libdispatch.dylib`_dispatch_worker_thread2 + 132
        frame #7: 0x00007fff524636b6 libsystem_pthread.dylib`_pthread_wqthread + 220
        frame #8: 0x00007fff52462827 libsystem_pthread.dylib`start_wqthread + 15
    
    // Found by searching for callers of `_dispatch_client_callout`.
    // Invokes a continuation whose dc_data holds a group: runs the work item,
    // traces its completion, then leaves the group. Any other object type in
    // dc_data is an internal crash.
    static inline void
    _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
    {
        struct dispatch_object_s *dou = dc->dc_data;
        unsigned long type = dx_type(dou);
        if (type == DISPATCH_GROUP_TYPE) {
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_trace_item_complete(dc);
            // Balances the dispatch_group_enter done in
            // _dispatch_continuation_group_async.
            dispatch_group_leave((dispatch_group_t)dou);
        } else {
            DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
        }
    }
    
    

    dispatch_group_notify

    • dispatch_group_notify通知
    // Registers notify continuation dsn on group dg, to be submitted to dq.
    // The continuation is appended to the group's dg_notify list; when it is
    // the first entry, HAS_NOTIFS is set in dg_state, and if the low 32 bits
    // of the state are already 0 the group is woken immediately.
    static inline void
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
    
        // Remember (and retain) the queue the notification must run on;
        // _dispatch_group_wake reads it back from dc_data and releases it.
        dsn->dc_data = dq;
        _dispatch_retain(dq);
    
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) {
                    os_atomic_rmw_loop_give_up({
                        return _dispatch_group_wake(dg, new_state, false);
                    });
                }
            });
        }
    }
    
    • 发现old_state的低32位等于0(即当前没有未完成的任务)时,就会通过_dispatch_group_wake执行dispatch_group_notify的block的内容

    相关文章

      网友评论

          本文标题:GCD-源码分析

          本文链接:https://www.haomeiwen.com/subject/mpctbktx.html