本文主要通过GCD源码libdispatch-1173.60.1
(源码下载)对GCD的底层原理进行探究。
队列的本质
队列常规创建方法为
dispatch_queue_t
dispatch_queue_create(const char *_Nullable label,
dispatch_queue_attr_t _Nullable attr);
其实现位于libdispatch-1173.60.1
的queue.c
,
// Public queue constructor. Forwards label and attr unchanged to
// _dispatch_lane_create_with_target, passing DISPATCH_TARGET_QUEUE_DEFAULT as
// the target queue and true for the `legacy` argument.
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_lane_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
进入_dispatch_lane_create_with_target
,简化后如下
// Abridged excerpt of the queue constructor ("..." marks code the article
// elided; `overcommit` is not declared in the visible part, so it must come
// from the elided code).
// Visible steps: decode the attribute, pick a vtable by concurrency, allocate
// and initialize the lane, set label/priority, retain and install the target
// queue, and return the `_dq` member of _dispatch_trace_queue_create(dq).
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// Decode the attribute object into a dispatch_queue_attr_info_t value.
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
...
const void *vtable;
// The legacy creation path starts with DQF_MUTABLE set, otherwise no flags.
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) {
// Concurrent queues use the queue_concurrent vtable.
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
// Everything else uses the queue_serial vtable.
vtable = DISPATCH_VTABLE(queue_serial);
}
...
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s)); // allocation step
// Width is DISPATCH_QUEUE_WIDTH_MAX for concurrent queues and 1 otherwise;
// DISPATCH_QUEUE_INACTIVE is added to the initial state bits when the
// attribute asks for an inactive queue.
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // initialization step
dq->dq_label = label;
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
dqai.dqai_relpri);
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
// Only queues that are not created inactive inherit priority and wlh from
// the target at this point.
if (!dqai.dqai_inactive) {
_dispatch_queue_priority_inherit_from_target(dq, tq);
_dispatch_lane_inherit_wlh_from_target(dq, tq);
}
// The new queue holds a reference on its target queue.
_dispatch_retain(tq);
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq;
}
这段代码较多,而且命名晦涩难懂,注释极少,难以阅读,但这段函数的返回值是_dispatch_trace_queue_create(dq)._dq
,所以我们可以通过dq
来探索队列的本质。
如果一层层查看_dispatch_trace_queue_create
,会发现_dispatch_trace_queue_create(dq)
的_dq
属性实际上就是传入的dq
,因此探究dq
的来源即可。dq
的创建为
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
紧随其后有这么段代码
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
显然这分别是alloc和init方法,可以猜测队列本质上也是一种对象。
而dq
的创建有一个关键的对象dqai
,这是区别串行和并行队列的一个关键对象,先来看它的创建过程
// Abridged excerpt: converts a queue attribute object into a
// dispatch_queue_attr_info_t value. The decoding of other attributes is
// elided ("...").
dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
// Empty-initialized, so dqai_concurrent starts out false.
dispatch_queue_attr_info_t dqai = { };
// A NULL attribute returns the default info immediately (not concurrent).
if (!dqa) return dqai;
#if DISPATCH_VARIANT_STATIC
if (dqa == &_dispatch_queue_attr_concurrent) { // the shared concurrent attribute
dqai.dqai_concurrent = true;
return dqai;
}
#endif
...
return dqai;
}
可以看到除了dqa == &_dispatch_queue_attr_concurrent
的情况,其余的情况下,dqai.dqai_concurrent
都是false
,这也是为什么,如果我们传入的队列类型参数是null
的话,队列类型默认都是串行队列。
而根据dqai.dqai_concurrent
的值,创建了不同的vtable
,vtable
实际上就是队列的类型,
可以根据DISPATCH_VTABLE
一步步看到,串行队列的类型为OS_dispatch_queue_serial_class
,并发队列的类型为OS_dispatch_queue_concurrent_class
。
我们可以通过队列的创建过程_dispatch_object_alloc
来验证vtable
是队列的类型这个论断
// Allocates a dispatch object of `size` bytes described by `vtable`.
// With OS_OBJECT_HAVE_OBJC1 the object is allocated using the isa stored in
// the vtable (_os_obj_objc_isa) and the vtable is stored in do_vtable;
// otherwise the vtable pointer itself is passed as the class argument to
// _os_object_alloc_realized.
void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
const struct dispatch_object_vtable_s *_vtable = vtable;
dispatch_object_t dou;
dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
dou._do->do_vtable = vtable;
return dou._do;
#else
return _os_object_alloc_realized(vtable, size);
#endif
}
进入_os_object_alloc_realized
(objc1已弃用,不需查看)
// Allocates a zero-filled object of `size` bytes and stores `cls` in its
// os_obj_isa field. `size` must be at least sizeof(struct _os_object_s).
inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
_os_object_t obj;
dispatch_assert(size >= sizeof(struct _os_object_s));
// Retry calloc until it succeeds, calling
// _dispatch_temporary_resource_shortage() after each failed attempt.
while (unlikely(!(obj = calloc(1u, size)))) {
_dispatch_temporary_resource_shortage();
}
// The class pointer handed in by the caller becomes the object's isa.
obj->os_obj_isa = cls;
return obj;
}
通过obj->os_obj_isa = cls;
这句代码,我们可以下结论:
串行队列是OS_dispatch_queue_serial_class
类型的对象,并发队列是OS_dispatch_queue_concurrent_class
类型
除了类型不同,在init
方法中,我们也可以看到串行队列和并发队列在一些性质上的不同,
// Initializes the queue referenced by `dqu` and returns `dqu` unchanged.
//   dqf                - initial atomic flags; DQF_WIDTH(width) is OR-ed in
//   width              - queue width (the constructor passes 1 for serial
//                        queues and DISPATCH_QUEUE_WIDTH_MAX for concurrent)
//   initial_state_bits - may only contain role bits and
//                        DISPATCH_QUEUE_INACTIVE (asserted below)
static inline dispatch_queue_class_t
_dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
uint16_t width, uint64_t initial_state_bits)
{
// Base state value derived from the width.
uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
dispatch_queue_t dq = dqu._dq;
dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
DISPATCH_QUEUE_INACTIVE)) == 0);
// Queues created inactive take extra references up front.
if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) {
dq->do_ref_cnt++; // released when DSF_DELETED is set
}
}
dq_state |= initial_state_bits;
dq->do_next = DISPATCH_OBJECT_LISTLESS;
dqf |= DQF_WIDTH(width);
os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
dq->dq_state = dq_state;
// Each queue gets a serial number from a global atomic counter.
dq->dq_serialnum =
os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
return dqu;
}
串行队列和并发队列最大的不同,串行队列传入的width
为1,并发队列传入的是DISPATCH_QUEUE_WIDTH_MAX
,
另附
#define DISPATCH_QUEUE_WIDTH_FULL 0x1000ull
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX (DISPATCH_QUEUE_WIDTH_FULL - 2)
所以,队列本质上也是一个对象。
任务的包装和调用流程
任务在GCD中体现为一个block,接下来我们便探究这个block是如何被处理,并且何时调用的。
// Submits `work` to `dq` for asynchronous execution.
// The block is wrapped in a newly allocated continuation (dc) by
// _dispatch_continuation_init, which returns the QoS to enqueue with; the
// continuation is then handed to _dispatch_continuation_async.
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
// Task wrapper: accept the block and store it in the continuation.
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
// Enqueue the continuation on dq (the excerpt had this call garbled to `c`).
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
可以看到,这里将任务work
封装到dc
(dispatch_continuation_t)中保存,返回值qos
是任务入队时使用的优先级,通过_dispatch_continuation_init
方法看是如何封装的
// Fills continuation `dc` from block `work` and returns the QoS computed by
// the init helper it delegates to.
// The block is copied first; the copy becomes the continuation context.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
void *ctxt = _dispatch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
// Blocks carrying private data take the slow initialization path.
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
// Default function is the block's own invoke pointer; with
// DC_FLAG_CONSUME it is replaced by _dispatch_call_block_and_release.
dispatch_function_t func = _dispatch_Block_invoke(work);
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
进入_dispatch_continuation_init_f
方法
// Stores the function `f` and context `ctxt` in continuation `dc`, sets its
// flags, applies voucher and priority, and returns the resulting QoS from
// _dispatch_continuation_priority_set.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
// The callable and its argument are kept side by side; the callout later
// passes dc_ctxt and dc_func to _dispatch_client_callout.
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has one
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_continuation_voucher_set(dc, flags);
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
可以看到,这两个方法中,将任务work
拷贝出一份ctxt
,并将ctxt
赋值给dc
的dc_ctxt
,
那么这个任务是何时调用的呢,我们可以通过断点调试查看
如图,在任务中打上断点,进入断点后在控制台使用lldb
调试查看调用堆栈
(lldb) bt
* thread #2, queue = 'com.apple.root.default-qos', stop reason = breakpoint 1.1
* frame #0: 0x000000010bd03057 GCDTest`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x000000010bd05058) at ViewController.m:36:9
frame #1: 0x000000010bedce78 libdispatch.dylib`_dispatch_call_block_and_release + 12
frame #2: 0x000000010bede0b0 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010bee0742 libdispatch.dylib`_dispatch_queue_override_invoke + 1076
frame #4: 0x000000010bef21f9 libdispatch.dylib`_dispatch_root_queue_drain + 334
frame #5: 0x000000010bef2c83 libdispatch.dylib`_dispatch_worker_thread2 + 127
frame #6: 0x000000010bf87acf libsystem_pthread.dylib`_pthread_wqthread + 244
frame #7: 0x000000010bf86ae3 libsystem_pthread.dylib`start_wqthread + 15
这里通过系统的pthread
调用了libdispatch
的_dispatch_worker_thread2
方法,经过一系列的调用后block才进行调用。
在GCD
源码全局搜索_dispatch_worker_thread2
查看它的实现
// Worker-thread entry point taking the thread's pthread priority `pp`.
// It resolves the root queue matching that priority, drains it, then resets
// the voucher and emits the park trace event.
static void
_dispatch_worker_thread2(pthread_priority_t pp)
{
// Remember the overcommit flag before masking pp.
bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
dispatch_queue_global_t dq;
// Keep the overcommit flag and drop the remaining flag bits.
pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
// Root queue is selected by the QoS derived from pp plus the overcommit flag.
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
_dispatch_introspection_thread_add();
_dispatch_trace_runtime_event(worker_unpark, dq, 0);
// One fewer pending request on this root queue; must not go negative.
int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
dispatch_assert(pending >= 0);
_dispatch_root_queue_drain(dq, dq->dq_priority,
DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
_dispatch_voucher_debug("root queue clear", NULL);
_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
_dispatch_trace_runtime_event(worker_park, NULL, 0);
}
通过调用堆栈得知,下一个方法为_dispatch_root_queue_drain
// Abridged excerpt ("..." marks elided setup and teardown; `item`, `reset`
// and `dic` are declared in the elided part).
// Repeatedly takes one item from root queue `dq` and executes it.
static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{
...
// Loop until _dispatch_root_queue_drain_one yields no item.
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
if (reset) _dispatch_wqthread_override_reset();
// Execute the dequeued item.
_dispatch_continuation_pop_inline(item, &dic, flags, dq);
reset = _dispatch_reset_basepri_override();
// Stop draining early when the narrowing check asks for it.
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}
...
}
因为任务在队列dq
中,所以我们通过dq
可以得知,和任务有关的下一个方法为_dispatch_continuation_pop_inline
// Executes one dequeued item `dou` on behalf of queue `dqu`.
// Optional observer hooks are notified before and after execution. Objects
// that have a vtable are invoked through dx_invoke; plain continuations go
// through _dispatch_continuation_invoke_inline.
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_class_t dqu)
{
dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
_dispatch_get_pthread_root_queue_observer_hooks();
if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
// Only the flags in the propagate mask are passed down.
flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
if (_dispatch_object_has_vtable(dou)) {
dx_invoke(dou._dq, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, flags, dqu);
}
if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}
查看_dispatch_continuation_invoke_inline
// Abridged excerpt: invokes the continuation carried by `dou`.
// The "..." elides code that, among other things, must define `dc_flags` and
// assign `dc1`.
// NOTE(review): the `});` below presumably closes a wrapper construct opened
// in the elided code - confirm against the full source.
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou,
dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
dispatch_continuation_t dc = dou._dc, dc1;
...
if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
// Continuations submitted with a group take the group-aware path.
_dispatch_continuation_with_group_invoke(dc);
} else {
// Ordinary case: call the stored function with the stored context.
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
}
if (unlikely(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
});
_dispatch_perfmon_workitem_inc();
}
重点关注_dispatch_client_callout
方法
// Calls the client function `f` with `ctxt` as its only argument.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
return f(ctxt);
}
而在之前的我们已经知道,ctxt
就是任务block的拷贝,此时通过ctxt
进行任务执行。
函数的底层
函数分为同步函数和异步函数,我们先来看异步函数的底层。
异步函数 dispatch_async
在上面的代码中,我们已经知道dispatch_async
底层调用了_dispatch_continuation_async
先来看_dispatch_continuation_async
的底层源码,
// Pushes continuation `dc` onto queue `dqu` with the given QoS by calling the
// queue's push operation (dx_push). When DISPATCH_INTROSPECTION is enabled
// the push is traced first unless DC_FLAG_NO_INTROSPECTION is set.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
// dc_flags is only needed for introspection; silence the unused warning.
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
显然关键函数是dx_push
,dx_push
实际上是一个宏,它的定义如下
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
调用的是dq_push
函数,而dq_push
则会根据队列的不同而执行不同的函数,在源码init.c
文件可以看到它的不同定义
DISPATCH_VTABLE_INSTANCE(queue,
// This is the base class for queues, no objects of this type are made
.do_type = _DISPATCH_QUEUE_CLUSTER,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
);
DISPATCH_VTABLE_INSTANCE(workloop,
.do_type = DISPATCH_WORKLOOP_TYPE,
.do_dispose = _dispatch_workloop_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_workloop_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_workloop_wakeup,
.dq_push = _dispatch_workloop_push,
);
// Vtable for serial queues: items are pushed with _dispatch_lane_push.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_push,
);
// Vtable for concurrent queues: identical to the serial one except for the
// type and the push function (_dispatch_lane_concurrent_push).
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
// Vtable for global root queues: pushes go to _dispatch_root_queue_push, and
// the dispose/invoke/activate slots use the "no_*" placeholders.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_pthread_root, lane,
.do_type = DISPATCH_QUEUE_PTHREAD_ROOT_TYPE,
.do_dispose = _dispatch_pthread_root_queue_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
#endif // DISPATCH_USE_PTHREAD_ROOT_QUEUES
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_mgr, lane,
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
#if DISPATCH_USE_MGR_THREAD
.do_invoke = _dispatch_mgr_thread,
#else
.do_invoke = _dispatch_object_no_invoke,
#endif
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_mgr_queue_wakeup,
.dq_push = _dispatch_mgr_queue_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_main, lane,
.do_type = DISPATCH_QUEUE_MAIN_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_main_queue_wakeup,
.dq_push = _dispatch_main_queue_push,
);
#if DISPATCH_COCOA_COMPAT
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_runloop, lane,
.do_type = DISPATCH_QUEUE_RUNLOOP_TYPE,
.do_dispose = _dispatch_runloop_queue_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_runloop_queue_wakeup,
.dq_push = _dispatch_lane_push,
);
#endif
DISPATCH_VTABLE_INSTANCE(source,
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_dispose = _dispatch_source_dispose,
.do_debug = _dispatch_source_debug,
.do_invoke = _dispatch_source_invoke,
.dq_activate = _dispatch_source_activate,
.dq_wakeup = _dispatch_source_wakeup,
.dq_push = _dispatch_lane_push,
);
DISPATCH_VTABLE_INSTANCE(channel,
.do_type = DISPATCH_CHANNEL_TYPE,
.do_dispose = _dispatch_channel_dispose,
.do_debug = _dispatch_channel_debug,
.do_invoke = _dispatch_channel_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_channel_wakeup,
.dq_push = _dispatch_lane_push,
);
#if HAVE_MACH
DISPATCH_VTABLE_INSTANCE(mach,
.do_type = DISPATCH_MACH_CHANNEL_TYPE,
.do_dispose = _dispatch_mach_dispose,
.do_debug = _dispatch_mach_debug,
.do_invoke = _dispatch_mach_invoke,
.dq_activate = _dispatch_mach_activate,
.dq_wakeup = _dispatch_mach_wakeup,
.dq_push = _dispatch_lane_push,
);
#endif // HAVE_MACH
可以看到并发队列queue_concurrent
的dq_push
为_dispatch_lane_concurrent_push
(可以通过打_dispatch_lane_concurrent_push
符号断点验证)
查看_dispatch_lane_concurrent_push
的实现
// Push operation for concurrent queues.
// Fast path: when the queue has no tail item, the object is neither a waiter
// nor a barrier, and async width can be acquired, the item is redirected via
// _dispatch_continuation_redirect_push. Otherwise it falls back to the
// regular _dispatch_lane_push.
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
这里有两个分支,我们分别打上_dispatch_continuation_redirect_push
和_dispatch_lane_push
的符号断点,发现这里走的是_dispatch_continuation_redirect_push
函数
来到_dispatch_continuation_redirect_push
函数
// Forwards `dou` from lane `dl` to the lane's target queue.
// An object that is not yet a redirection is wrapped first; the push is then
// issued on dl->do_targetq through dx_push.
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
// From here on `dq` is the target queue, not the original lane.
dispatch_queue_t dq = dl->do_targetq;
// A zero qos falls back to the QoS of the target queue's priority.
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos);
}
可以看到这里又调用了dx_push
,不同的是这里的dq
是队列的do_targetq
,而在队列的创建的过程中,我们知道队列的do_targetq
是
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
我们找到root_queue
对应的dq_push
方法_dispatch_root_queue_push
(这里也可以通过打上_dispatch_root_queue_push
的符号断点进行验证),
通过探寻源码,_dispatch_root_queue_push
的执行流程为:
_dispatch_root_queue_push
->_dispatch_root_queue_push_inline
->_dispatch_root_queue_poke
->_dispatch_root_queue_poke_slow
_dispatch_root_queue_poke_slow
的源码代码较多,但主要是两个步骤,一个是注册方法回调(_dispatch_root_queues_init),另一个则是创建线程
// Abridged excerpt ("..." marks elided code; `r`, `pthr`, `attr` and
// `remaining` are defined in the elided parts).
// Ensures the root queues are initialized, then creates worker threads with
// pthread_create in a do-while loop that runs until `remaining` reaches zero.
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
...
// One-time root queue setup.
_dispatch_root_queues_init();
...
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
// Keep retrying thread creation; any error other than EAGAIN is reported
// through dispatch_assume_zero before the retry.
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
创建线程的过程比较简单,通过一个do-while循环创建线程(使用pthread_create
创建)
而_dispatch_root_queues_init
是如何注册回调的呢,我们通过源码查看
// Runs _dispatch_root_queues_init_once through dispatch_once_f, guarded by
// the _dispatch_root_queues_pred predicate, with a NULL context.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
这里使用了dispatch_once_f
函数保证了传入的func
--_dispatch_root_queues_init_once
只执行一次(和创建单例使用的dispatch_once
一样)。
查看_dispatch_root_queues_init_once
,简化后如下
// Abridged excerpt of the one-time root queue setup ("..." marks elided
// code; `cfg` and `wq_supported` come from the elided parts, and the final
// #endif pairs with an #if that is not shown).
// Every configuration branch visible here installs _dispatch_worker_thread2
// as cfg.workq_cb; when no branch matches, the process crashes with
// "Missing Kevent WORKQ support".
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
...
...
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
// Branch taken when the kevent workqueue is not enabled.
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
...
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
// Branch taken when the workloop feature bit is set in wq_supported.
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
...
#endif // DISPATCH_USE_KEVENT_SETUP
#endif // DISPATCH_USE_KEVENT_WORKLOOP
#if DISPATCH_USE_KEVENT_WORKQUEUE
// Branch taken when the kevent feature bit is set in wq_supported.
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
...
#endif // DISPATCH_USE_KEVENT_SETUP
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}
#pragma clang diagnostic pop
...
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}
在这个方法的不同事务中,调用的句柄都是_dispatch_worker_thread2
,而在我们探究任务block的调用流程的时候,我们就已经知道,线程就是调用_dispatch_worker_thread2
来执行任务的。
小结
至此,我们知道,异步函数将异步任务拷贝并封装,并设置为回调func放到队列中,然后底层又通过dx_push
重定向到根队列的dx_push
,通过pthread_create
创建线程,最后通过dx_invoke
执行任务block。
同步函数
同步函数dispatch_sync
的底层源码为
// Synchronously runs block `work` on queue `dq`.
// Blocks with private data are routed to _dispatch_sync_block_with_privdata;
// all others go to _dispatch_sync_f with the block itself as context and the
// block's invoke pointer as the function.
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
进入_dispatch_sync_f
// Thin wrapper that forwards all arguments to _dispatch_sync_f_inline.
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
接着查看_dispatch_sync_f_inline
// Core of dispatch_sync: chooses how to run func(ctxt) synchronously on `dq`.
//   - width 1 queues are handled by the barrier sync path
//   - non-lane queue types crash with a client error
//   - failure to reserve sync width goes to the slow path
//   - a target queue that itself has a target goes to the recursive path
//   - otherwise the function is invoked and completed directly
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Width 1 (the value the constructor uses for serial queues).
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// The target queue has a target of its own: use the recursive path.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
在这段代码中,dq->dq_width == 1
表示当前队列为串行队列,走的是_dispatch_barrier_sync_f
方法,所以,串行队列同步函数底层实际是通过栅栏函数实现,而串行队列同步执行是有可能导致线程死锁的,那么原因到底是什么呢,我们可以通过这部分底层代码一探究竟。
查看_dispatch_barrier_sync_f
的底层实现(其内部调用的_dispatch_barrier_sync_f_inline)
// Barrier variant of the synchronous call path for queue `dq`.
// Tries to acquire the barrier lock for the calling thread's tid. On failure
// it goes to _dispatch_sync_f_slow; with a nested target queue it goes to
// _dispatch_sync_recurse; otherwise func(ctxt) is invoked and completed
// directly. DC_FLAG_BARRIER is added to the flags on every path.
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Identifier of the calling thread, used as the prospective lock owner.
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
首先正常情况下,代码会走_dispatch_lane_barrier_sync_invoke_and_complete
->_dispatch_sync_function_invoke_inline
流程,而如果发生死锁会执行_dispatch_sync_f_slow
方法(可以通过符号断点验证),我们先通过_dispatch_sync_function_invoke_inline
源码看看正常流程。
// Runs func(ctxt) on the calling thread, bracketed by a thread frame
// push/pop for queue `dq`.
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
// The frame lives on this function's stack and is associated with dq.
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
这段代码中主要有三个步骤
-
_dispatch_thread_frame_push
: 将当前队列dq对应的线程帧(thread frame)压栈,注意这里传入的是dtf和dq,并没有传入任务 -
_dispatch_client_callout
: 执行任务block -
_dispatch_thread_frame_pop
: 将线程帧出栈
再来看死锁的情况,这段代码为
// Slow path of the synchronous call: the calling thread could not take the
// queue on the fast path.
// A queue without a target runs func(ctxt) directly. Otherwise a stack
// allocated sync context (dsc) describing the waiter is built and handed to
// __DISPATCH_WAIT_FOR_QUEUE__. Afterwards, either the work has already run
// elsewhere (dsc_func cleared) or it is invoked here.
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// Waiter record: carries the client function/context (dsc_func/dsc_ctxt)
// and the id of the waiting thread (dsc_waiter).
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
// Enqueues dsc on dq and waits (see __DISPATCH_WAIT_FOR_QUEUE__).
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
进入当前方法时,当前线程没能在快速路径上直接拿到队列,这里先通过_dispatch_trace_item_push
方法记录任务dsc
的入队跟踪信息(针对队列top_dq
),再通过__DISPATCH_WAIT_FOR_QUEUE__
将dsc通过dx_push压入队列并让当前线程等待,在此之前会先检查是否出现了死锁
// Enqueues the sync waiter `dsc` on `dq` and blocks the calling thread until
// it is woken or given ownership.
// Before enqueuing, it crashes the client if the queue's drain lock is
// already held by the waiting thread itself, i.e. dispatch_sync was called on
// a queue the current thread already owns.
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// Snapshot of the queue state used for all checks below.
uint64_t dq_state = _dispatch_wait_prepare(dq);
// Deadlock detection: the drain lock owner equals the waiter's thread id.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
// Decide which wlh value to record in dc_data for this wait.
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}
// Anonymous waits use a thread event, which must be initialized first.
if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor();
_dispatch_thread_event_init(&dsc->dsc_event);
}
// The waiter is actually enqueued here.
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
_dispatch_trace_runtime_event(sync_wait, dq, 0);
// Block until signalled (anonymous) or until ownership is handed over.
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_wait(&dsc->dsc_event); // acquire
} else {
_dispatch_event_loop_wait_for_ownership(dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}
这段代码中,首先会通过_dispatch_wait_prepare
获取目标队列的状态dq_state,再通过_dq_state_drain_locked_by
判断当前持有该队列(正在执行该队列任务)的线程是否就是发起等待的线程dsc_waiter(即当前线程),如果是,就会进入代码
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state, "dispatch_sync called on queue " "already owned by current thread")
抛出异常。
查看_dq_state_drain_locked_by
->_dispatch_lock_is_locked_by
// Returns true when `lock_value` and `tid` agree on every bit selected by
// DLOCK_OWNER_MASK, i.e. the XOR of the two has no owner bits set.
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
这里通过异或算法判断两个值是否一致(异或运算中相同为0不同为1,DLOCK_OWNER_MASK
是一个超大的数,如果lock_value
和tid
相同,则值为0,0与上任何数都为0)。
这就是为什么串行队列同步执行会产生死锁。
而如果是并发队列,从_dispatch_sync_f_inline
可以看到,代码不会进入栅栏分支,而是在成功预留同步宽度后直接走_dispatch_sync_invoke_and_complete
流程,也就是正常的执行流程。
总结
至此,我们可以得出以下结论
- 队列的本质是一个对象。
- 异步函数通过
pthread_create
创建线程,并通过递归的方式找到根队列的dq_push
方法执行流程 - 串行队列的同步函数底层是通过栅栏函数实现的,如果在串行队列正在执行的任务中再向该队列同步提交任务,当前任务和新任务会相互等待导致死锁(GCD检测到这种情况后会直接崩溃)。
ps: GCD的底层源码实在太难读,很多时候也只能大概读懂,有错误遗漏的地方期待读者指正。
网友评论