美文网首页iOS随笔iOS 底层探索之路
iOS 底层探索:多线程GCD底层探索(上)

iOS 底层探索:多线程GCD底层探索(上)

作者: 欧德尔丶胡 | 来源:发表于2020-11-11 10:33 被阅读0次

    iOS 底层探索: 学习大纲 OC篇

    前言

    • 上篇我们复习了常用的GCD之后,我们会问dispatch_async是如何开启线程的呢?带着这个疑问,我们开启GCD 底层源码的研究。
    • 主要内容:探索队列的本质,队列的创建,串行和并发 、同步和异步执行的区别及底层执行流程。

    准备

    一 、查找 GCD 源码

    来到工程,我们跳转查看dispatch_async如下:只是提供了一个对外的接口。

    #ifdef __BLOCKS__
    API_AVAILABLE(macos(10.6), ios(4.0))
    DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
    void
    dispatch_async(dispatch_queue_t queue, dispatch_block_t block);
    #endif
    

    所以我们再去断点调试:


    我们在dispatch_async下断点,查看堆栈下的dispatch_async,就会来到汇编调用情况,其中我们可以看到libdispatch.dylib 调用了dispatch_async:,所以根据以前的经验,GCD的库就在libdispatch.dylib 里面.所以我们去源码中查看. 下载地址上面已给出。

    二 、libdispatch源码分析

    当我翻开源码的时候,首先我发现,没法编译这份源码,其次源码中结构体、方法等的命名不能从字面理解,各种宏定义的嵌套错综复杂,所以看了半天有点读不下去了。但是,虽然不能像以前探索objc源码那样通过LLDB调试验证,但是我们可以从我们使用GCD流程下手,刨根问底,抱着探索求真的目的,顺着思路往下慢慢的找关键代码,然后进行分析其流程。

    所以我们开始第一阶段的探索:

    在上一篇讲解了GCD的使用步骤:

    • 第一步:创建一个队列(串行队列或并发队列)
    • 第二步:将任务追加到任务的等待队列中,然后系统就会根据任务类型执行任务(同步执行或异步执行)

    队列类型 dispatch_queue_t
    队列的创建方法:dispatch_queue_create

    创建串行队列
    dispatch_queue_t queue = 
    dispatch_queue_create("com.lgcooci.seial", DISPATCH_QUEUE_SERIAL);
    
    创建并发队列
    dispatch_queue_t queue = 
    dispatch_queue_create("com.lgcooci.concurrent", DISPATCH_QUEUE_CONCURRENT);
    

    首先我们在源码中全局去搜索找找dispatch_queue_t是什么?

    1. 队列类型dispatch_queue_t

    首先可以看到dispatch_queue_t本身只是dispatch_queue_s这个结构体指针

    typedef struct dispatch_queue_s *dispatch_queue_t; 
    

    继续查找dispatch_queue_s定义

    struct dispatch_queue_s {
        DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
        /* 32bit hole on LP64 */
    } DISPATCH_ATOMIC64_ALIGN;
    

    继续查找DISPATCH_QUEUE_CLASS_HEADER ,发现是个如下

    #define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
        _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
        /* LP64 global queue cacheline boundary */ \
        unsigned long dq_serialnum; \// queue的编号
        const char *dq_label; \   //标签
        DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
            const uint16_t dq_width, \
            const uint16_t __dq_opaque2 \
        ); \
        dispatch_priority_t dq_priority; \//优先级
        union { \
            struct dispatch_queue_specific_head_s *dq_specific_head; \
            struct dispatch_source_refs_s *ds_refs; \
            struct dispatch_timer_source_refs_s *ds_timer_refs; \
            struct dispatch_mach_recv_refs_s *dm_recv_refs; \
            struct dispatch_channel_callbacks_s const *dch_callbacks; \
        }; \
        int volatile dq_sref_cnt
    

    在这个宏里我们找到相关的方法_DISPATCH_QUEUE_CLASS_HEADER(x, pointer_sized_field); 继续展开搜索查看里面的内容如下:

    // 展开_DISPATCH_QUEUE_CLASS_HEADER
    
    #define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
        DISPATCH_OBJECT_HEADER(x); \
        DISPATCH_UNION_LE(uint64_t volatile dq_state, \
                dispatch_lock dq_state_lock, \
                uint32_t dq_state_bits \
        ); \
    
    // 持续展开DISPATCH_OBJECT_HEADER
    
    #define DISPATCH_OBJECT_HEADER(x) \
        struct dispatch_object_s _as_do[0]; \
        _DISPATCH_OBJECT_HEADER(x)
        
    // 进一步查看 _DISPATCH_OBJECT_HEADER
    
    #define _DISPATCH_OBJECT_HEADER(x) \
        struct _os_object_s _as_os_obj[0]; \
        OS_OBJECT_STRUCT_HEADER(dispatch_##x); \  // 这个宏,可以理解为dispatch_object_s继承自_os_object_s
        struct dispatch_##x##_s *volatile do_next; \
        struct dispatch_queue_s *do_targetq; \
        void *do_ctxt; \
        void *do_finalizer
        
    

    来到OS_OBJECT_STRUCT_HEADER之后,我们需要注意一个成员变量,记住这个成员变量名字叫做do_vtable。再继续拆解_OS_OBJECT_HEADER发现里面起就是一个isa指针引用计数一些信息。

    再查看 OS_OBJECT_STRUCT_HEADER

    #define OS_OBJECT_STRUCT_HEADER(x) \
        _OS_OBJECT_HEADER(\
        const void *_objc_isa, \
        do_ref_cnt, \
        do_xref_cnt); \
    // 注意这个成员变量,后面将任务Push到队列就是通过这个变量
        const struct x##_vtable_s *do_vtable
        
    // 进一步查看 _OS_OBJECT_HEADER
    
    #define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
            isa; /* must be pointer-sized */ \ // isa指针
            int volatile ref_cnt; \ // gcd对象内部引用计数
            int volatile xref_cnt// gcd对象外部引用计数(内外部都要减到0时,对象会被释放)
    

    经过一步步相关代码的查找最终来到_OS_OBJECT_HEADER ,可以发现这些代码晦涩难懂,但是我们研究源码不一定要知道每句代码的意思,我们查看其核心内容:dispatch_queue_t 的本质是一个结构体指针对象,里面包含了label(标签)、priority(优先级)等一些信息。dispatch_queue_t类型的指针,指向一个dispatch_queue_s 类型的结构体。

    其实GCD源码中的数据结构为dispatch_object_t联合抽象类

    typedef union {
        struct _os_object_s *_os_obj;// 基类
        struct dispatch_object_s *_do;// 基类继承os_object
        struct dispatch_queue_s *_dq;// 队列结构
        struct dispatch_queue_attr_s *_dqa;// 队列相关属性
        struct dispatch_group_s *_dg;// group结构
        struct dispatch_source_s *_ds;
        struct dispatch_channel_s *_dch;
        struct dispatch_mach_s *_dm;
        struct dispatch_mach_msg_s *_dmsg;
        struct dispatch_semaphore_s *_dsema;// 信号量
        struct dispatch_data_s *_ddata;
        struct dispatch_io_s *_dchannel;
    } dispatch_object_t DISPATCH_TRANSPARENT_UNION;
    

    2. 创建队列 dispatch_queue_create

    源码查看如下:

    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
        return _dispatch_lane_create_with_target(label, attr,
                DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
    

    label : 标签,我们平时传入队列的名字
    attr :我们知道创建队列时, attr 属性有三个值可选,nil、DISPATCH_QUEUE_SERIAL(实际上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT

    _dispatch_queue_create_with_target函数,这里会创建一个root队列,并将自己新建的队列绑定到所对应的root队列上。

    DISPATCH_NOINLINE
    static dispatch_queue_t
    _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
            dispatch_queue_t tq, bool legacy)
    {
        dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    
        //
        // Step 1: Normalize arguments (qos, overcommit, tq) 规范化参数
        //
        // 根据上文代码注释里提到的,作者认为调用者传入DISPATCH_QUEUE_SERIAL和nil的几率要大于传DISPATCH_QUEUE_CONCURRENT。所以这里设置个默认值。
        // 这里怎么理解呢?只要看做if(!dqa)即可
        if (!slowpath(dqa)) {
            // _dispatch_get_default_queue_attr里面会将dqa的dqa_autorelease_frequency指定为DISPATCH_AUTORELEASE_FREQUENCY_INHERIT的,inactive也指定为false。这里就不展开了,只需要知道赋了哪些值。因为后面会用到。
            dqa = _dispatch_get_default_queue_attr();
        } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
            DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
        }
    
        // 取出优先级
        dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
    
        // overcommit单纯从英文理解表示过量使用的意思,那这里这个overcommit就是一个标识符,表示是不是就算负荷很高了,但还是得给我新开一个线程出来给我执行任务。
        _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
        if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
            if (tq->do_targetq) {
                DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                        "a non-global target queue");
            }
        }
    
        // 如果overcommit没有被指定
        if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
             // 所以对于overcommit,如果是串行的话默认是开启的,而并行是关闭的
            overcommit = dqa->dqa_concurrent ?
                    _dispatch_queue_attr_overcommit_disabled :
                    _dispatch_queue_attr_overcommit_enabled;
        }
    
        // 之前说过初始化队列默认传了DISPATCH_TARGET_QUEUE_DEFAULT,也就是null,所以进入if语句。
        if (!tq) {
            // 获取一个管理自己队列的root队列。
            tq = _dispatch_get_root_queue(
                    qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                    overcommit == _dispatch_queue_attr_overcommit_enabled);
            if (slowpath(!tq)) {
                DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
            }
        }
    
    //  -----------  开始进行队列的创建   -----------------
    
        // legacy默认是true的
        if (legacy) {
            // 之前说过,默认是会给dqa_autorelease_frequency指定为DISPATCH_AUTORELEASE_FREQUENCY_INHERIT,所以这个判断式是成立的
            if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
                legacy = false;
            }
        }
    // vtable变量很重要,之后会被赋值到之前说的dispatch_queue_t结构体里的do_vtable变量上
        const void *vtable;
        dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
        if (dqai.dqai_concurrent) {
    // 如果创建队列的时候传了DISPATCH_QUEUE_CONCURRENT,就是走这里
            vtable = DISPATCH_VTABLE(queue_concurrent);
        } else {
            // 如果创建线程没有指定为并行队列,无论你传DISPATCH_QUEUE_SERIAL还是nil,都会创建一个串行队列。
            vtable = DISPATCH_VTABLE(queue_serial);
        }
        switch (dqai.dqai_autorelease_frequency) {
                 .....
        }
         if (label) {
            // 判断传进来的字符串是否可变的,如果可变的copy成一份不可变的
            const char *tmp = _dispatch_strdup_if_mutable(label);
            if (tmp != label) {
                dqf |= DQF_LABEL_NEEDS_FREE;
                label = tmp;
            }
        }
    
    
       // _dispatch_object_alloc里面就将vtable赋值给do_vtable变量上了。
        dispatch_lane_t dq = _dispatch_object_alloc(vtable,
                sizeof(struct dispatch_lane_s));
       // 第三个参数根据是否并行队列,如果不是则最多开一个线程,如果是则最多开0x1000 - 2个线程,这个数量很惊人了已经,换成十进制就是(4096 - 2)个。
        // dqa_inactive之前说串行是false的
        // DISPATCH_QUEUE_ROLE_INNER 也是0,所以这里串行队列的话dqa->dqa_state是0
        _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
                DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
                (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
    
        dq->dq_label = label;
        dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
                dqai.dqai_relpri);
        if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
            dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        }
        if (!dqai.dqai_inactive) {
            _dispatch_queue_priority_inherit_from_target(dq, tq);
            _dispatch_lane_inherit_wlh_from_target(dq, tq);
        }
        _dispatch_retain(tq);
       // 自定义的queue的目标队列是root队列
        dq->do_targetq = tq;
        _dispatch_object_debug(dq, "%s", __func__);
        return _dispatch_trace_queue_create(dq)._dq;
    }
    
    

    源码中大部分代码都不能从字面上了解其意思,但是这里有几个重要的函数我们可以找到,首先是创建一个root队列_dispatch_get_root_queue函数。取root队列

    static inline dispatch_queue_global_t
    _dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
    {
        if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
            DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
        }
        return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
    }
    

    看下这个_dispatch_root_queues数组。我们可以看到,每一个优先级都有对应的root队列,每一个优先级又分为是不是可以过载的队列。

    // 6618342 Contact the team that owns the Instrument DTrace probe before
    //         renaming this symbol
    struct dispatch_queue_global_s _dispatch_root_queues[] = {
    #define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
            ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
            DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
            DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
    #define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
        [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
            DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \
            .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
            .do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \
            .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
            .dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \
                    _dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \
                    _dispatch_priority_make(DISPATCH_QOS_##n, 0)), \
            __VA_ARGS__ \
        }
        _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
            .dq_label = "com.apple.root.maintenance-qos",
            .dq_serialnum = 4,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.maintenance-qos.overcommit",
            .dq_serialnum = 5,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
            .dq_label = "com.apple.root.background-qos",
            .dq_serialnum = 6,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.background-qos.overcommit",
            .dq_serialnum = 7,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
            .dq_label = "com.apple.root.utility-qos",
            .dq_serialnum = 8,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.utility-qos.overcommit",
            .dq_serialnum = 9,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
            .dq_label = "com.apple.root.default-qos",
            .dq_serialnum = 10,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
                DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.default-qos.overcommit",
            .dq_serialnum = 11,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
            .dq_label = "com.apple.root.user-initiated-qos",
            .dq_serialnum = 12,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.user-initiated-qos.overcommit",
            .dq_serialnum = 13,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
            .dq_label = "com.apple.root.user-interactive-qos",
            .dq_serialnum = 14,
        ),
        _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
            .dq_label = "com.apple.root.user-interactive-qos.overcommit",
            .dq_serialnum = 15,
        ),
    };
    
    

    其中DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),解析到最后是OSdispatch##name##_class这样的这样的,对应的实例对象是如下代码,指定了root队列各个操作对应的函数。

    可以看到很熟悉的alloc 、init 操作了;
    _dispatch_object_alloc: 申请对应类型的内存;
    _dispatch_queue_init:初始化。

    首先看前者的实现:

    // 注意对于iOS并不满足 OS_OBJECT_HAVE_OBJC1
    void *
    _dispatch_object_alloc(const void *vtable, size_t size)
    {
    #if OS_OBJECT_HAVE_OBJC1
        const struct dispatch_object_vtable_s *_vtable = vtable;
        dispatch_object_t dou;
        dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
        dou._do->do_vtable = vtable;
        return dou._do;
    #else
        return _os_object_alloc_realized(vtable, size);
    #endif
    }
    
    // 接着看 _os_object_alloc_realized
    inline _os_object_t
    _os_object_alloc_realized(const void *cls, size_t size)
    {
        _os_object_t obj;
        dispatch_assert(size >= sizeof(struct _os_object_s));
        while (unlikely(!(obj = calloc(1u, size)))) {
            _dispatch_temporary_resource_shortage();
        }
        obj->os_obj_isa = cls;
        return obj;
    }
    

    再看一下 _os_objc_alloc

    
    _os_object_t
    _os_object_alloc(const void *cls, size_t size)
    {
        if (!cls) cls = &_os_object_vtable;
        return _os_object_alloc_realized(cls, size);
    }
    
    inline _os_object_t
    _os_object_alloc_realized(const void *cls, size_t size)
    {
        _os_object_t obj;
        dispatch_assert(size >= sizeof(struct _os_object_s));
        while (unlikely(!(obj = calloc(1u, size)))) {
            _dispatch_temporary_resource_shortage();
        }
        obj->os_obj_isa = cls;
        return obj;
    }
    

    返回值类型:结构体_os_object_s

    typedef struct _os_object_s {
        _OS_OBJECT_HEADER(
        const _os_object_vtable_s *os_obj_isa,
        os_obj_ref_cnt,
        os_obj_xref_cnt);
    } _os_object_s;
    
    
    // 系统对象头部定义宏
    #define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) 
            isa;    // isa指针
            int volatile ref_cnt; // gcd对象内部引用计数
            int volatile xref_cnt // gcd对象外部引用计数(内外部都要减到0时,对象会被释放)
    

    再看下_dispatch_queue_init函数,这里也就是做些初始化工作了

    static inline void _dispatch_queue_init(dispatch_queue_t dq, dispatch_queue_flags_t dqf,
            uint16_t width, uint64_t initial_state_bits)
    {
        uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
    
        dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
                DISPATCH_QUEUE_INACTIVE)) == 0);
    
        if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
            dq_state |= DISPATCH_QUEUE_INACTIVE + DISPATCH_QUEUE_NEEDS_ACTIVATION;
            dq_state |= DLOCK_OWNER_MASK;
            dq->do_ref_cnt += 2; 
        }
    
        dq_state |= (initial_state_bits & DISPATCH_QUEUE_ROLE_MASK);
        // 指向DISPATCH_OBJECT_LISTLESS是优化编译器的作用。只是为了生成更好的指令让CPU更好的编码
        dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
        dqf |= DQF_WIDTH(width);
        // dqf 保存进 dq->dq_atomic_flags
        os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
        dq->dq_state = dq_state;
        dq->dq_serialnum =
                os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
    }
    
      最后是_dispatch_introspection_queue_create函数,一个内省函数。
    
    dispatch_queue_t _dispatch_introspection_queue_create(dispatch_queue_t dq)
    {
        TAILQ_INIT(&dq->diq_order_top_head);
        TAILQ_INIT(&dq->diq_order_bottom_head);
        _dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
        TAILQ_INSERT_TAIL(&_dispatch_introspection.queues, dq, diq_list);
        _dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);
    
        DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
        if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
            _dispatch_introspection_queue_create_hook(dq);
        }
        return dq;
    }
    
    

    至此,一个队列的创建过程我们大致了解了。大致可以分为这么几点

      1. 设置队列优先级
      1. 默认创建的是一个串行队列
      1. 设置队列挂载的根队列。优先级不同根队列也不同
      1. 实例化vtable对象,这个对象给不同队列指定了push、wakeup等函数。

    3. 任务异步dispatch_async

    内部就是两个函数:

    • _dispatch_continuation_init:任务包装函数

    • _dispatch_continuation_async:并发处理函数

    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME; // 设置标识位
        dispatch_qos_t qos;
     // 任务的包装器  接收 保存 函数式 保存block
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    

    dispatch_continuation_init函数只是一个初始化,主要就是保存Block上下文,指定block的执行函数

    static inline dispatch_qos_t
    _dispatch_continuation_init(dispatch_continuation_t dc,
            dispatch_queue_class_t dqu, dispatch_block_t work,
            dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
        // block对象赋值到dc_ctx
        void *ctxt = _dispatch_Block_copy(work); //拷贝任务
        dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            dc->dc_flags = dc_flags;
            dc->dc_ctxt = ctxt;//赋值
            // will initialize all fields but requires dc_flags & dc_ctxt to be set
            return _dispatch_continuation_init_slow(dc, dqu, flags);
        }
    
        dispatch_function_t func = _dispatch_Block_invoke(work);/封装work - 异步回调
        if (dc_flags & DC_FLAG_CONSUME) {
            func = _dispatch_call_block_and_release;//回调函数赋值 - 同步回调
        }
        return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
    }
    

    _dispatch_call_block_and_release这个函数就是直接执行block了,所以dc->dc_func被调用的话就block会被直接执行了。

    void
    _dispatch_call_block_and_release(void *block)
    {
        void (^b)(void) = block;
        b();
        Block_release(b);
    }
    

    上面的初始化过程就是这样,接着看下_dispatch_continuation_async函数

    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    // 如果是用barrier插进来的任务或者是串行队列,直接将任务加入到队列
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc); 
    //trace 轨迹 跟踪器的意思  
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);//宏
    }
    

    宏:#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z),我们只关心dq_push

    全局搜索我们看到:

    DISPATCH_VTABLE_INSTANCE(queue,
        // This is the base class for queues, no objects of this type are made
        .do_type        = _DISPATCH_QUEUE_CLUSTER,
        .do_dispose     = _dispatch_object_no_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_object_no_invoke,
    
        .dq_activate    = _dispatch_queue_no_activate,
    );
    
    DISPATCH_VTABLE_INSTANCE(workloop,
        .do_type        = DISPATCH_WORKLOOP_TYPE,
        .do_dispose     = _dispatch_workloop_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_workloop_invoke,
    
        .dq_activate    = _dispatch_queue_no_activate,
        .dq_wakeup      = _dispatch_workloop_wakeup,
        .dq_push        = _dispatch_workloop_push,
    );
    
    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
        .do_type        = DISPATCH_QUEUE_SERIAL_TYPE,
        .do_dispose     = _dispatch_lane_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_lane_invoke,
    
        .dq_activate    = _dispatch_lane_activate,
        .dq_wakeup      = _dispatch_lane_wakeup,
        .dq_push        = _dispatch_lane_push,
    );
    
    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
        .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
        .do_dispose     = _dispatch_lane_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_lane_invoke,
    
        .dq_activate    = _dispatch_lane_activate,
        .dq_wakeup      = _dispatch_lane_wakeup,
        .dq_push        = _dispatch_lane_concurrent_push,
    );
    

    根据队列的类型不同,这里会对应的有不同的方法
    比如:
    并发队列:.dq_push = _dispatch_lane_concurrent_push,
    串行队列:.dq_push = _dispatch_lane_push,
    等等
    在项目中我们可以添加符号断点_dispatch_lane_concurrent_push验证如下:

    image.png

    所以我们先研究下:_dispatch_lane_concurrent_push

    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
        if (dq->dq_items_tail == NULL &&
                !_dispatch_object_is_waiter(dou) &&
                !_dispatch_object_is_barrier(dou) &&
                _dispatch_queue_try_acquire_async(dq)) {
    // 进行一系列的判断后,进入该方法。
            return _dispatch_continuation_redirect_push(dq, dou, qos);
        }
    
        _dispatch_lane_push(dq, dou, qos);
    }
    
    

    符号断点调试可以发现会走_dispatch_continuation_redirect_push 这里:

    DISPATCH_NOINLINE
    static void
    _dispatch_continuation_redirect_push(dispatch_lane_t dl,
            dispatch_object_t dou, dispatch_qos_t qos)
    {
        if (likely(!_dispatch_object_is_redirection(dou))) {
            dou._dc = _dispatch_async_redirect_wrap(dl, dou);
        } else if (!dou._dc->dc_ctxt) {
            // find first queue in descending target queue order that has
            // an autorelease frequency set, and use that as the frequency for
            // this continuation.
            dou._dc->dc_ctxt = (void *)
            (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
        }
    
        dispatch_queue_t dq = dl->do_targetq;
        if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
        // 递归调用,
        // 原因在于GCD也是对象,也存在继承封装的问题,类似于 类 父类 根类的关系。
        dx_push(dq, dou, qos);
    }
    
    

    这里会发现又走到了dx_push,即递归了,综合前面队列创建时可知,队列也是一个对象,有父类、根类,所以会递归执行到根类的方法

    接下来,通过根类的_dispatch_root_queue_push符号断点,来验证猜想是否正确,从运行结果看出,完全是正确的

    进入_dispatch_root_queue_push -> _dispatch_root_queue_push_inline ->_dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow源码,经过符号断点验证,确实是走的这里,查看该方法的源码实现,主要有两步操作

    通过_dispatch_root_queues_init方法注册回调

    通过do-while循环创建线程,使用pthread_create方法

    DISPATCH_NOINLINE
    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
        int remaining = n;
        int r = ENOSYS;
    
        _dispatch_root_queues_init();//重点
        
        ...
        //do-while循环创建线程
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
            while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
                if (r != EAGAIN) {
                    (void)dispatch_assume_zero(r);
                }
                _dispatch_temporary_resource_shortage();
            }
        } while (--remaining);
        ...
    }
    

    首先我们先分析一下:_dispatch_root_queues_init

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
    }
    

    进入_dispatch_root_queues_init源码实现,发现是一个dispatch_once_f单例(这里不作说明,以后重点分析),其中传入的func是_dispatch_root_queues_init_once。

    _dispatch_root_queues_init_once 开始进入底层os的处理了。通过断点,bt打印堆栈信息如下:

    (lldb) bt
    * thread #7, queue = 'cooci', stop reason = breakpoint 1.1
      * frame #0: 0x0000000100449144 001---函数与队列`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x000000010044c0e0) at ViewController.m:48:9
        frame #1: 0x00000001004efb68 libdispatch.dylib`_dispatch_call_block_and_release + 32
        frame #2: 0x00000001004f15f0 libdispatch.dylib`_dispatch_client_callout + 20
        frame #3: 0x00000001004f44e4 libdispatch.dylib`_dispatch_continuation_pop + 584
        frame #4: 0x00000001004f3888 libdispatch.dylib`_dispatch_async_redirect_invoke + 692
        frame #5: 0x00000001005042ec libdispatch.dylib`_dispatch_root_queue_drain + 364
        frame #6: 0x0000000100504c94 libdispatch.dylib`_dispatch_worker_thread2 + 140
        frame #7: 0x00000001d52b48cc libsystem_pthread.dylib`_pthread_wqthread + 216
    (lldb) 
    

    综上所述,异步函数的底层分析如下

    【准备工作】:首先,将异步任务拷贝并封装,并设置回调函数func

    【block回调】:底层通过dx_push递归,会重定向到根队列,然后通过pthread_creat创建线程,最后通过dx_invoke执行block回调(注意dx_push 和 dx_invoke 是成对的)

    异步函数的底层分析流程如图所示


    4. 任务同步dispatch_sync

    dispatch_sync直接调用的是dispatch_sync_f

    void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        // 很大可能不会走if分支,看做if(_dispatch_block_has_private_data(work))
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_private_data(dq, work, 0);
        }
        dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
    }
    
    static void
    _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
            uintptr_t dc_flags)
    {
        _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    

    查看_dispatch_sync_f_inline源码,其中width = 1表示是串行队列,其中有两个重点:

    • 栅栏:_dispatch_barrier_sync_f(可以通过后文的栅栏函数底层分析解释),可以得出同步函数的底层实现其实是同步栅栏函数
    • 死锁:_dispatch_sync_f_slow,如果存在相互等待的情况,就会造成死锁
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
    // 串行队列会走到这个if分支
        if (likely(dq->dq_width == 1)) {
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
            
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    
    // 全局获取的并行队列或者绑定的是非调度线程的队列会走进这个if分支
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
        }
    
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
    
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    

    第一种情况,串行队列:dispatch_barrier_sync_f

    DISPATCH_NOINLINE
    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
    
     // 队列绑定的是非调度线程就会走这里
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
    
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
    // 一般会走到这里
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    
    
    DISPATCH_NOINLINE
    static void
    _dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
            void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
    // 首先会执行这个函数
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
    // 内部其实就是唤醒队列
            return _dispatch_lane_barrier_complete(dq, 0, 0);
        }
    
        const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
                DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
                DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
                DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
        uint64_t old_state, new_state;
    
        // similar to _dispatch_queue_drain_try_unlock
       // 原子锁。检查dq->dq_state与old_state是否相等,如果相等把new_state赋值给dq->dq_state,如果不相等,把dq_state赋值给old_state。
        // 串行队列走到这里,dq->dq_state与old_state是相等的,会把new_state也就是闭包里的赋值的值给dq->dq_state
       
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
            new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
            new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
            if (unlikely(old_state & fail_unlock_mask)) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_lane_barrier_complete(dq, 0, 0);
                });
            }
        });
        if (_dq_state_is_base_wlh(old_state)) {
            _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
        }
    }
    

    自定义并行队列会走_dispatch_sync_invoke_and_complete函数。

    DISPATCH_NOINLINE
    static void
    _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
            dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
      // 将自定义队列加入到root队列里去
        // dispatch_async也会调用此方法,之前我们初始化的时候会绑定一个root队列,这里就将我们新建的队列交给root队列进行管理
        
        _dispatch_lane_non_barrier_complete(dq, 0);
    }
    
    static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        // 执行任务
        _dispatch_client_callout(ctxt, func);
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    

    _dispatch_sync_f_slow 死锁
    进入_dispatch_sync_f_slow,当前的主队列是挂起、阻塞的

    DISPATCH_NOINLINE
    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
    
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
    
        _dispatch_trace_item_push(top_dq, &dsc);
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
    
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    
    

    往一个队列中 加入任务,会push加入主队列,进入_dispatch_trace_item_push

    static inline void
    _dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
    {
        if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
            _dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
        }
    
        _dispatch_trace_item_push_inline(dqu._dq, _tail._do);
        _dispatch_introspection_queue_push(dqu, _tail);
    }
    

    进入DISPATCH_WAIT_FOR_QUEUE,判断dq是否为正在等待的队列,然后给出一个状态state,然后将dq的状态和当前任务依赖的队列进行匹配

    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
       .....省略 ....
    }
    

    进入_dq_state_drain_locked_by -> _dispatch_lock_is_locked_by源码

    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        //异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
        //即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的解释就是 执行和等待的是否在同一队列
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    

    如果当前等待的和正在执行的是同一个队列,即判断线程ID是否相乘,如果相等,则会造成死锁

    同步函数 + 并发队列 顺序执行的原因

    在_dispatch_sync_invoke_and_complete -> _dispatch_sync_function_invoke_inline源码中,主要有三个步骤:

    • 将任务压入队列: _dispatch_thread_frame_push
    • 执行任务的block回调: _dispatch_client_callout
    • 将任务出队:_dispatch_thread_frame_pop

    从实现中可以看出,是先将任务push队列中,然后执行block回调,在将任务pop,所以任务是顺序执行的。

    综上所述,同步函数的底层分析如下

    • 同步函数的底层实现实际是同步栅栏函数

    • 同步函数中如果当前正在执行的队列和等待的是同一个队列,形成相互等待的局面,则会造成死锁

    步骤如下:


    三 、总结

    分析了这一阶段的源码之后,我觉得了解下基本流程就行了,最后都来到底层函数封装的调用。接下来我们分析GCD的另外一部分,GCD底层原理下,单例,栅栏函数、信号量等等。

    相关文章

      网友评论

        本文标题:iOS 底层探索:多线程GCD底层探索(上)

        本文链接:https://www.haomeiwen.com/subject/wjjovktx.html