美文网首页
iOS GCD底层原理分析

iOS GCD底层原理分析

作者: 木扬音 | 来源:发表于2021-06-15 23:51 被阅读0次

    libdispatch.dylib源码地址https://opensource.apple.com/release/macos-1015.html

    队列创建

    • 在源码中搜索dispatch_queue_create
    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
        return _dispatch_lane_create_with_target(label, attr, DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
    
    • 搜索_dispatch_lane_create_with_target(并进入
    DISPATCH_NOINLINE
    static dispatch_queue_t
    _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
            dispatch_queue_t tq, bool legacy)
    {
        if (!slowpath(dqa)) { // 如果是串行队列
            dqa = _dispatch_get_default_queue_attr(); // 串行队列的attr 取默认attr
        } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) { // 并行队列的 attr->do_vtable 应该等于 DISPATCH_VTABLE(queue_attr)
            DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
        }
    
        // dqai 创建 -
        dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
        
        //第一步:规范化参数,例如qos, overcommit, tq
        // dispatch_qos_t qos是优先级?
        dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
        // 是否overcommit(即queue创建的线程数是否允许超过实际的CPU个数)
        _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
        if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) { //
            if (tq->do_targetq) { // overcommit 的queue 必须是全局的
                DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                        "a non-global target queue");
            }
        }
    
        // 下面这些代码,因为用户创建的queue的tq一定为NULL,因此,只要关注tq == NULL的分支即可,我们删除了其余分支
        if (!tq) { // 自己创建的queue,tq都是null
            tq = _dispatch_get_root_queue( // 在root queue里面去取一个合适的queue当做target queue
                    qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 无论是用户创建的串行还是并行队列,其qos都没有指定,因此,qos这里都取DISPATCH_QOS_DEFAULT
                    overcommit == _dispatch_queue_attr_overcommit_enabled);
            if (slowpath(!tq)) { // 如果根据create queue是传入的属性无法获取到对应的tq,crash
                DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
            }
        }
    
        ...
        
        //拼接队列名称
        // 根据不同的queue类型,设置vtable。vtable实现了SERIAL queue 和 CONCURRENT queue的行为差异。
        const void *vtable;
        dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
        if (dqai.dqai_concurrent) { // 并发
            // OS_dispatch_queue_concurrent
            vtable = DISPATCH_VTABLE(queue_concurrent);
        } else {// 串行
            vtable = DISPATCH_VTABLE(queue_serial);
        }
        
        ....
        
        //创建队列,并初始化
        dispatch_lane_t dq = _dispatch_object_alloc(vtable,
                sizeof(struct dispatch_lane_s)); // alloc
        //根据dqai.dqai_concurrent的值,就能判断队列 是 串行 还是并发,对于并发队列,其queue width是DISPATCH_QUEUE_WIDTH_MAX,而串行队列其width是1
        _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
                DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
                (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
        //设置队列label标识符
        dq->dq_label = label;//设置dq的名字
        dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos, dqai.dqai_relpri);//优先级处理
        
        ...
        
        //类似于类与元类的绑定,不是直接的继承关系,而是类似于模型与模板的关系
        dq->do_targetq = tq;
        _dispatch_object_debug(dq, "%s", __func__);
        return _dispatch_trace_queue_create(dq)._dq;//研究dq
    }
    

    _dispatch_lane_create_with_target源码分析

    -【第一步】_dispatch_queue_attr_to_info方法传入daq(队列类型)创建_dispatch_queue_attr_to_info_t类型的对象dqai,用于存储队列的相关属性信息

    _dispatch_queue_attr_to_info

    -【第二步】设置队列的相关属性,例如服务质量qos是否overcommit(队列创建的线程数是否允许超过实际的CPU个数)、tq(自己创建的队列tq一定为NULL)

    -【第三步】DISPATCH_VTABLE宏定义拼接队列名字,OS_dispatch_+队列类型=队列名字
    - 串行队列:OS_dispatch_queue_serial
    - 并发队列:OS_dispatch_queue_concurrent

    #define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
    
    #define DISPATCH_OBJC_CLASS(name)   (&DISPATCH_CLASS_SYMBOL(name))
    
    #define DISPATCH_CLASS(name) OS_dispatch_##name
    

    -【第四步】初始化队列dq(alloc init),dqai.dqai_concurrent 可以判断队列类型,DISPATCH_QUEUE_WIDTH_MAX==并发,1==串行
    - 进入_dispatch_object_alloc -> _os_object_alloc_realized方法中,发现里面设置了isa指向,说明了队列是对象

    _os_object_alloc_realized
    - 进入_dispatch_queue_init方法,初始化队列相关属性
    _dispatch_queue_init

    -【第五步】通过_dispatch_trace_queue_create处理创建好的dq队列

    _dispatch_trace_queue_create
    - 进入_dispatch_introspection_queue_create_hook -> dispatch_introspection_queue_get_info -> _dispatch_introspection_lane_get_info方法,实现了一个模板队列
    _dispatch_introspection_lane_get_info

    总结

    • dispatch_queue_create中队列类型决定了下层dq_width的值(max=并发,1=串行
    • 队列queue对象,通过alloc+init创建,在alloc中有个class(宏定义)指定isa的指向
    • 队列在底层是通过模板创建的,类型是结构体dispatch_introspection_queue_s
      队列创建分析

    异步函数

    进入dispatch_async的源码实现

    • _dispatch_continuation_init:任务包装函数
    • _dispatch_continuation_async:并发处理函数
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)//work 任务
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
    
        // 任务包装器(work在这里才有使用) - 接受work - 保存work - 并函数式编程
        // 保存 block 
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        //并发处理
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    

    _dispatch_continuation_init任务包装

    DISPATCH_ALWAYS_INLINE
    static inline dispatch_qos_t
    _dispatch_continuation_init(dispatch_continuation_t dc,
            dispatch_queue_class_t dqu, dispatch_block_t work,
            dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
        void *ctxt = _dispatch_Block_copy(work);//拷贝任务
    
        dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            dc->dc_flags = dc_flags;
            dc->dc_ctxt = ctxt;//赋值
            // will initialize all fields but requires dc_flags & dc_ctxt to be set
            return _dispatch_continuation_init_slow(dc, dqu, flags);
        }
    
        dispatch_function_t func = _dispatch_Block_invoke(work);//封装work - 异步回调
        if (dc_flags & DC_FLAG_CONSUME) {
            func = _dispatch_call_block_and_release;//回调函数赋值 - 同步回调
        }
        return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
    }
    
    • 通过_dispatch_Block_copy拷贝任务
    • 通过_dispatch_Block_invoke封装任务,是一个异步回调函数
    • 如果是同步,则回调函数赋值为_dispatch_call_block_and_release
    • 通过_dispatch_continuation_init_f方法赋值,将func任务保存在属性中
      _dispatch_continuation_init_f

    _dispatch_continuation_async并发处理

    主要是执行block回调 ,进入_dispatch_continuation_async源码

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);//跟踪日志
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);//与dx_invoke一样,都是宏
    }
    
    • 关键代码dx_push是个宏
    #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    
    • dx_push根据队列类型执行不同的函数

      dq_push
    • 进入_dispatch_root_queue_push -> _dispatch_root_queue_push_inline ->_dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow源码,

      • 通过_dispatch_root_queues_init注册回调
      • 使用pthread_create方法通过do-while循环创建线程
    DISPATCH_NOINLINE
    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
        int remaining = n;
        int r = ENOSYS;
    
        _dispatch_root_queues_init();//重点
        
        ...
        //do-while循环创建线程
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
            while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
                if (r != EAGAIN) {
                    (void)dispatch_assume_zero(r);
                }
                _dispatch_temporary_resource_shortage();
            }
        } while (--remaining);
        
        ...
    }
    

    _dispatch_root_queues_init

    通过dispatch_once_f单例实现

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
    }
    
    • 进入_dispatch_root_queues_init_once源码,发现内部不同事物的调用句柄都是_dispatch_worker_thread2

      _dispatch_root_queues_init_once
    • block的回调执行路径是_dispatch_root_queues_init_once ->_dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline -> _dispatch_client_callout -> dispatch_call_block_and_release,可以通过bt打印堆栈信息,

      bt打印堆栈

    总结

    • 将异步任务拷贝并封装后,设置回调函数func
    • 通过dx_push递归,重定向到跟队列,通过pthread_creat方法do-while循环创建线程,通过dx_invoke执行回调,dx_pushdx_invoke是一一对应的
      异步函数

    同步函数

    进入dispatch_sync源码,发现底层是通过栅栏函数实现的

    DISPATCH_NOINLINE
    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    
    • 进入_dispatch_sync_f源码

      _dispatch_sync_f
    • 进入_dispatch_sync_f_inline源码,dq->dq_width == 1表示串行队列

      • 栅栏函数_dispatch_barrier_sync_f
      • 死锁_dispatch_sync_f_slow,如果存在相互等待的情况下会死锁
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {//表示是串行队列
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);//栅栏
        }
    
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
    
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
        }
    
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);//处理当前信息
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));//block执行并释放
    }
    

    _dispatch_sync_f_slow死锁

    • 进入_dispatch_sync_f_slow,当前主队列挂起、阻塞

      _dispatch_sync_f_slow
    • 向队列中添加任务,push加入主队列,_dispatch_trace_item_push

      _dispatch_trace_item_push
    • 进入__DISPATCH_WAIT_FOR_QUEUE__,判断dqu.dq是否为正在等待的主队列,然后将dq的状态和当前任务依赖的队列进行匹配

      __DISPATCH_WAIT_FOR_QUEUE__
    • 进入_dq_state_drain_locked_by -> _dispatch_lock_is_locked_by源码,如果当前等待的队列和正在执行任务的是同一个队列,即线程ID是否相同,如果相同会造成死锁

    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        //异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
        //即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的解释就是 执行和等待的是否在同一队列
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    

    总结

    • 同步函数底层实现是同步栅栏函数
    • 如果正在执行任务的队列和等待的是同一个队列,会造成相互等待的局面,形成死锁
    同步函数流程图

    单例

    进入dispatch_once源码实现,发现底层是通过dispatch_once_f实现的

    // val -- 静态变量,由于不同位置定义的静态变量是不同的,所以静态变量具有唯一性
    //block -- block回调
    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    
    • 进入dispatch_once_f源码,其中的val是外界传入的onceToken静态变量,func_dispatch_Block_invoke(block)
    DISPATCH_NOINLINE
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        //静态变量
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    //os_atomic_load获取当前任务的标识符
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);//原子属性关联
        if (likely(v == DLOCK_ONCE_DONE)) {//已经执行过了,直接返回
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        //任务执行后,加锁失败,重新存储,将标识符== DLOCK_ONCE_DONE
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        if (_dispatch_once_gate_tryenter(l)) {
          //尝试进入任务,解锁,执行block回调
            return _dispatch_once_callout(l, ctxt, func);
        }
    //如果此时有任务1正在执行,再次进来一个任务2,则`任务2`进入无限次等待
        return _dispatch_once_wait(l);
    }
    

    _dispatch_once_gate_tryenter解锁

    底层通过os_atomic_cmpxchg方法进行标识符对比,如果对比没问题进行解锁,即任务的标识符置为DLOCK_ONCE_UNLOCKED

    DISPATCH_ALWAYS_INLINE
    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);//首先对比,然后进行改变
    }
    

    _dispatch_once_callout 回调

    进入_dispatch_once_callout源码

    • 【第一步】_dispatch_client_callout:block回调
    • 【第二步】_dispatch_once_gate_broadcast:广播
    DISPATCH_NOINLINE
    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_client_callout(ctxt, func);//block调用执行
        _dispatch_once_gate_broadcast(l);//进行广播:告诉别人有了归属,不要找我了
    }
    
    • 进入_dispatch_client_callout源码,其中f==_dispatch_Block_invoke(block),即异步回调
    #undef _dispatch_client_callout
    void
    _dispatch_client_callout(void *ctxt, dispatch_function_t f)
    {
        @try {
            return f(ctxt);
        }
        @catch (...) {
            objc_terminate();
        }
    }
    
    • 进入_dispatch_once_gate_broadcast -> _dispatch_once_mark_done,将任务的标识为DLOCK_ONCE_DONE,即加锁
    DISPATCH_ALWAYS_INLINE
    static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo)
    {
        //如果不相同,直接改为相同,然后上锁 -- DLOCK_ONCE_DONE
        return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
    }
    

    总结

    • 【只执行一次的原理】:GCD单例中有两个参数onceToken 、block,其中onceToken是静态变量,具有唯一性,在底层封装成dispatch_once_gate_t类型的变量l变量l主要是用来获取底层原子属性关联,即变量v,通过变量v来查询任务状态,如果变量v == DLOCK_ONCE_DONE,说明任务已经处理了,直接return

    • 【block调用时机】:如果任务没有被执行,底层会通过C++函数的比较,任务状态置为DLOCK_ONCE_UNLOCK,确保当前任务执行的唯一性,之后进行block回调,将任务置状态为DLOCK_ONCE_DONE,确保下次进来不会执行

    • 【多线程影响】:如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的

    单例原理流程图

    栅栏函数

    • 控制任务执行顺序,可以让其同步执行
    • 只能控制同一并发队列,只会阻塞一次
    • 同步栅栏添加进队列时,当前线程会被加锁,直到同步栅栏之前的任务和同步栅栏本身的任务都执行完毕,当前线程才会解锁
    • 只有使用自定义队列才有意义,如果使用串行队列或者全局并发队列,这个栅栏函数的作用==同步函数,没有任何意义,还会耗费更多性能
    • dispatch_barrier_sync 同步栅栏函数阻塞线程,影响主线程任务执行
    • dispatch_barrier_async 异步栅栏函数阻塞的是队列且必须是自定义的并发队列,不影响主线程任务的执行
    同步栅栏函数 异步栅栏函数

    异步栅栏函数 底层分析

    进入dispatch_barrier_async源码,发现底层和dispatch_async类似

    #ifdef __BLOCKS__
    void
    dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc_flags);
    }
    #endif
    

    同步栅栏函数 底层分析

    进入dispatch_barrier_sync源码

    void
    dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    
    _dispatch_barrier_sync_f_inline

    进入_dispatch_barrier_sync_f --> _dispatch_barrier_sync_f_inline源码

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();//获取线程的id,即线程的唯一标识
        
        ...
        
        //判断线程状态,需不需要等待,是否回收
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {//栅栏函数也会死锁
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,//没有回收
                    DC_FLAG_BARRIER | dc_flags);
        }
        //验证target是否存在,如果存在,加入栅栏函数的递归查找 是否等待
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                        dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));//执行
    }
    

    主要分为以下几步

    • 通过_dispatch_tid_self 获取线程ID

    • 通过_dispatch_queue_try_acquire_barrier_sync判断线程状态

      _dispatch_queue_try_acquire_barrier_sync
      • 进入_dispatch_queue_try_acquire_barrier_sync_and_suspend源码,在这里释放
        _dispatch_queue_try_acquire_barrier_sync_and_suspend
    • 通过_dispatch_sync_recurse递归查找栅栏函数的target

    • 通过_dispatch_introspection_sync_begin向前信息进行处理

    • 通过_dispatch_lane_barrier_sync_invoke_and_complete执行block并释放

      _dispatch_lane_barrier_sync_invoke_and_complete

    信号量

    • 使任务同步执行,类似互斥锁
    • 控制GCD最大并发数
    dispatch_semaphore_create 创建

    初始化信号量,设置GCD最大并发数且最大并发数必须大于0

    dispatch_semaphore_t
    dispatch_semaphore_create(long value)
    {
        dispatch_semaphore_t dsema;
    
        // If the internal value is negative, then the absolute of the value is
        // equal to the number of waiting threads. Therefore it is bogus to
        // initialize the semaphore with a negative value.
        if (value < 0) {
            return DISPATCH_BAD_INPUT;
        }
    
        dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
                sizeof(struct dispatch_semaphore_s));
        dsema->do_next = DISPATCH_OBJECT_LISTLESS;
        dsema->do_targetq = _dispatch_get_default_queue(false);
        dsema->dsema_value = value;
        _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        dsema->dsema_orig = value;
        return dsema;
    }
    
    dispatch_semaphore_wait 加锁

    主要作用是对信号量dsema 通过os_atomic_dec2o进行--操作,内部执行C++函数atomic_fetch_sub_explicit

    • value >= 0,执行成功
    • value == LONG_MIN,系统抛出crash
    • value < 0,进入长等待
    long
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        // dsema_value 进行 -- 操作
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) {//表示执行操作无效,即执行成功
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);//长等待
    }
    
    • 进入_dispatch_semaphore_wait_slow源码,当value<0时,更加等待事件timeout做出不同操作
      _dispatch_semaphore_wait_slow
    dispatch_semaphore_signal解锁

    通过os_atomic_inc2o函数对value进行了++操作,os_atomic_inc2o内部是通过C++的atomic_fetch_add_explicit

    • value > 0,执行成功
    • value == 0,进入长等待
    long
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        //signal 对 value是 ++
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {//返回0,表示当前的执行操作无效,相当于执行成功
            return 0;
        }
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        return _dispatch_semaphore_signal_slow(dsema);//进入长等待
    }
    

    总结

    • dispatch_semaphore_create 初始化信号量,设置最大并发数
    • dispatch_semaphore_wait 对信号量value--,加锁
    • dispatch_semaphore_signal 对信号量value++,解锁
      信号量流程图

    调度组

    • 控制任务执行顺序
    dispatch_group_create 创建组 
    dispatch_group_async 进组任务 
    dispatch_group_notify 进组任务执行完毕通知 dispatch_group_wait 进组任务执行等待时间
    
    //进组和出组一般是成对使用的
    dispatch_group_enter 进组 
    dispatch_group_leave 出组
    
    dispatch_group_create 创建组
    • 进入dispatch_group_create源码
    dispatch_group_t
    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    
    • 进入_dispatch_group_create_with_count源码,对group对象属性赋值,并返回group
    DISPATCH_ALWAYS_INLINE
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        //创建group对象,类型为OS_dispatch_group
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        //group对象赋值
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    
    dispatch_group_enter 进组

    进入dispatch_group_enter源码,通过os_atomic_sub_orig2odg->dg.bits--操作,对数值进行处理

    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,//原子递减 0 -> -1
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {//如果old_value
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {//到达临界值,会报crash
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    
    dispatch_group_leave 出组

    进入dispatch_group_leave源码,通过os_atomic_add_orig2o对dg-> dg_state 作 `++操作

    • -1到0,++
    • 根据状态,do_while循环,唤醒block执行任务
    • 如果0 + 1 = 1,enter-leave不平衡,即leave多次调用,会crash
    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,//原子递增 ++
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
        //根据状态,唤醒
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            return _dispatch_group_wake(dg, old_state, true);//唤醒
        }
        //-1 -> 0, 0+1 -> 1,即多次leave,会报crash,简单来说就是enter-leave不平衡
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    
    • 进入_dispatch_group_wake源码,do_while循环进行一步命中,调用_dispatch_continuation_async
    DISPATCH_NOINLINE
    static void
    _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
    {
        uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
        if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
            dispatch_continuation_t dc, next_dc, tail;
    
            // Snapshot before anything is notified/woken <rdar://problem/8554546>
            dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
            do {
                dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
                next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
                _dispatch_continuation_async(dsn_queue, dc,
                        _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);//block任务执行
                _dispatch_release(dsn_queue);
            } while ((dc = next_dc));//do-while循环,进行异步任务的命中
    
            refs++;
        }
    
        if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
            _dispatch_wake_by_address(&dg->dg_gen);//地址释放
        }
    
        if (refs) _dispatch_release_n(dg, refs);//引用释放
    }
    
    • 进入_dispatch_continuation_async源码,与异步函数的block回调一致
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);//跟踪日志
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);//与dx_invoke一样,都是宏
    }
    
    dispatch_group_notify 通知

    进入dispatch_group_notify源码,如果old_state==0,就可以进行释放了,

    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
    
        dsn->dc_data = dq;
        _dispatch_retain(dq);
        //获取dg底层的状态标识码,通过os_atomic_store2o获取的值,即从dg的状态码 转成了 os底层的state
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) { //如果等于0,则可以进行释放了
                    os_atomic_rmw_loop_give_up({
                        return _dispatch_group_wake(dg, new_state, false);//唤醒
                    });
                }
            });
        }
    }
    
    dispatch_group_async

    进入dispatch_group_async源码,包装任务和异步处理任务,底层实际就是enter-leave

    #ifdef __BLOCKS__
    void
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
        //任务包装器
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        //处理任务
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    #endif
    
    • 进入_dispatch_continuation_group_async源码,封装了dispatch_group_enter进组操作
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        dispatch_group_enter(dg);//进组
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);//异步操作
    }
    
    • 搜索_dispatch_client_callout的调用,在_dispatch_continuation_with_group_invoke
    DISPATCH_ALWAYS_INLINE
    static inline void
    _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
    {
        struct dispatch_object_s *dou = dc->dc_data;
        unsigned long type = dx_type(dou);
        if (type == DISPATCH_GROUP_TYPE) {//如果是调度组类型
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);//block回调
            _dispatch_trace_item_complete(dc);
            dispatch_group_leave((dispatch_group_t)dou);//出组
        } else {
            DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
        }
    

    总结

    • enter-leave必须成对出现
    • dispatch_group_enter底层通过C++函数,对group的value进行--操作(0->-1)
    • dispatch_group_leave底层通过C++函数,对group的value进行++操作(-1 ->0)
    • dispatch_group_notify底层通过判断group的state是否等于0,当==0时来进行通知
    • block中任务的唤醒可以通过dispatch_group_leavedispatch_group_notify
    • dispatch_group_async底层得实现就是enter-leave
      调度组原理图

    dispatch_source

    • 基础数据类型,用于协调特定底层系统事件的处理

    • 代替异步回调函数,来处理系统相关的事件,当配置一个dispatch时,需要制定检测的事件,dispatch queue处理事件的代码(block或函数),当事件发生时,dispatch source会提交你的block或函数到queue中执行

    • 相对于dispatch_async的优势是联结和CPU负荷小,简单来说就是由你调用dispatch_source_merge_data函数来向自己发送信号

      • 联结:在任一线程上调度它的一个函数dispatch_source_merge_data,就会执行dispatch source事先定义好的句柄(block),这个过程叫做Custom event ,用户事件
      • 句柄:一种指向指针的指针,指向的就是一个类或者结构,和系统有密切关系
        • 实例句柄 HINSTANCE
        • 位图句柄 HBITMAP
        • 设备表句柄 HDC
        • 图标句柄 HICON

    使用

    • type :dispatch处理的事件
    • handle :可以理解为句柄、索引或id,假如要监听进程,需要传入进程的ID
    • mask :可以理解为描述,提供更详细的描述,让它知道具体要监听什么
    • queue :自定义源需要的一个队列,用来处理所有的响应句柄
    dispatch_source_t source = dispatch_source_create(dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue)
    

    Dispatch Source 种类

    • DISPATCH_SOURCE_TYPE_DATA_ADD:自定义的事件,变量增加,当同一时间,一个事件的的触发频率很高,那么Dispatch Source会将这些响应以ADD的方式进行累积,然后等系统空闲时最终处理,如果触发频率比较零散,那么Dispatch Source会将这些事件分别响应。
    • DISPATCH_SOURCE_TYPE_DATA_OR:自定义的事件,和DISPATCH_SOURCE_TYPE_DATA_ADD一样,但是是以OR的方式进行累积
    • DISPATCH_SOURCE_TYPE_MACH_SEND:MACH端口发送
    • DISPATCH_SOURCE_TYPE_MACH_RECV:MACH端口接收
    • DISPATCH_SOURCE_TYPE_MEMORYPRESSURE:内存压力 (注:iOS8后可用)
    • DISPATCH_SOURCE_TYPE_PROC:进程监听,如进程的退出、创建一个或更多的子线程、进程收到UNIX信号
    • DISPATCH_SOURCE_TYPE_READ:IO操作,如对文件的操作、socket操作的读响应
    • DISPATCH_SOURCE_TYPE_SIGNAL:接收到UNIX信号时响应
    • DISPATCH_SOURCE_TYPE_TIMER:定时器
    • DISPATCH_SOURCE_TYPE_VNODE:文件状态监听,文件被删除、移动、重命名
    • DISPATCH_SOURCE_TYPE_WRITE :IO操作,如对文件的操作、socket操作的写响应

    常见函数

    //挂起队列
    dispatch_suspend(queue) 
    
    //分派源创建时默认处于暂停状态,在分派源分派处理程序之前必须先恢复
    dispatch_resume(source) 
    
    //向分派源发送事件,需要注意的是,不可以传递0值(事件不会被触发),同样也不可以传递负数。
    dispatch_source_merge_data 
    
    //设置响应分派源事件的block,在分派源指定的队列上运行
    dispatch_source_set_event_handler 
    
    //得到分派源的数据
    dispatch_source_get_data 
    
    //得到dispatch源创建,即调用dispatch_source_create的第二个参数
    uintptr_t dispatch_source_get_handle(dispatch_source_t source); 
    
    //得到dispatch源创建,即调用dispatch_source_create的第三个参数
    unsigned long dispatch_source_get_mask(dispatch_source_t source); 
    
    ////取消dispatch源的事件处理--即不再调用block。如果调用dispatch_suspend只是暂停dispatch源。
    void dispatch_source_cancel(dispatch_source_t source); 
    
    //检测是否dispatch源被取消,如果返回非0值则表明dispatch源已经被取消
    long dispatch_source_testcancel(dispatch_source_t source); 
    
    //dispatch源取消时调用的block,一般用于关闭文件或socket等,释放相关资源
    void dispatch_source_set_cancel_handler(dispatch_source_t source, dispatch_block_t cancel_handler); 
    
    //可用于设置dispatch源启动时调用block,调用完成后即释放这个block。也可在dispatch源运行当中随时调用这个函数。
    void dispatch_source_set_registration_handler(dispatch_source_t source, dispatch_block_t registration_handler); 
    

    使用场景

    用于验证码倒计时,因为dispatch_source不依赖于Runloop,而是直接和底层内核交互,准确性更高。

    - (void)use033{
        //倒计时时间
        __block int timeout = 3;
        
        //创建队列
        dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
        
        //创建timer
        dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, globalQueue);
        
        //设置1s触发一次,0s的误差
        /*
         - source 分派源
         - start 数控制计时器第一次触发的时刻。参数类型是 dispatch_time_t,这是一个opaque类型,我们不能直接操作它。我们得需要 dispatch_time 和 dispatch_walltime 函数来创建它们。另外,常量 DISPATCH_TIME_NOW 和 DISPATCH_TIME_FOREVER 通常很有用。
         - interval 间隔时间
         - leeway 计时器触发的精准程度
         */
        dispatch_source_set_timer(timer,dispatch_walltime(NULL, 0),1.0*NSEC_PER_SEC, 0);
        
         //触发的事件
        dispatch_source_set_event_handler(timer, ^{
            //倒计时结束,关闭
            if (timeout <= 0) {
                //取消dispatch源
                dispatch_source_cancel(timer);
            }else{
                timeout--;
                
                dispatch_async(dispatch_get_main_queue(), ^{
                    //更新主界面的操作
                    NSLog(@"倒计时 - %d", timeout);
                });
            }
        });
        
        //开始执行dispatch源
        dispatch_resume(timer);
    }
    

    相关文章

      网友评论

          本文标题:iOS GCD底层原理分析

          本文链接:https://www.haomeiwen.com/subject/qpuieltx.html