美文网首页iOSiOS 进阶开发
GCD源码分析(二)

GCD源码分析(二)

作者: Cooci_和谐学习_不急不躁 | 来源:发表于2019-03-15 16:44 被阅读191次
GCD源码解析

dispatch_group_async

同样从入口看起

// Submits the block db to queue dq on behalf of group dg.
// Equivalent to wrapping the block in dispatch_group_enter()/leave().
void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    uintptr_t flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
    dispatch_continuation_t cont = _dispatch_continuation_alloc();

    _dispatch_continuation_init(cont, dq, db, 0, 0, flags);
    _dispatch_continuation_group_async(dg, dq, cont);
}

  同样是_dispatch_continuation_init函数,这里跟dispatch_async那里一模一样,忘记了的话,往回看。我们接着往下看,_dispatch_continuation_group_async函数

// Records the group on the continuation and enqueues it asynchronously.
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc)
{
    // Enter the group before enqueueing; the matching dispatch_group_leave()
    // is issued in _dispatch_continuation_with_group_invoke() after dc runs.
    dispatch_group_enter(dg);
    // Stash the group so the invoke path can find it later.
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc);
}

我们发现,其实dispatch_group_async内部也是加了dispatch_group_enter函数。dispatch_group_async怎么初始化我们至此已经说明完毕。
  后面取出执行block逻辑跟dispatch_async略微不同,前面部分不做多说,调用顺序跟dispatch_async是一样的,唯一不同在于_dispatch_continuation_invoke_inline这个函数。

// Runs a continuation: adopts its voucher, invokes the client function
// inside an autorelease pool, and recycles the continuation when it was
// allocated with the CONSUME bit.
static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
        dispatch_invoke_flags_t flags)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;

        _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
        if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        // For dispatch_group_async the GROUP bit is set, so the if branch runs.
        if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) {
            _dispatch_continuation_with_group_invoke(dc);
        } else {
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_introspection_queue_item_complete(dou);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

// Invokes a group continuation, then leaves the group that was entered
// when the work item was submitted.
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        // Run the client task.
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_introspection_queue_item_complete(dou);
        // Balance the dispatch_group_enter() done at submission time.
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}

  我们有必要看下dispatch_group_enter和dispatch_group_leave函数。

// dispatch_group_enter: essentially just increments dg->dg_value by 1.
void dispatch_group_enter(dispatch_group_t dg)
{
    // Returns the value *before* the increment (acquire ordering).
    long value = os_atomic_inc_orig2o(dg, dg_value, acquire);
    if (slowpath((unsigned long)value >= (unsigned long)LONG_MAX)) {
        DISPATCH_CLIENT_CRASH(value,
                "Too many nested calls to dispatch_group_enter()");
    }
    if (value == 0) {
        // First enter on an idle group: retain it so it stays alive
        // until the matching wake releases the reference.
        _dispatch_retain(dg); 
    }
}

// dispatch_group_leave: decrements dg->dg_value; the last leave wakes the group.
void dispatch_group_leave(dispatch_group_t dg)
{
    // Returns the value *after* the decrement (release ordering).
    long value = os_atomic_dec2o(dg, dg_value, release);
    // Counter reached zero: wake the group (waiters and notify blocks).
    if (slowpath(value == 0)) {
        return (void)_dispatch_group_wake(dg, true);
    }
    // A negative value means more leaves than enters;
    // enter/leave must be balanced, otherwise crash.
    if (slowpath(value < 0)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_group_leave()");
    }
}

// Wakes everything attached to the group once its counter hits zero:
// signals any dispatch_group_wait() waiters and submits the queued
// dispatch_group_notify() continuations to their target queues.
static long _dispatch_group_wake(dispatch_group_t dg, bool needs_release)
{
    dispatch_continuation_t next, head, tail = NULL;
    long rval;


    head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
    if (head) {
        tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
    }
    // Reset dg->dg_waiters to 0 and fetch its previous value.
    rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
    // If any threads were blocked in dispatch_group_wait()...
    if (rval) {
        // ...make sure the semaphore exists...
        _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
        // ...and signal it once per waiter.
        _dispatch_sema4_signal(&dg->dg_sema, rval);
    }
    uint16_t refs = needs_release ? 1 : 0; 
    // If notify continuations are pending (dispatch_group_notify),
    // push each of them onto its destination queue now.
    if (head) {

        do {
            next = os_mpsc_pop_snapshot_head(head, tail, do_next);
            dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
            _dispatch_continuation_async(dsn_queue, head);
            _dispatch_release(dsn_queue);
        } while ((head = next));
        refs++;
    }
    if (refs) _dispatch_release_n(dg, refs);
    return 0;
}

dispatch_once

还是从入口函数开始看

// Public entry point for dispatch_once.
void dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    // Internally forwards to dispatch_once_f with the block's invoke function.
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

// Function-pointer flavor of dispatch_once; defers to the slow path.
DISPATCH_NOINLINE
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
    return dispatch_once_f_slow(val, ctxt, func);
}

DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    // _dispatch_once_waiter_t layout:
    // typedef struct _dispatch_once_waiter_s {
    //      volatile struct _dispatch_once_waiter_s *volatile dow_next;
    //      dispatch_thread_event_s dow_event;
    //      mach_port_t dow_thread;
    // } *_dispatch_once_waiter_t;


    // volatile: tells the compiler not to optimize accesses through this
    // pointer, since the pointed-to value may be changed by other threads.
    _dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
    struct _dispatch_once_waiter_s dow = { };
    _dispatch_once_waiter_t tail = &dow, next, tmp;
    dispatch_thread_event_t event;

    // First call: *vval is 0, so comparing it with NULL succeeds and tail is
    // stored into *vval. If another thread arrives concurrently, *vval is no
    // longer 0 for it, so it takes the else branch below.
    if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
        // Record the current thread.
        dow.dow_thread = _dispatch_tid_self();
        // Invoke the block, i.e. the caller's one-time initialization work.
        _dispatch_client_callout(ctxt, func);

        // Marks *val as DLOCK_ONCE_DONE and returns the previous value. If no
        // other thread arrived meanwhile that value is still `tail` and the
        // work ends here; otherwise it is the head of the waiter list built
        // by the else branch, and each waiter must be signalled.
       next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
        // Walk the waiter list (if any), waking each waiter in turn.
        while (next != tail) {
            // Uses thread_switch internally to avoid priority inversion;
            // returns next->dow_next.
            tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
            event = &next->dow_event;
            next = tmp;
            // Wake this waiter.
            _dispatch_thread_event_signal(event);
        }
    } else {
        // Internally _dispatch_sema4_init: set up this thread's wait event.
        _dispatch_thread_event_init(&dow.dow_event);
        // Load the current state word / waiter-list head.
        next = *vval;
        // Loop until we either observe DONE or enqueue ourselves as a waiter.
        for (;;) {
            // Once the winning thread marked the token DISPATCH_ONCE_DONE,
            // later callers land here and return immediately.
            if (next == DISPATCH_ONCE_DONE) {
                break;
            }
            // Threads racing while the initializer is still running get here:
            // the state is not DONE yet, so try to push onto the waiter list.
            // Compare *vval with next; for the first racing arrival they match.
            if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
                dow.dow_thread = next->dow_thread;
                dow.dow_next = next;
                if (dow.dow_thread) {
                    pthread_priority_t pp = _dispatch_get_priority();
                    _dispatch_thread_override_start(dow.dow_thread, pp, val);
                }
                // Block until the winner signals us, then clean up.
                _dispatch_thread_event_wait(&dow.dow_event);
                if (dow.dow_thread) {

                    _dispatch_thread_override_end(dow.dow_thread, val);
                }
                break;
            }
        }
        // Destroy this thread's wait event.
        _dispatch_thread_event_destroy(&dow.dow_event);
    }
}

那么,回到上篇提到使用dispatch_once死锁的问题,如果使用不当会造成什么后果?回顾下上篇的实验代码


// Demo entry point: kicks off the dispatch_once deadlock shown below.
- (void)viewDidLoad {
    [super viewDidLoad];
    
    [self once];
}

// Deadlocks: this once block calls -otherOnce, whose once block calls back
// into -once on the same thread, so each waits on the other forever.
- (void)once {
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        [self otherOnce];
    });
    NSLog(@"遇到第一只熊猫宝宝...");
}

// Second half of the mutual-recursion deadlock: calls back into -once
// while its own dispatch_once token is still being initialized.
- (void)otherOnce {
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        [self once];
    });
    NSLog(@"遇到第二只熊猫宝宝...");
}

示例中我们可以看到once方法需要等待otherOnce方法的完成,而otherOnce又调用了once,根据前面的源码,otherOnce调用once方法会走到else分支,在这个分支等待之前一个信号量发出唤醒指令,但是once方法里面又依赖otherOnce方法的完成,由于处于一个线程,所以就卡住了。

dispatch_group_create & dispatch_semaphore_create

为什么两个一起看,其实dispatch_group也是通过dispatch_semaphore控制的,看下dispatch_group_create源代码:

// A dispatch group is a semaphore-style object created with count 0.
dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}

// Allocates a group object and initializes its semaphore-class state.
static inline dispatch_group_t
_dispatch_group_create_with_count(long count)
{
    dispatch_group_t dg = (dispatch_group_t)_dispatch_object_alloc(
        DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));
 _dispatch_semaphore_class_init(count, dg); // initialize the semaphore state
    if (count) {
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); 
    }
    return dg;
}

  同样的,看下dispatch_semaphore_create源代码,是不是一股熟悉的配方:

    // Same recipe as dispatch_group: allocate the object and run the shared
    // semaphore-class init, plus remember the caller's initial value.
    dispatch_semaphore_t
    dispatch_semaphore_create(long value)
    {
        dispatch_semaphore_t dsema;

        // A negative initial value is invalid input.
        if (value < 0) {
            return DISPATCH_BAD_INPUT;
        }

        dsema = (dispatch_semaphore_t)_dispatch_object_alloc(
            DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s));
        _dispatch_semaphore_class_init(value, dsema); // same semaphore init
        dsema->dsema_orig = value;
        return dsema;
}

dispatch_group_wait & dispatch_semaphore_wait

再看下dispatch_group_wait的代码,其内部是调用的_dispatch_group_wait_slow函数:

// Slow path of dispatch_group_wait: registers the caller as a waiter and
// blocks on the group's semaphore until the counter reaches zero or the
// timeout fires.
static long _dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout)
{
    long value;
    int orig_waiters;

    value = os_atomic_load2o(dg, dg_value, ordered); 
    if (value == 0) {
        return _dispatch_group_wake(dg, false);
    }

    (void)os_atomic_inc2o(dg, dg_waiters, relaxed);

    value = os_atomic_load2o(dg, dg_value, ordered);
    // Re-check: if the group has no outstanding work any more...
    if (value == 0) {
        _dispatch_group_wake(dg, false);
        // ...the wake above will signal us, so waiting forever is safe.
        timeout = DISPATCH_TIME_FOREVER;
    }

    _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dg->dg_sema, timeout)) {
            break;
        }
        // timed out: fall through to undo the waiter registration
    case DISPATCH_TIME_NOW:
        orig_waiters = dg->dg_waiters;
        while (orig_waiters) {
            if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters,
                    orig_waiters - 1, &orig_waiters, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // waiters already consumed by a wake: fall through and take the signal
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dg->dg_sema);
        break;
    }
    return 0;
}

对比着看dispatch_semaphore_wait源码,其内部也调用_dispatch_semaphore_wait_slow函数,可以看到逻辑基本一致:

// Slow path of dispatch_semaphore_wait; the logic closely mirrors
// _dispatch_group_wait_slow above.
static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // timed out: fall through to restore the semaphore value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // value no longer negative: fall through and block on the semaphore
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}

dispatch_group_notify

再把dispatch_group_notify看下

// Registers block db to run on dq once all work in group dg finishes.
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t note = _dispatch_continuation_alloc();
    // Package the block as a consumed continuation (same as dispatch_async).
    _dispatch_continuation_init(note, dq, db, 0, 0, DISPATCH_OBJ_CONSUME_BIT);
    _dispatch_group_notify(dg, dq, note);
}

// Links a notify continuation into the group's MPSC list; if the group is
// already idle the notification fires immediately.
static inline void _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    // Remember the destination queue for the wake path.
    dsn->dc_data = dq;
    dsn->do_next = NULL;
    _dispatch_retain(dq);
    if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) {
        _dispatch_retain(dg);
        os_atomic_store2o(dg, dg_notify_head, dsn, ordered);
        // If all of the group's work has already finished, wake right away.
        if (os_atomic_load2o(dg, dg_value, ordered) == 0) {
            _dispatch_group_wake(dg, false);
        }
    }
}

dispatch_group_async里面我们知道dispatch_group的任务在执行后会调用dispatch_group_leave。这个函数里面如果计数归零,就会唤醒dispatch_group。里面的任务,比如dispatch_group_notify的任务就会这时候被执行。
  这里执行的调用顺序就不贴了,基本跟dispatch_async一致。

dispatch_barrier_async

可以看到,大多数实现都是大同小异,通过不同的标志位来控制。这里跟dispatch_async的不同就在于,dispatch_async直接把任务扔到root队列,而dispatch_barrier_async是把任务加到自定义的队列。

// Like dispatch_async, but tags the continuation with the BARRIER bit so
// it is pushed onto the custom queue itself rather than a root queue.
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
    dispatch_continuation_t barrier_dc = _dispatch_continuation_alloc();

    _dispatch_continuation_init(barrier_dc, dq, work, 0, 0, flags);
    _dispatch_continuation_push(dq, barrier_dc);
}

// Pushes an object onto a queue; thin wrapper over the inline fast path.
void _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    _dispatch_queue_push_inline(dq, dou, qos);
}

// Appends _tail to dq's own item list and wakes the queue if needed.
static inline void _dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
        dispatch_qos_t qos)
{
    struct dispatch_object_s *tail = _tail._do;
    dispatch_wakeup_flags_t flags = 0;
    bool overriding = _dispatch_queue_need_override_retain(dq, qos);
    // Append to the queue's own list; true means the queue was empty before.
    if (unlikely(_dispatch_queue_push_update_tail(dq, tail))) {
        if (!overriding) _dispatch_retain_2(dq->_as_os_obj);
        _dispatch_queue_push_update_head(dq, tail);
        flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
    } else if (overriding) {
        flags = DISPATCH_WAKEUP_CONSUME_2;
    } else {
        return;
    }
    // Wake the queue so the new item gets drained.
    return dx_wakeup(dq, qos, flags);
}

// Wake entry for a queue: chooses a wakeup target, then defers to the
// class wakeup which enqueues the queue on its target.
void _dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_queue_barrier_complete(dq, qos, flags);
    }
    // Internally just checks tail != NULL, which holds here.
    if (_dispatch_queue_class_probe(dq)) {
        // #define DISPATCH_QUEUE_WAKEUP_TARGET  ((dispatch_queue_wakeup_target_t)1)
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_class_wakeup(dq, qos, flags, target);
}

// Merges the QoS into the queue's state word and, if the queue just
// transitioned to ENQUEUED, pushes it onto its target (root) queue.
void _dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
    dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

    // target is non-NULL here, so this branch is taken.
    if (target) {
        uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
        if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
            enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
        }
        qos = _dispatch_queue_override_qos(dq, qos);
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state = _dq_state_merge_qos(old_state, qos);
            if (likely(!_dq_state_is_suspended(old_state) &&
                    !_dq_state_is_enqueued(old_state) &&
                    (!_dq_state_drain_locked(old_state) ||
                    (enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
                    _dq_state_is_base_wlh(old_state))))) {
                new_state |= enqueue;
            }
            if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
                new_state |= DISPATCH_QUEUE_DIRTY;
            } else if (new_state == old_state) {
                os_atomic_rmw_loop_give_up(goto done);
            }
        });

        if (likely((old_state ^ new_state) & enqueue)) {
            dispatch_queue_t tq;
            if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
                os_atomic_thread_fence(dependency);
                tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
                        (long)new_state);
            } else {
                tq = target;
            }
            dispatch_assert(_dq_state_is_enqueued(new_state));
            // Push the queue onto the root queue; internally this calls
            // _dispatch_root_queue_push.
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
    }
done:
    if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        return _dispatch_release_2_tailcall(dq);
    }
}

// _dispatch_root_queue_push was already shown in the dispatch_async article;
// go straight to _dispatch_root_queue_push_override.
static void _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
    dispatch_continuation_t dc = dou._dc;
    // A barrier is pushed onto its own (custom) queue, so this is not a
    // redirection and the if branch is not taken; see the notes in the
    // dispatch_async article for details.
    if (_dispatch_object_is_redirection(dc)) {
        dc->dc_func = (void *)orig_rq;
    } else {
        dc = _dispatch_continuation_alloc();
        // Set do_vtable so that when this item is drained,
        // _dispatch_queue_override_invoke is what gets called.
        dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
        _dispatch_trace_continuation_push(orig_rq, dou);
        dc->dc_ctxt = dc;
        dc->dc_other = orig_rq;
        dc->dc_data = dou._do;
        dc->dc_priority = DISPATCH_NO_PRIORITY;
        dc->dc_voucher = DISPATCH_NO_VOUCHER;
    }
    _dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

// 后面也省略

同样我们人为制造一个闪退,看下被调用顺序

6   libdispatch.dylib                   0x0000000105b952f7 _dispatch_call_block_and_release + 12
7   libdispatch.dylib                   0x0000000105b9633d _dispatch_client_callout + 8
8   libdispatch.dylib                   0x0000000105ba40a5 _dispatch_queue_concurrent_drain + 1492
9   libdispatch.dylib                   0x0000000105b9f1fb _dispatch_queue_invoke + 353
10  libdispatch.dylib                   0x0000000105b9af7c _dispatch_queue_override_invoke + 733
11  libdispatch.dylib                   0x0000000105ba2102 _dispatch_root_queue_drain + 772
12  libdispatch.dylib                   0x0000000105ba1da0 _dispatch_worker_thread3 + 132
13  libsystem_pthread.dylib             0x000000010605d5a2 _pthread_wqthread + 1299
14  libsystem_pthread.dylib             0x000000010605d07d start_wqthread + 13

一样从_dispatch_root_queue_drain开始看

 // Worker-thread drain loop for a root queue: pops items one by one and
 // invokes them until the queue is empty or the thread should narrow.
 static void _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
    _dispatch_queue_set_current(dq);
    dispatch_priority_t pri = dq->dq_priority;
    if (!pri) pri = _dispatch_priority_from_pp(pp);
    dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
    _dispatch_adopt_wlh_anon();

    struct dispatch_object_s *item;
    bool reset = false;
    dispatch_invoke_context_s dic = { };
    dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
            DISPATCH_INVOKE_REDIRECTING_DRAIN;
    _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
    _dispatch_perfmon_start();
    // A root queue can hold either a dispatch_queue_t or a
    // dispatch_continuation_t; here the popped item may be a dispatch_queue_t.
    while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
        if (reset) _dispatch_wqthread_override_reset();
        _dispatch_continuation_pop_inline(item, &dic, flags, dq);
        // Reset this thread's priority override; talks to the kernel.
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
    }

    // overcommit or not. worker thread
    if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
        _dispatch_perfmon_end(perfmon_thread_worker_oc);
    } else {
        _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
    }

    _dispatch_reset_wlh();
    _dispatch_reset_basepri(old_dbp);
    _dispatch_reset_basepri_override();
    _dispatch_queue_set_current(NULL);
}

static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    // Here dx_invoke ends up calling _dispatch_queue_override_invoke.
    // An item popped from a root queue may be a whole queue or a plain work
    // item; if it is a queue (i.e. has a vtable), invoke the queue's own
    // invoke function.
    // This is also why the docs say barrier is a no-op outside custom
    // queues: a plain item has no vtable, so it goes straight to
    // _dispatch_continuation_invoke_inline and just runs -- making
    // dispatch_barrier_async behave like dispatch_async.
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

// Invoked when an OVERRIDE continuation is drained from a root queue:
// temporarily makes the target (root) queue current, then invokes the
// wrapped object (queue or continuation).
void _dispatch_queue_override_invoke(dispatch_continuation_t dc,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
    dispatch_queue_t old_rq = _dispatch_queue_get_current();
    dispatch_queue_t assumed_rq = dc->dc_other;
    dispatch_priority_t old_dp;
    voucher_t ov = DISPATCH_NO_VOUCHER;
    dispatch_object_t dou;

    dou._do = dc->dc_data;
    // Activate the custom queue's identity and park the previous root queue
    // (saved in old_rq/old_dp) -- this is why a barrier's task can run first
    // while the items behind it stay blocked.
    //  static inline dispatch_priority_t
    //_dispatch_root_queue_identity_assume(dispatch_queue_t assumed_rq)
    //{
    //      dispatch_priority_t old_dbp = _dispatch_get_basepri();
    //      dispatch_assert(dx_hastypeflag(assumed_rq, QUEUE_ROOT));
    //      _dispatch_reset_basepri(assumed_rq->dq_priority);
    //      _dispatch_queue_set_current(assumed_rq);
    //      return old_dbp;
    //}
    old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
    if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
        flags |= DISPATCH_INVOKE_STEALING;
    } else {
        _dispatch_trace_continuation_pop(assumed_rq, dou._do);
    }
    _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
        // Takes the if branch here, which calls _dispatch_queue_invoke.
        if (_dispatch_object_has_vtable(dou._do)) {
            dx_invoke(dou._do, dic, flags);
        } else {
            _dispatch_continuation_invoke_inline(dou, ov, flags);
        }
    });
    // Restore the root queue as current.
    _dispatch_reset_basepri(old_dp);
    _dispatch_queue_set_current(old_rq);
}

// Invoke entry for a (custom) queue object pulled off a root queue.
void _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags)
{
    _dispatch_queue_class_invoke(dq, dic, flags, 0, dispatch_queue_invoke2);
}

// Generic queue-class invoke: acquires the drain lock, runs the supplied
// drain handler (here dispatch_queue_invoke2), then unlocks and, if the
// queue still has work or changed target, re-enqueues it.
static inline void _dispatch_queue_class_invoke(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_invoke_flags_t const_restrict_flags,
        _dispatch_queue_class_invoke_handler_t invoke)
{
    dispatch_queue_t dq = dou._dq;
    dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
    bool owning = !(flags & DISPATCH_INVOKE_STEALING);
    uint64_t owned = 0;

    if (!(flags & (DISPATCH_INVOKE_STEALING | DISPATCH_INVOKE_WLH))) {
        dq->do_next = DISPATCH_OBJECT_LISTLESS;
    }
    flags |= const_restrict_flags;
    if (likely(flags & DISPATCH_INVOKE_WLH)) {
        owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED | DISPATCH_QUEUE_ENQUEUED;
    } else {
        owned = _dispatch_queue_drain_try_lock(dq, flags);
    }
    if (likely(owned)) {
        dispatch_priority_t old_dbp;
        if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
            old_dbp = _dispatch_set_basepri(dq->dq_priority);
        } else {
            old_dbp = 0;
        }

        flags = _dispatch_queue_merge_autorelease_frequency(dq, flags);

attempt_running_slow_head:
        // Run the dispatch_queue_invoke2 handler,
        // i.e. drain the custom queue's own work items.
        tq = invoke(dq, dic, flags, &owned);
        dispatch_assert(tq != DISPATCH_QUEUE_WAKEUP_TARGET);
        if (unlikely(tq != DISPATCH_QUEUE_WAKEUP_NONE &&
                tq != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT)) {
        } else if (!_dispatch_queue_drain_try_unlock(dq, owned,
                tq == DISPATCH_QUEUE_WAKEUP_NONE)) {
            tq = _dispatch_queue_get_current();
            if (dx_hastypeflag(tq, QUEUE_ROOT) || !owning) {
                goto attempt_running_slow_head;
            }
            DISPATCH_COMPILER_CAN_ASSUME(tq != DISPATCH_QUEUE_WAKEUP_NONE);
        } else {
            owned = 0;
            tq = NULL;
        }
        if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
            _dispatch_reset_basepri(old_dbp);
        }
    }
    if (likely(owning)) {
        _dispatch_introspection_queue_item_complete(dq);
    }

    if (tq) {
        if (const_restrict_flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS) {
            dispatch_assert(dic->dic_deferred == NULL);
        } else if (dic->dic_deferred) {
            return _dispatch_queue_drain_sync_waiter(dq, dic,
                    flags, owned);
        }

        uint64_t old_state, new_state, enqueued = DISPATCH_QUEUE_ENQUEUED;
        if (tq == DISPATCH_QUEUE_WAKEUP_MGR) {
            enqueued = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
        }
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state  = old_state - owned;
            new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
            new_state |= DISPATCH_QUEUE_DIRTY;
            if (_dq_state_is_suspended(new_state)) {
                new_state |= DLOCK_OWNER_MASK;
            } else if (_dq_state_is_runnable(new_state) &&
                    !_dq_state_is_enqueued(new_state)) {
                // drain was not interupted for suspension
                // we will reenqueue right away, just put ENQUEUED back
                new_state |= enqueued;
            }
        });
        old_state -= owned;
        if (_dq_state_received_override(old_state)) {
            // Ensure that the root queue sees that this thread was overridden.
            _dispatch_set_basepri_override_qos(_dq_state_max_qos(new_state));
        }
        if ((old_state ^ new_state) & enqueued) {
            dispatch_assert(_dq_state_is_enqueued(new_state));
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
    }

    _dispatch_release_2_tailcall(dq);
}

// Drain handler for plain queues: serial queues (width 1) drain serially,
// everything else drains concurrently.
static inline dispatch_queue_wakeup_target_t dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, uint64_t *owned)
{
    dispatch_queue_t otq = dq->do_targetq;
    dispatch_queue_t cq = _dispatch_queue_get_current();

    // The queue's target changed: hand it back to the new target.
    if (slowpath(cq != otq)) {
        return otq;
    }
    if (dq->dq_width == 1) {
        return _dispatch_queue_serial_drain(dq, dic, flags, owned);
    }
    return _dispatch_queue_concurrent_drain(dq, dic, flags, owned);
}

// Concurrent drain is just the shared drain with serial_drain == false.
static dispatch_queue_wakeup_target_t _dispatch_queue_concurrent_drain(dispatch_queue_t dq,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        uint64_t *owned)
{
    return _dispatch_queue_drain(dq, dic, flags, owned, false);
}

// Core drain loop shared by serial and concurrent queues: pops items in
// FIFO order, honoring barriers, suspension, width limits and redirection,
// until the queue is empty or must yield.
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
    dispatch_queue_t orig_tq = dq->do_targetq;
    dispatch_thread_frame_s dtf;
    struct dispatch_object_s *dc = NULL, *next_dc;
    uint64_t dq_state, owned = *owned_ptr;

    if (unlikely(!dq->dq_items_tail)) return NULL;

    _dispatch_thread_frame_push(&dtf, dq);
    if (serial_drain || _dq_state_is_in_barrier(owned)) {
        // we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
        // but width can change while draining barrier work items, so we only
        // convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
        owned = DISPATCH_QUEUE_IN_BARRIER;
    } else {
        owned &= DISPATCH_QUEUE_WIDTH_MASK;
    }

    dc = _dispatch_queue_head(dq);
    goto first_iteration;

    // Loop over the custom queue's work items, one after the other.
    for (;;) {
        dc = next_dc;
        if (unlikely(dic->dic_deferred)) {
            goto out_with_deferred_compute_owned;
        }
        if (unlikely(_dispatch_needs_to_return_to_kernel())) {
            _dispatch_return_to_kernel();
        }
        if (unlikely(!dc)) {
            if (!dq->dq_items_tail) {
                break;
            }
            dc = _dispatch_queue_head(dq);
        }
        if (unlikely(serial_drain != (dq->dq_width == 1))) {
            break;
        }
        if (unlikely(_dispatch_queue_drain_should_narrow(dic))) {
            break;
        }

first_iteration:
        dq_state = os_atomic_load(&dq->dq_state, relaxed);
        if (unlikely(_dq_state_is_suspended(dq_state))) {
            break;
        }
        if (unlikely(orig_tq != dq->do_targetq)) {
            break;
        }

        if (serial_drain || _dispatch_object_is_barrier(dc)) {
            if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
                if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
                    goto out_with_no_width;
                }
                owned = DISPATCH_QUEUE_IN_BARRIER;
            }
            next_dc = _dispatch_queue_next(dq, dc);
            if (_dispatch_object_is_sync_waiter(dc)) {
                owned = 0;
                dic->dic_deferred = dc;
                goto out_with_deferred;
            }
        } else {
            if (owned == DISPATCH_QUEUE_IN_BARRIER) {
                os_atomic_xor2o(dq, dq_state, owned, release);
                owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
            } else if (unlikely(owned == 0)) {
                if (_dispatch_object_is_sync_waiter(dc)) {
                    // sync "readers" don't observe the limit
                    _dispatch_queue_reserve_sync_width(dq);
                } else if (!_dispatch_queue_try_acquire_async(dq)) {
                    goto out_with_no_width;
                }
                owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
            }

            next_dc = _dispatch_queue_next(dq, dc);
            if (_dispatch_object_is_sync_waiter(dc)) {
                owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
                _dispatch_sync_waiter_redirect_or_wake(dq,
                        DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
                continue;
            }

            if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
                owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
                _dispatch_continuation_redirect(dq, dc);
                continue;
            }
        }
        // Pop and invoke this work item.
        _dispatch_continuation_pop_inline(dc, dic, flags, dq);
    }

    if (owned == DISPATCH_QUEUE_IN_BARRIER) {
        // if we're IN_BARRIER we really own the full width too
        owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    }
    if (dc) {
        owned = _dispatch_queue_adjust_owned(dq, owned, dc);
    }
    *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    *owned_ptr |= owned;
    _dispatch_thread_frame_pop(&dtf);
    return dc ? dq->do_targetq : NULL;

out_with_no_width:
    *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    _dispatch_thread_frame_pop(&dtf);
    return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;

out_with_deferred_compute_owned:
    if (serial_drain) {
        owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
    } else {
        if (owned == DISPATCH_QUEUE_IN_BARRIER) {
            // if we're IN_BARRIER we really own the full width too
            owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
        }
        if (dc) {
            owned = _dispatch_queue_adjust_owned(dq, owned, dc);
        }
    }
out_with_deferred:
    *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    *owned_ptr |= owned;
    if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
        DISPATCH_INTERNAL_CRASH(dc,
                "Deferred continuation on source, mach channel or mgr");
    }
    _dispatch_thread_frame_pop(&dtf);
    return dq->do_targetq;
}

// (Repeated from above for reference.) Pops one item: queue objects carry
// a vtable and get dx_invoke; plain work items are invoked inline.
static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

// (Repeated from above.) Runs the continuation inside an autorelease pool
// and recycles it if it carries the CONSUME bit.
static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
        dispatch_invoke_flags_t flags)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;
        _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
        if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) {
            _dispatch_continuation_with_group_invoke(dc);
        } else {
            // Execute the block via _dispatch_client_callout.
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_introspection_queue_item_complete(dou);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

0x10 dispatch_get_global_queue

  可以发现,dispatch_get_global_queue其实就是取对应优先级的root队列拿来用。所以上面也提过,为啥在global_queue里面不能用barrier。

dispatch_get_global_queue(long priority, unsigned long flags)
{
    if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
        return DISPATCH_BAD_INPUT;
    }
    dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);

    if (qos == DISPATCH_QOS_UNSPECIFIED) {
        return DISPATCH_BAD_INPUT;
    }
    return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

// Root queues live in a flat array with two entries per QoS level
// (plain and overcommit), hence the 2 * (qos - 1) + overcommit indexing.
static inline dispatch_queue_t _dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
    if (unlikely(qos == DISPATCH_QOS_UNSPECIFIED || qos > DISPATCH_QOS_MAX)) {
        DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
    }
    return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}

相关文章

  • GCD源码分析(二)

    dispatch_group_async 同样从入口看起 我们发现,其实dispatch_group_async内...

  • iOS底层探索之多线程(六)—GCD源码分析(sync 同步函数

    回顾 在上篇博客对GCD的不同的队列继续了底层的源码探索分析, 那么本篇博客将继续对GCD的函数继续源码分析。 1...

  • OC多线程学习(二) - GCD

    本文内容: GCD相关概念 有关GCD的几道面试题 源码分析:队列和异步函数 GCD概念 GCD是Grand Ce...

  • GCD 底层源码分析(二)

    在这一篇章我们主要分析同步函数与异步函数的底层源码实现,在探索之前我们先了解一下同步函数与异步函数的区别。 能否开...

  • GCD底层原理分析

    GCD底层原理分析 下面我们在libdispatch.dylib去探索队列是如何创建的 底层源码分析 在源码中搜索...

  • GCD源码分析

    最近的项目中一直有关于多线程网络请求的需求,所以一直在使用GCD队列处理问题,闲置下来,整理一下思路,研读了一次G...

  • GCD源码分析

    1、源码版本 libdispatch-1173.40.5地址:https://opensource.apple.c...

  • GCD-源码分析

    GCD源码分析 dispatch_queue_create dispatch_queue_create队列创建方法...

  • GCD源码的分析

    最近看了苹果 libdispatch 的源码,也就是 GCD 的源码,对于 GCD 的运作方式有一定了解和自己的见...

  • GCD源码分析(上)

    GCD源码官网下载地址 GCD的源码在我看来,一直是iOS源码中最晦涩难懂一种,因为涉及到太多的宏定义和结构定义,...

网友评论

    本文标题:GCD源码分析(二)

    本文链接:https://www.haomeiwen.com/subject/cgdrmqtx.html