![](https://img.haomeiwen.com/i2257417/84720a96a68e22d6.png)
dispatch_group_async
同样从入口看起
/*
 * Submit a block to a queue on behalf of a group.
 *
 * A continuation is allocated, initialized with the block and the
 * CONSUME | GROUP flag bits, and handed to
 * _dispatch_continuation_group_async() together with the group.
 */
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	const uintptr_t group_flags =
			DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
	dispatch_continuation_t work = _dispatch_continuation_alloc();

	_dispatch_continuation_init(work, dq, db, 0, 0, group_flags);
	_dispatch_continuation_group_async(dg, dq, work);
}
同样是_dispatch_continuation_init函数,这里跟dispatch_async那里一模一样,忘记了的话可以往回看。我们接着往下看_dispatch_continuation_group_async函数:
/*
 * Attach a continuation to a group and submit it.
 *
 * The group is stored in dc->dc_data, the group is entered on behalf of
 * the work item, and the continuation is then submitted to dq.
 */
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc)
{
	// dc is only submitted on the last line, so tagging it before or
	// after entering the group is equivalent.
	dc->dc_data = dg;
	dispatch_group_enter(dg);
	_dispatch_continuation_async(dq, dc);
}
我们发现,dispatch_group_async内部其实也是先调用了dispatch_group_enter函数。至此,dispatch_group_async如何初始化任务已经说明完毕。
后面取出执行block逻辑跟dispatch_async略微不同,前面部分不做多说,调用顺序跟dispatch_async是一样的,唯一不同在于_dispatch_continuation_invoke_inline这个函数。
/*
 * Run one continuation.
 *
 * Inside an autorelease-pool scope: adopt the continuation's voucher,
 * hand the continuation back to the cache when the CONSUME bit is set,
 * then either run it as group work (GROUP bit set) or call the client
 * function directly.
 */
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_continuation_t leftover;

	dispatch_invoke_with_autoreleasepool(flags, {
		const uintptr_t dc_flags = dc->dc_flags;

		_dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
		// Only consumed continuations go back to the cache; a non-NULL
		// result is disposed of after the callout below.
		leftover = (dc_flags & DISPATCH_OBJ_CONSUME_BIT)
				? _dispatch_continuation_free_cacheonly(dc) : NULL;
		if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) {
			// dispatch_group_async() sets this bit, so group work lands here
			_dispatch_continuation_with_group_invoke(dc);
		} else {
			_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
			_dispatch_introspection_queue_item_complete(dou);
		}
		if (unlikely(leftover)) {
			_dispatch_continuation_free_to_cache_limit(leftover);
		}
	});
	_dispatch_perfmon_workitem_inc();
}
/*
 * Run a continuation that was submitted through a group.
 *
 * dc->dc_data carries the object the work item is attached to.  When it
 * is a group, the client function is called and the group is left
 * afterwards.  Any other object type is reported as an internal crash.
 */
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
	struct dispatch_object_s *owner = dc->dc_data;
	unsigned long owner_type = dx_type(owner);

	if (owner_type != DISPATCH_GROUP_TYPE) {
		DISPATCH_INTERNAL_CRASH(dx_type(owner), "Unexpected object type");
	} else {
		// Run the task ...
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		_dispatch_introspection_queue_item_complete(owner);
		// ... then leave the group.
		dispatch_group_leave((dispatch_group_t)owner);
	}
}
我们有必要看下dispatch_group_enter和dispatch_group_leave函数。
/*
 * Record one more outstanding unit of work on the group by atomically
 * incrementing dg->dg_value.
 */
void
dispatch_group_enter(dispatch_group_t dg)
{
	// NOTE(review): the `_orig` variant presumably yields the value held
	// before the increment - confirm against the os_atomic definitions.
	long prior = os_atomic_inc_orig2o(dg, dg_value, acquire);

	if (slowpath((unsigned long)prior >= (unsigned long)LONG_MAX)) {
		DISPATCH_CLIENT_CRASH(prior,
				"Too many nested calls to dispatch_group_enter()");
	}
	if (prior == 0) {
		// An extra reference on the group is taken whenever `prior` is 0.
		_dispatch_retain(dg);
	}
}
/*
 * Mark one unit of group work as finished by atomically decrementing
 * dg->dg_value.
 *
 * When the counter reaches 0 the group is woken through
 * _dispatch_group_wake().  A negative result means leave was called more
 * often than enter and is reported as a client crash.
 */
void
dispatch_group_leave(dispatch_group_t dg)
{
	long remaining = os_atomic_dec2o(dg, dg_value, release);

	if (slowpath(remaining == 0)) {
		// Counter dropped to 0: wake the group.
		(void)_dispatch_group_wake(dg, true);
		return;
	}
	if (slowpath(remaining < 0)) {
		// enter/leave calls must be balanced
		DISPATCH_CLIENT_CRASH(remaining,
				"Unbalanced call to dispatch_group_leave()");
	}
}
// Wakes everything attached to a group: signals dg_sema for the recorded
// waiters and resubmits each continuation found on the notify list.
// needs_release: when true, one extra reference on dg is dropped at the end.
// Always returns 0.
static long _dispatch_group_wake(dispatch_group_t dg, bool needs_release)
{
	dispatch_continuation_t next, head, tail = NULL;
	long rval;
	// Detach the notify list: swap out the head and, only when there was
	// one, the tail as well.
	head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
	if (head) {
		tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
	}
	// Reset dg->dg_waiters to 0; rval receives the value it held before.
	rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
	// There were waiters
	if (rval) {
		// Create the semaphore
		_dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
		// Signal it, passing the previous waiter count
		_dispatch_sema4_signal(&dg->dg_sema, rval);
	}
	uint16_t refs = needs_release ? 1 : 0;
	// Submit every detached notify continuation (e.g. blocks registered via
	// dispatch_group_notify) to the queue stored in its dc_data.
	if (head) {
		do {
			next = os_mpsc_pop_snapshot_head(head, tail, do_next);
			dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
			_dispatch_continuation_async(dsn_queue, head);
			// Drop the queue reference carried alongside the continuation
			_dispatch_release(dsn_queue);
		} while ((head = next));
		// A non-empty notify list accounts for one more reference on dg
		refs++;
	}
	if (refs) _dispatch_release_n(dg, refs);
	return 0;
}
dispatch_once
还是从入口函数开始看
/*
 * Public block-based entry point.  The block doubles as the context
 * argument and its invoke function is what dispatch_once_f() calls.
 */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
	dispatch_function_t invoke = _dispatch_Block_invoke(block);

	dispatch_once_f(val, block, invoke);
}
/*
 * Function-pointer flavour of dispatch_once().  As excerpted here it
 * forwards every call to dispatch_once_f_slow().
 */
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	dispatch_once_f_slow(val, ctxt, func);
}
// Slow path of dispatch_once_f().
// The thread that finds *val == NULL runs func(ctxt), marks the token done
// and signals every thread that queued up behind it. Any other thread either
// sees the token already done and returns, or links an on-stack waiter
// record into *val and blocks until it is signalled.
DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	// Layout of _dispatch_once_waiter_t, quoted for reference:
	// typedef struct _dispatch_once_waiter_s {
	// volatile struct _dispatch_once_waiter_s *volatile dow_next;
	// dispatch_thread_event_s dow_event;
	// mach_port_t dow_thread;
	// } *_dispatch_once_waiter_t;
	// volatile: the pointed-to value may be changed by other threads, so the
	// compiler must not optimize accesses through this pointer.
	_dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
	struct _dispatch_once_waiter_s dow = { };
	_dispatch_once_waiter_t tail = &dow, next, tmp;
	dispatch_thread_event_t event;
	// First caller: *vval is NULL, the compare-and-swap succeeds and installs
	// `tail` (this thread's record). A caller arriving while the initializer
	// is still running finds a non-NULL value and takes the else branch.
	if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
		// Record the current thread
		dow.dow_thread = _dispatch_tid_self();
		// Run the client's initializer
		_dispatch_client_callout(ctxt, func);
		// NOTE(review): presumably stores the "done" marker into *val and
		// returns the previous value - confirm against _dispatch_once_xchg_done.
		// If no other thread queued up meanwhile, that value is still `tail`.
		next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
		// Walk the chain of waiter records until our own record is reached,
		// signalling each waiter's event.
		while (next != tail) {
			// Both the link and the event pointer are read before the
			// waiter is signalled.
			// NOTE(review): _dispatch_wait_until presumably spins until
			// dow_next is non-NULL - confirm.
			tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
			event = &next->dow_event;
			next = tmp;
			// Wake the waiter
			_dispatch_thread_event_signal(event);
		}
	} else {
		// Prepare this thread's event so it can be waited on
		_dispatch_thread_event_init(&dow.dow_event);
		// Snapshot the current value of the token
		next = *vval;
		// Retry until either the token is done or our record is linked in
		for (;;) {
			// The initializer has already completed: nothing to wait for
			if (next == DISPATCH_ONCE_DONE) {
				break;
			}
			// Initializer still running on another thread (or further up
			// this thread's own stack): try to replace the value we saw
			// with our record. `next` is passed by address as well, so a
			// failed attempt can refresh it before the loop retries.
			if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
				// Chain to the record that was there before and inherit its
				// thread id.
				dow.dow_thread = next->dow_thread;
				dow.dow_next = next;
				if (dow.dow_thread) {
					pthread_priority_t pp = _dispatch_get_priority();
					_dispatch_thread_override_start(dow.dow_thread, pp, val);
				}
				// Block until the initializing thread signals this event
				_dispatch_thread_event_wait(&dow.dow_event);
				if (dow.dow_thread) {
					_dispatch_thread_override_end(dow.dow_thread, val);
				}
				break;
			}
		}
		// Tear the event down again
		_dispatch_thread_event_destroy(&dow.dow_event);
	}
}
那么,回到上篇提到的使用dispatch_once造成死锁的问题:如果使用不当会造成什么后果?回顾下上篇的实验代码:
// Entry point of the experiment: calls -once, which starts the mutually
// recursive dispatch_once calls.
- (void)viewDidLoad {
    [super viewDidLoad];
    [self once];
}
// First half of the deadlock demo: the once-block calls -otherOnce, whose
// own once-block calls back into this method while this token's block is
// still running.
- (void)once {
    static dispatch_once_t firstToken;
    dispatch_block_t body = ^{
        [self otherOnce];
    };
    dispatch_once(&firstToken, body);
    NSLog(@"遇到第一只熊猫宝宝...");
}
// Second half of the deadlock demo: the once-block re-enters -once on the
// same thread.
- (void)otherOnce {
    static dispatch_once_t secondToken;
    dispatch_block_t body = ^{
        [self once];
    };
    dispatch_once(&secondToken, body);
    NSLog(@"遇到第二只熊猫宝宝...");
}
示例中我们可以看到,once方法需要等待otherOnce方法完成,而otherOnce又调用了once。根据前面的源码,otherOnce再次调用once时,once的onceToken既不是NULL也不是DISPATCH_ONCE_DONE,所以会走到else分支,在这个分支里等待第一次调用(if分支)执行完block后发出唤醒信号;但是第一次调用的block又依赖otherOnce方法的完成,而它们处于同一个线程,所以就永远卡住了。
dispatch_group_create & dispatch_semaphore_create
为什么两个一起看?其实dispatch_group也是通过dispatch_semaphore控制的,看下dispatch_group_create的源代码:
/*
 * Create a group whose initial count is 0.
 */
dispatch_group_t
dispatch_group_create(void)
{
	dispatch_group_t new_group = _dispatch_group_create_with_count(0);

	return new_group;
}
/*
 * Allocate a group object and initialize its semaphore part with `count`.
 *
 * When `count` is non-zero the object's do_ref_cnt is additionally
 * stored as 1.
 * NOTE(review): this presumably mirrors the extra retain that
 * dispatch_group_enter() takes on the 0 -> 1 transition - confirm.
 */
static inline dispatch_group_t
_dispatch_group_create_with_count(long count)
{
	dispatch_group_t new_group = (dispatch_group_t)_dispatch_object_alloc(
			DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));

	// Shared initializer, also used by dispatch_semaphore_create()
	_dispatch_semaphore_class_init(count, new_group);
	if (count != 0) {
		os_atomic_store2o(new_group, do_ref_cnt, 1, relaxed);
	}
	return new_group;
}
同样的,看下dispatch_semaphore_create源代码,是不是一股熟悉的配方:
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = (dispatch_semaphore_t)_dispatch_object_alloc(
DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s));
_dispatch_semaphore_class_init(value, dsema); // 同样的初始化信号量
dsema->dsema_orig = value;
return dsema;
}
dispatch_group_wait & dispatch_semaphore_wait
再看下dispatch_group_wait的代码,其内部调用的是_dispatch_group_wait_slow函数:
// Blocking path of dispatch_group_wait().
// Returns 0 once the group has been woken, or _DSEMA4_TIMEOUT() when the
// timeout expired and this thread managed to withdraw its waiter count.
static long _dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout)
{
	long value;
	int orig_waiters;
	// Nothing outstanding: wake the group and return without registering
	// as a waiter.
	value = os_atomic_load2o(dg, dg_value, ordered);
	if (value == 0) {
		return _dispatch_group_wake(dg, false);
	}
	// Register as a waiter, then look at the counter again.
	(void)os_atomic_inc2o(dg, dg_waiters, relaxed);
	value = os_atomic_load2o(dg, dg_value, ordered);
	// The group has no outstanding work any more
	if (value == 0) {
		_dispatch_group_wake(dg, false);
		// Forces the DISPATCH_TIME_FOREVER case of the switch below
		timeout = DISPATCH_TIME_FOREVER;
	}
	_dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dg->dg_sema, timeout)) {
			break;
		}
		// Timed wait did not succeed: fall through and try to withdraw
		// the waiter count added above.
	case DISPATCH_TIME_NOW:
		orig_waiters = dg->dg_waiters;
		while (orig_waiters) {
			if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters,
					orig_waiters - 1, &orig_waiters, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// dg_waiters was observed as 0 (_dispatch_group_wake resets it):
		// fall through and wait on the semaphore.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dg->dg_sema);
		break;
	}
	return 0;
}
对比着看dispatch_semaphore_wait源码,其内部也调用_dispatch_semaphore_wait_slow函数,可以看到逻辑基本一致:
// Blocking path of dispatch_semaphore_wait(); same structure as
// _dispatch_group_wait_slow().
// Returns 0 after a successful wait, or _DSEMA4_TIMEOUT() when the timeout
// expired and dsema_value could be incremented back.
static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;
	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Timed wait did not succeed: fall through and try to undo the
		// change to dsema_value.
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// dsema_value is no longer negative: fall through and wait on the
		// semaphore.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}
dispatch_group_notify
再把dispatch_group_notify看下:
/*
 * Register a block to be submitted to dq by the group's wake path.
 *
 * The block is wrapped in a freshly allocated continuation (CONSUME bit
 * only) and handed to _dispatch_group_notify().
 */
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t notify_dc = _dispatch_continuation_alloc();

	_dispatch_continuation_init(notify_dc, dq, db, 0, 0,
			DISPATCH_OBJ_CONSUME_BIT);
	_dispatch_group_notify(dg, dq, notify_dc);
}
// Appends a notify continuation to the group's notify list.
// The target queue is stored in dsn->dc_data and retained;
// _dispatch_group_wake() reads it back, submits the continuation there and
// releases it.
static inline void _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	dsn->dc_data = dq;
	dsn->do_next = NULL;
	_dispatch_retain(dq);
	// NOTE(review): the push presumably returns true only when the list was
	// empty before - confirm against os_mpsc_push_update_tail.
	if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) {
		// Reference on the group; _dispatch_group_wake() drops one when it
		// finds a non-empty notify list.
		_dispatch_retain(dg);
		os_atomic_store2o(dg, dg_notify_head, dsn, ordered);
		// If every task in the group has already finished, wake right away
		if (os_atomic_load2o(dg, dg_value, ordered) == 0) {
			_dispatch_group_wake(dg, false);
		}
	}
}
在dispatch_group_async里面我们知道,dispatch_group的任务在执行后会调用dispatch_group_leave。这个函数里面,如果dg_value减到0(即group中已经没有未完成的任务),就会调用_dispatch_group_wake唤醒dispatch_group:等待者会被信号量唤醒,通过dispatch_group_notify注册的任务也会在这时候被提交执行。
这里执行的调用顺序就不贴了,基本跟dispatch_async一致。
dispatch_barrier_async
可以看到,大多数实现都是大同小异,通过不同的标志位来控制。这里跟dispatch_async的不同就在于,dispatch_async直接把任务扔到root队列,而dispatch_barrier_async是把任务放到自定义的队列。
/*
 * Submit a barrier block to a queue.
 *
 * Same shape as the other async entry points; the continuation carries
 * the CONSUME | BARRIER flag bits and is pushed onto dq.
 */
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
	const uintptr_t barrier_flags =
			DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
	dispatch_continuation_t barrier_dc = _dispatch_continuation_alloc();

	_dispatch_continuation_init(barrier_dc, dq, work, 0, 0, barrier_flags);
	_dispatch_continuation_push(dq, barrier_dc);
}
/*
 * Out-of-line wrapper around _dispatch_queue_push_inline().
 */
void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	_dispatch_queue_push_inline(dq, dou, qos);
}
// Appends an item to dq's own item list and wakes the queue when needed.
// No wakeup happens when the tail update reports false and no override is
// required.
static inline void _dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
		dispatch_qos_t qos)
{
	struct dispatch_object_s *tail = _tail._do;
	dispatch_wakeup_flags_t flags = 0;
	bool overriding = _dispatch_queue_need_override_retain(dq, qos);
	// Enqueue on the queue's own list
	// NOTE(review): the tail update presumably reports true when the list
	// was empty before - confirm against _dispatch_queue_push_update_tail.
	if (unlikely(_dispatch_queue_push_update_tail(dq, tail))) {
		if (!overriding) _dispatch_retain_2(dq->_as_os_obj);
		_dispatch_queue_push_update_head(dq, tail);
		flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
	} else if (overriding) {
		flags = DISPATCH_WAKEUP_CONSUME_2;
	} else {
		return;
	}
	// Wake the queue
	return dx_wakeup(dq, qos, flags);
}
/*
 * Wakeup entry for a queue.
 *
 * A BARRIER_COMPLETE wakeup is forwarded to
 * _dispatch_queue_barrier_complete().  Otherwise the wakeup target is
 * DISPATCH_QUEUE_WAKEUP_TARGET when the probe succeeds and
 * DISPATCH_QUEUE_WAKEUP_NONE when it does not, and the generic class
 * wakeup is invoked with it.
 */
void
_dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		return _dispatch_queue_barrier_complete(dq, qos, flags);
	}

	// NOTE(review): the probe presumably checks whether the queue has
	// pending items (tail != NULL) - confirm.
	// For reference:
	// #define DISPATCH_QUEUE_WAKEUP_TARGET ((dispatch_queue_wakeup_target_t)1)
	dispatch_queue_wakeup_target_t wake_target =
			_dispatch_queue_class_probe(dq)
			? DISPATCH_QUEUE_WAKEUP_TARGET : DISPATCH_QUEUE_WAKEUP_NONE;

	return _dispatch_queue_class_wakeup(dq, qos, flags, wake_target);
}
// Generic wakeup: merges the QoS into dq_state, marks the queue enqueued
// and/or dirty as appropriate, and pushes the queue onto its target when
// this call is the one that set the enqueued bit.
// When nothing is pushed and CONSUME_2 is set, the references passed in by
// the caller are released.
void _dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
	dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
	// Taken whenever a wakeup target was supplied
	if (target) {
		uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
		if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
			enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
		}
		qos = _dispatch_queue_override_qos(dq, qos);
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = _dq_state_merge_qos(old_state, qos);
			// Set the enqueue bit only if the queue is neither suspended
			// nor already enqueued, and is either not drain-locked or
			// (for non-manager targets) on its base wlh.
			if (likely(!_dq_state_is_suspended(old_state) &&
					!_dq_state_is_enqueued(old_state) &&
					(!_dq_state_drain_locked(old_state) ||
					(enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
					_dq_state_is_base_wlh(old_state))))) {
				new_state |= enqueue;
			}
			if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
				new_state |= DISPATCH_QUEUE_DIRTY;
			} else if (new_state == old_state) {
				// Nothing would change: leave the loop without storing
				os_atomic_rmw_loop_give_up(goto done);
			}
		});
		// True when this call flipped the enqueue bit
		if (likely((old_state ^ new_state) & enqueue)) {
			dispatch_queue_t tq;
			if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
				os_atomic_thread_fence(dependency);
				tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
						(long)new_state);
			} else {
				tq = target;
			}
			dispatch_assert(_dq_state_is_enqueued(new_state));
			// Push the queue itself onto tq.
			// NOTE(review): per the surrounding article this ends up in
			// _dispatch_root_queue_push when tq is a root queue - not
			// visible in this excerpt.
			return _dispatch_queue_push_queue(tq, dq, new_state);
		}
	}
done:
	if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
		return _dispatch_release_2_tailcall(dq);
	}
}
/*
 * Push an item onto the root queue that matches the requested QoS.
 *
 * (_dispatch_root_queue_push itself was shown in the dispatch_async
 * walkthrough; this is the override variant.)
 *
 * The root queue is selected from `qos` and the overcommit flag of
 * orig_rq.  A redirection continuation is reused as is, with orig_rq
 * stored in its dc_func.  Anything else is wrapped in a new
 * OVERRIDE_OWNING continuation that remembers both the original item
 * (dc_data) and the original root queue (dc_other).
 */
static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
		dispatch_object_t dou, dispatch_qos_t qos)
{
	bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	dispatch_queue_t override_rq = _dispatch_get_root_queue(qos, overcommit);
	dispatch_continuation_t dc = dou._dc;

	if (!_dispatch_object_is_redirection(dc)) {
		dispatch_continuation_t wrapper = _dispatch_continuation_alloc();

		// The vtable set here decides what is invoked when the wrapper
		// is popped.
		// NOTE(review): per the article that is
		// _dispatch_queue_override_invoke - confirm.
		wrapper->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
		_dispatch_trace_continuation_push(orig_rq, dou);
		wrapper->dc_ctxt = wrapper;
		wrapper->dc_other = orig_rq;
		wrapper->dc_data = dou._do;
		wrapper->dc_priority = DISPATCH_NO_PRIORITY;
		wrapper->dc_voucher = DISPATCH_NO_VOUCHER;
		dc = wrapper;
	} else {
		dc->dc_func = (void *)orig_rq;
	}
	_dispatch_root_queue_push_inline(override_rq, dc, dc, 1);
}
// 后面也省略
同样我们人为制造一个闪退,看下被调用顺序
6 libdispatch.dylib 0x0000000105b952f7 _dispatch_call_block_and_release + 12
7 libdispatch.dylib 0x0000000105b9633d _dispatch_client_callout + 8
8 libdispatch.dylib 0x0000000105ba40a5 _dispatch_queue_concurrent_drain + 1492
9 libdispatch.dylib 0x0000000105b9f1fb _dispatch_queue_invoke + 353
10 libdispatch.dylib 0x0000000105b9af7c _dispatch_queue_override_invoke + 733
11 libdispatch.dylib 0x0000000105ba2102 _dispatch_root_queue_drain + 772
12 libdispatch.dylib 0x0000000105ba1da0 _dispatch_worker_thread3 + 132
13 libsystem_pthread.dylib 0x000000010605d5a2 _pthread_wqthread + 1299
14 libsystem_pthread.dylib 0x000000010605d07d start_wqthread + 13
一样从_dispatch_root_queue_drain开始看
// Worker-thread loop for a root queue: installs the queue as current, sets
// the base priority, pops items one by one until the queue yields none (or
// the narrowing check asks to stop), then restores the thread state.
static void _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
	_dispatch_queue_set_current(dq);
	dispatch_priority_t pri = dq->dq_priority;
	// Fall back to the priority derived from pp when the queue has none
	if (!pri) pri = _dispatch_priority_from_pp(pp);
	dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
	_dispatch_adopt_wlh_anon();
	struct dispatch_object_s *item;
	bool reset = false;
	dispatch_invoke_context_s dic = { };
	dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
			DISPATCH_INVOKE_REDIRECTING_DRAIN;
	_dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
	_dispatch_perfmon_start();
	// An item popped from a root queue is either a queue-like object or a
	// plain continuation; _dispatch_continuation_pop_inline tells them
	// apart. In the barrier walkthrough the item is the custom queue
	// (wrapped for override).
	while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop_inline(item, &dic, flags, dq);
		// Reset the base-priority override; the result decides whether the
		// next iteration calls _dispatch_wqthread_override_reset().
		reset = _dispatch_reset_basepri_override();
		if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
			break;
		}
	}
	// overcommit or not. worker thread
	if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		_dispatch_perfmon_end(perfmon_thread_worker_oc);
	} else {
		_dispatch_perfmon_end(perfmon_thread_worker_non_oc);
	}
	// Undo the setup done at the top, in reverse
	_dispatch_reset_wlh();
	_dispatch_reset_basepri(old_dbp);
	_dispatch_reset_basepri_override();
	_dispatch_queue_set_current(NULL);
}
/*
 * Execute one item that was popped from dq.
 *
 * Observer hooks, when installed, are called before and after.  An item
 * that carries a vtable (for instance a queue, or the override wrapper
 * built by _dispatch_root_queue_push_override) is run through
 * dx_invoke(); anything else is treated as a plain continuation and run
 * via _dispatch_continuation_invoke_inline().
 *
 * NOTE(review): the article argues this is why barriers have no special
 * effect on non-custom queues - their items reach the plain-continuation
 * branch directly.  Confirm against the full sources.
 */
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		dispatch_queue_t dq)
{
	dispatch_pthread_root_queue_observer_hooks_t hooks =
			_dispatch_get_pthread_root_queue_observer_hooks();

	if (hooks) {
		hooks->queue_will_execute(dq);
	}
	_dispatch_trace_continuation_pop(dq, dou);
	flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
	if (!_dispatch_object_has_vtable(dou)) {
		_dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
	} else {
		dx_invoke(dou._do, dic, flags);
	}
	if (hooks) {
		hooks->queue_did_execute(dq);
	}
}
// Invoke handler for an override wrapper continuation.
// dc->dc_other is the root queue the wrapped item was originally pushed to
// and dc->dc_data is the wrapped item itself (both are filled in by
// _dispatch_root_queue_push_override).
void _dispatch_queue_override_invoke(dispatch_continuation_t dc,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
	dispatch_queue_t old_rq = _dispatch_queue_get_current();
	dispatch_queue_t assumed_rq = dc->dc_other;
	dispatch_priority_t old_dp;
	voucher_t ov = DISPATCH_NO_VOUCHER;
	dispatch_object_t dou;
	dou._do = dc->dc_data;
	// Temporarily act as the original root queue: the helper quoted below
	// switches the base priority and the current queue to assumed_rq and
	// returns the previous base priority. The previous current queue was
	// saved in old_rq above.
	// static inline dispatch_priority_t
	//_dispatch_root_queue_identity_assume(dispatch_queue_t assumed_rq)
	//{
	// dispatch_priority_t old_dbp = _dispatch_get_basepri();
	// dispatch_assert(dx_hastypeflag(assumed_rq, QUEUE_ROOT));
	// _dispatch_reset_basepri(assumed_rq->dq_priority);
	// _dispatch_queue_set_current(assumed_rq);
	// return old_dbp;
	//}
	old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
	if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
		flags |= DISPATCH_INVOKE_STEALING;
	} else {
		_dispatch_trace_continuation_pop(assumed_rq, dou._do);
	}
	_dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
		// A wrapped queue has a vtable, so dx_invoke is taken.
		// NOTE(review): per the article this reaches _dispatch_queue_invoke.
		if (_dispatch_object_has_vtable(dou._do)) {
			dx_invoke(dou._do, dic, flags);
		} else {
			_dispatch_continuation_invoke_inline(dou, ov, flags);
		}
	});
	// Restore the base priority and current queue saved above
	_dispatch_reset_basepri(old_dp);
	_dispatch_queue_set_current(old_rq);
}
/*
 * Invoke entry for a plain queue: runs the generic class invoke with no
 * restricting flags and dispatch_queue_invoke2 as the drain handler.
 */
void
_dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(dq, dic, flags, 0,
			dispatch_queue_invoke2);
}
// Common invoke path for queue-like objects.
// Takes the drain lock (unless running under DISPATCH_INVOKE_WLH), runs the
// supplied `invoke` handler, then either unlocks, loops to drain again,
// hands off to a deferred sync waiter, re-enqueues dq on the returned
// target, or simply releases the references held for this invocation.
static inline void _dispatch_queue_class_invoke(dispatch_object_t dou,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		dispatch_invoke_flags_t const_restrict_flags,
		_dispatch_queue_class_invoke_handler_t invoke)
{
	dispatch_queue_t dq = dou._dq;
	dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
	// "owning" unless this invocation is stealing the queue
	bool owning = !(flags & DISPATCH_INVOKE_STEALING);
	uint64_t owned = 0;
	if (!(flags & (DISPATCH_INVOKE_STEALING | DISPATCH_INVOKE_WLH))) {
		dq->do_next = DISPATCH_OBJECT_LISTLESS;
	}
	flags |= const_restrict_flags;
	if (likely(flags & DISPATCH_INVOKE_WLH)) {
		owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED | DISPATCH_QUEUE_ENQUEUED;
	} else {
		owned = _dispatch_queue_drain_try_lock(dq, flags);
	}
	// Only drain when the lock (some ownership) was obtained
	if (likely(owned)) {
		dispatch_priority_t old_dbp;
		if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
			old_dbp = _dispatch_set_basepri(dq->dq_priority);
		} else {
			old_dbp = 0;
		}
		flags = _dispatch_queue_merge_autorelease_frequency(dq, flags);
attempt_running_slow_head:
		// Run the handler (dispatch_queue_invoke2 for plain queues), i.e.
		// execute the work items sitting on this queue.
		tq = invoke(dq, dic, flags, &owned);
		dispatch_assert(tq != DISPATCH_QUEUE_WAKEUP_TARGET);
		if (unlikely(tq != DISPATCH_QUEUE_WAKEUP_NONE &&
				tq != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT)) {
			// The handler returned a real target: intentionally empty,
			// tq is handled by the `if (tq)` section below.
		} else if (!_dispatch_queue_drain_try_unlock(dq, owned,
				tq == DISPATCH_QUEUE_WAKEUP_NONE)) {
			// Unlock failed: drain again when running on a root queue or
			// when stealing, otherwise carry the current queue as tq.
			tq = _dispatch_queue_get_current();
			if (dx_hastypeflag(tq, QUEUE_ROOT) || !owning) {
				goto attempt_running_slow_head;
			}
			DISPATCH_COMPILER_CAN_ASSUME(tq != DISPATCH_QUEUE_WAKEUP_NONE);
		} else {
			// Unlocked cleanly: nothing is owned and nothing to re-enqueue
			owned = 0;
			tq = NULL;
		}
		if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
			_dispatch_reset_basepri(old_dbp);
		}
	}
	if (likely(owning)) {
		_dispatch_introspection_queue_item_complete(dq);
	}
	if (tq) {
		if (const_restrict_flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS) {
			dispatch_assert(dic->dic_deferred == NULL);
		} else if (dic->dic_deferred) {
			// A sync waiter was deferred by the drain: hand over to it
			return _dispatch_queue_drain_sync_waiter(dq, dic,
					flags, owned);
		}
		uint64_t old_state, new_state, enqueued = DISPATCH_QUEUE_ENQUEUED;
		if (tq == DISPATCH_QUEUE_WAKEUP_MGR) {
			enqueued = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
		}
		// Give up what this invocation owned, mark the queue dirty and put
		// the enqueued bit back if it is still runnable.
		os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
			new_state = old_state - owned;
			new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
			new_state |= DISPATCH_QUEUE_DIRTY;
			if (_dq_state_is_suspended(new_state)) {
				new_state |= DLOCK_OWNER_MASK;
			} else if (_dq_state_is_runnable(new_state) &&
					!_dq_state_is_enqueued(new_state)) {
				// drain was not interupted for suspension
				// we will reenqueue right away, just put ENQUEUED back
				new_state |= enqueued;
			}
		});
		old_state -= owned;
		if (_dq_state_received_override(old_state)) {
			// Ensure that the root queue sees that this thread was overridden.
			_dispatch_set_basepri_override_qos(_dq_state_max_qos(new_state));
		}
		// The enqueued bit was flipped by the loop above: push dq onto tq
		if ((old_state ^ new_state) & enqueued) {
			dispatch_assert(_dq_state_is_enqueued(new_state));
			return _dispatch_queue_push_queue(tq, dq, new_state);
		}
	}
	_dispatch_release_2_tailcall(dq);
}
/*
 * Drain handler for plain queues.
 *
 * When the thread is not currently running on dq's target queue, that
 * target is returned as the wakeup target instead of draining.
 * Otherwise a queue of width 1 is drained serially and any other width
 * concurrently.
 */
static inline dispatch_queue_wakeup_target_t
dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned)
{
	dispatch_queue_t target = dq->do_targetq;
	dispatch_queue_t current = _dispatch_queue_get_current();

	if (slowpath(current != target)) {
		return target;
	}
	return (dq->dq_width == 1)
			? _dispatch_queue_serial_drain(dq, dic, flags, owned)
			: _dispatch_queue_concurrent_drain(dq, dic, flags, owned);
}
/*
 * Concurrent flavour of the drain: the shared drain routine with
 * serial_drain set to false.
 */
static dispatch_queue_wakeup_target_t
_dispatch_queue_concurrent_drain(dispatch_queue_t dq,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		uint64_t *owned)
{
	const bool serial_drain = false;

	return _dispatch_queue_drain(dq, dic, flags, owned, serial_drain);
}
// Shared drain loop for serial and concurrent queues.
// Walks dq's item list; barrier items (and every item of a serial drain) are
// run here one at a time while holding IN_BARRIER, sync waiters are deferred
// or woken, and under DISPATCH_INVOKE_REDIRECTING_DRAIN non-barrier items are
// handed to _dispatch_continuation_redirect instead of being run here.
// *owned_ptr is updated with what is still owned on return.
// Returns NULL when the list was drained, dq->do_targetq when items remain
// or a waiter was deferred, or DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT when no
// width could be acquired.
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
		dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
	dispatch_queue_t orig_tq = dq->do_targetq;
	dispatch_thread_frame_s dtf;
	struct dispatch_object_s *dc = NULL, *next_dc;
	uint64_t dq_state, owned = *owned_ptr;
	// Nothing queued
	if (unlikely(!dq->dq_items_tail)) return NULL;
	_dispatch_thread_frame_push(&dtf, dq);
	if (serial_drain || _dq_state_is_in_barrier(owned)) {
		// we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
		// but width can change while draining barrier work items, so we only
		// convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
		owned = DISPATCH_QUEUE_IN_BARRIER;
	} else {
		owned &= DISPATCH_QUEUE_WIDTH_MASK;
	}
	dc = _dispatch_queue_head(dq);
	// The first pass skips the loop-top checks
	goto first_iteration;
	// Drain loop: this thread walks the queue's items one after another.
	for (;;) {
		dc = next_dc;
		if (unlikely(dic->dic_deferred)) {
			goto out_with_deferred_compute_owned;
		}
		if (unlikely(_dispatch_needs_to_return_to_kernel())) {
			_dispatch_return_to_kernel();
		}
		if (unlikely(!dc)) {
			if (!dq->dq_items_tail) {
				break;
			}
			dc = _dispatch_queue_head(dq);
		}
		// The queue's width changed between serial and concurrent
		if (unlikely(serial_drain != (dq->dq_width == 1))) {
			break;
		}
		if (unlikely(_dispatch_queue_drain_should_narrow(dic))) {
			break;
		}
first_iteration:
		dq_state = os_atomic_load(&dq->dq_state, relaxed);
		if (unlikely(_dq_state_is_suspended(dq_state))) {
			break;
		}
		// The queue was retargeted while draining
		if (unlikely(orig_tq != dq->do_targetq)) {
			break;
		}
		if (serial_drain || _dispatch_object_is_barrier(dc)) {
			// Barrier item on a concurrent queue: the full width must be
			// owned before it may run.
			if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
				if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
					goto out_with_no_width;
				}
				owned = DISPATCH_QUEUE_IN_BARRIER;
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (_dispatch_object_is_sync_waiter(dc)) {
				owned = 0;
				dic->dic_deferred = dc;
				goto out_with_deferred;
			}
		} else {
			if (owned == DISPATCH_QUEUE_IN_BARRIER) {
				// Leaving barrier mode: convert to the full width
				os_atomic_xor2o(dq, dq_state, owned, release);
				owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
			} else if (unlikely(owned == 0)) {
				if (_dispatch_object_is_sync_waiter(dc)) {
					// sync "readers" don't observe the limit
					_dispatch_queue_reserve_sync_width(dq);
				} else if (!_dispatch_queue_try_acquire_async(dq)) {
					goto out_with_no_width;
				}
				owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (_dispatch_object_is_sync_waiter(dc)) {
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				_dispatch_sync_waiter_redirect_or_wake(dq,
						DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
				continue;
			}
			// Non-barrier item under a redirecting drain: hand it off
			// instead of running it on this thread.
			if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
				owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
		}
		// Run the item on this thread
		_dispatch_continuation_pop_inline(dc, dic, flags, dq);
	}
	if (owned == DISPATCH_QUEUE_IN_BARRIER) {
		// if we're IN_BARRIER we really own the full width too
		owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	}
	if (dc) {
		owned = _dispatch_queue_adjust_owned(dq, owned, dc);
	}
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	*owned_ptr |= owned;
	_dispatch_thread_frame_pop(&dtf);
	return dc ? dq->do_targetq : NULL;
out_with_no_width:
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	_dispatch_thread_frame_pop(&dtf);
	return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
out_with_deferred_compute_owned:
	if (serial_drain) {
		owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
	} else {
		if (owned == DISPATCH_QUEUE_IN_BARRIER) {
			// if we're IN_BARRIER we really own the full width too
			owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
		}
		if (dc) {
			owned = _dispatch_queue_adjust_owned(dq, owned, dc);
		}
	}
out_with_deferred:
	*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
	*owned_ptr |= owned;
	if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
		DISPATCH_INTERNAL_CRASH(dc,
				"Deferred continuation on source, mach channel or mgr");
	}
	_dispatch_thread_frame_pop(&dtf);
	return dq->do_targetq;
}
/*
 * Execute one item that was popped from dq (repeated here for the
 * custom-queue drain path).
 *
 * Observer hooks, when installed, are called before and after.  An item
 * that carries a vtable is run through dx_invoke(); a plain continuation,
 * such as the barrier block itself, is run via
 * _dispatch_continuation_invoke_inline().
 */
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
		dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
		dispatch_queue_t dq)
{
	dispatch_pthread_root_queue_observer_hooks_t hooks =
			_dispatch_get_pthread_root_queue_observer_hooks();

	if (hooks) {
		hooks->queue_will_execute(dq);
	}
	_dispatch_trace_continuation_pop(dq, dou);
	flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
	if (!_dispatch_object_has_vtable(dou)) {
		_dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
	} else {
		dx_invoke(dou._do, dic, flags);
	}
	if (hooks) {
		hooks->queue_did_execute(dq);
	}
}
/*
 * Run one continuation (repeated here for the barrier path).
 *
 * Inside an autorelease-pool scope: adopt the continuation's voucher,
 * hand the continuation back to the cache when the CONSUME bit is set,
 * then either run it as group work (GROUP bit set) or call the client
 * function directly.  A barrier block carries no GROUP bit, so it takes
 * the direct callout.
 */
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
		dispatch_invoke_flags_t flags)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_continuation_t leftover;

	dispatch_invoke_with_autoreleasepool(flags, {
		const uintptr_t dc_flags = dc->dc_flags;

		_dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
		// Only consumed continuations go back to the cache; a non-NULL
		// result is disposed of after the callout below.
		leftover = (dc_flags & DISPATCH_OBJ_CONSUME_BIT)
				? _dispatch_continuation_free_cacheonly(dc) : NULL;
		if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) {
			_dispatch_continuation_with_group_invoke(dc);
		} else {
			// Call the client's block through _dispatch_client_callout
			_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
			_dispatch_introspection_queue_item_complete(dou);
		}
		if (unlikely(leftover)) {
			_dispatch_continuation_free_to_cache_limit(leftover);
		}
	});
	_dispatch_perfmon_workitem_inc();
}
0x10 dispatch_get_global_queue
可以发现,dispatch_get_global_queue其实就是取对应优先级的root队列拿来用。所以上面也提过,为啥在global_queue里面不能用barrier。
dispatch_get_global_queue(long priority, unsigned long flags)
{
if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
return DISPATCH_BAD_INPUT;
}
dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
if (qos == DISPATCH_QOS_UNSPECIFIED) {
return DISPATCH_BAD_INPUT;
}
return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
/*
 * Look up a root queue by QoS class and overcommit flag.
 *
 * The table holds two consecutive entries per QoS class, the second one
 * being the overcommit variant.  An unspecified or out-of-range QoS is
 * reported as a client crash.
 */
static inline dispatch_queue_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
	if (unlikely(qos == DISPATCH_QOS_UNSPECIFIED || qos > DISPATCH_QOS_MAX)) {
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	}
	return _dispatch_root_queues + (2 * (qos - 1) + overcommit);
}
网友评论