dispatch_sync 同步函数
dispatch sync
函数。它意味着 同步 ( synchronous).也就是将指定的Block
同步追加到指定的 Dispatch Queue 中。在追加 Block
结束之前,dispatch_sync
函数会一直等待。
一旦调用 dispatch_sync
函数,那么在指定的处理执行结束之前,该函数不会返冋,所以也容易引起 死锁 问题。
例如,在主线程中执行以下源代码就会 死锁。
dispatch_queue_t queue = dispatch_get_main_queue();
dispatch_sync (queue,^{
NSLog (@"Hello?");
});
该源代码在主线程中执行指定的 Block
,并等待其执行结束。而其实在主线程中正在执行这些源代码,所以无法执行追加到主线程的 Block
。当然串行队列也会引起相同的问题。
dispatch_queue_t queue = dispatch_queue_create("com.example.gcd.MySerialDispatchQueue",NULL);
dispatch_async(queue, ^{
dispatch_sync(queue, ^{
NSLog (@"Hello");
});
});
dispatch_async 异步函数
dispatch_async
函数的 async 意味 非同步(asynchronous),就是将指定的 Block
非同步地追加到指定的 Dispatch Queue 中。dispatch_async
函数不做任何等待。
dispatch_async(dispatch_get_global_queue(0, 0), ^{
//执行耗时操作
dispatch_async(dispatch_get_main_queue(), ^{
//回到主线程进行UI操作
});
});
dispatch_sync 的实现
_dispatch_sync
在 libdispatch 中搜索 dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
找到源码实现调用 _dispatch_sync_f
函数,继续搜索 _dispatch_sync_f
调用 _dispatch_sync_f_inline
函数:
DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
//...
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
_dispatch_sync_f_inline
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
_dispatch_sync_f_inline
内部实现分为两个流程:
-
dq
为串行队列,调用_dispatch_barrier_sync_f
函数 - 否则为并发队列,调用
_dispatch_sync_invoke_and_complete
_dispatch_sync_invoke_and_complete
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
_dispatch_lane_non_barrier_complete(dq, 0);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
_dispatch_sync_invoke_and_complete
执行流程如下:
-
_dispatch_thread_frame_push
任务压栈到队列 -
_dispatch_client_callout
执行任务 -
_dispatch_thread_frame_pop
出栈 - 调用 ·_dispatch_lane_non_barrier_complete` 完成
总结
- 同步串行情况调用
_dispatch_barrier_sync_f
同步栅栏函数 - 同步并发情况是先将任务
push
队列中,然后执行block
回调,在将任务pop
,所以任务是顺序执行的。
dispatch_async 是怎样执行 block 任务的?
lldb 调试
尝试通过lldb查看堆栈信息如下:
并发线程Block任务执行堆栈查看.png
通过上图可知, 在执行Block任务之前,是按照 start_wqthread
-> _pthread_wqthread
-> _dispatch_worker_thread2
-> _dispatch_root_queue_drain
-> _dispatch_async_redirect_invoke
-> _dispatch_continuation_pop
-> _dispatch_client_callout
-> _dispatch_call_block_and_release
,最终进行调用执行 block
。 那么在 dispatch_async
函数后,start_wqthread
之前,做了些什么呢?
dispatch_async 实现
打开 libdispatch 工程后搜索 dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
进行查看,可以看到整体流程分为两步:
-
_dispatch_continuation_init
任务包装函数,block
块的实参只在这里有使用; -
dispatch_continuation_async
并发处理函数
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
//任务包装器
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
// 并发处理函数
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
_dispatch_continuation_init
任务的包装
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
void *ctxt = _dispatch_Block_copy(work);//拷贝任务
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
// dc_flags = DC_FLAG_CONSUME
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;//赋值
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
dispatch_function_t func = _dispatch_Block_invoke(work);
if (dc_flags & DC_FLAG_CONSUME) {
//func = _dispatch_call_block_and_release;
func = _dispatch_call_block_and_release;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has one
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_continuation_voucher_set(dc, flags);
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
通过查看源代码得知 _dispatch_continuation_init
函数:
-
通过
_dispatch_Block_copy
拷贝任务 , 得到ctxt
函数指针,赋值给dc
; -
由于
dc_flags = DC_FLAG_CONSUME
,因此dc_flags & DC_FLAG_CONSUME
结果为真,func
为_dispatch_call_block_and_release
void _dispatch_call_block_and_release(void *block) { void (^b)(void) = block; b(); Block_release(b); }
-
通过
_dispatch_continuation_init_f
方法将 最终得到的func
以及ctxt
进行存储。
_dispatch_continuation_async
并发处理函数
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
_dispatch_continuation_async
最终会调用 dx_push
函数,搜索查看 dx_push
是什么?
dx_push
查看源码后发现最终调用 dx_push
通过工程中搜索 dx_push
其实是一个宏,实际上执行的 dq_push
,
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
搜索 DISPATCH_VTABLE_SUBCLASS_INSTANCE
会发现 OS_OBJECT_VTABLE_SUBCLASS_INSTANCE
根据队列类型进行初始化。
#define DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, ctype, ...) \
OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(dispatch_##name, dispatch_##ctype, \
_dispatch_xref_dispose, _dispatch_dispose, \
.do_kind = #name, __VA_ARGS__)
因此得出结论 _dispatch_continuation_async
函数中 dx_push
最终调用到的是各个队列对象中的 dq_push
存储的函数,如下图。
_dispatch_lane_concurrent_push
继续以并发队列为例,在源码中搜索 _dispatch_lane_concurrent_push
找到下面代码:
void _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
if (unlikely(_dispatch_queue_is_cooperative(dq))) {
/* If we're here, means that we're in the simulator fallback case. We
* still restrict what can target the cooperative thread pool */
if (_dispatch_object_has_vtable(dou) &&
dx_type(dou._do) != DISPATCH_SWIFT_JOB_TYPE) {
DISPATCH_CLIENT_CRASH(dou._do, "Cannot target the cooperative global queue - not implemented");
}
}
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
_dispatch_continuation_redirect_push
在 _dispatch_lane_concurrent_push
中 会看到函数 _dispatch_continuation_redirect_push
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatchync_r_asedirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos);
}
通过上面源码我们可以发现,_dispatch_continuation_redirect_push
函数通过 dispatch_queue_t dq = dl->do_targetq
再次对新的队列调用 dx_push
,那么 do_targetq
是什么呢?
在 _dispatch_lane_create_with_target
函数中我们可以找到 do_targetq
是通过 _dispatch_get_root_queue
生成的,而这个函数返回的是 dispatch_queue_global_t
类型,根据 dq_push
存储的函数类型可知,调用的是 _dispatch_root_queue_push
函数。因此,dispatch_continuation_redirect_push
其实重定向到了 _dispatch_root_queue_push
函数中。
自定义队列非常强大,在自定义队列中被调度的所有 block 最终都将被放入到系统的全局队列中和线程池中。源自并发编程:API 及挑战
dispatch_root_queue_push
DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
//...
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
//...
return _dispatch_root_queue_poke(dq, n, 0);
}
DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
//...
return _dispatch_root_queue_poke_slow(dq, n, floor);
}
通过查看 dispatch_root_queue_push
函数可以发现,执行函数顺序为:
- _dispatch_root_queue_push
- _dispatch_root_queue_push_inline
- _dispatch_root_queue_poke
- _dispatch_root_queue_poke_slow
_dispatch_root_queue_poke_slow
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
#if !defined(_WIN32)
int r = ENOSYS;
#endif
//初始化
_dispatch_root_queues_init();
//...
//全局类型
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
//...
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
//...
}
_dispatch_root_queue_poke_slow
执行流程如下:
- 调用了
dispatch_root_queues_init()
初始化 root_queue - 判断如果是全局线程类型,调用 libpthread 中的
_workqueue_addthreads
, - remaining 递减 ,通过
do-while
使用pthread_create
方法循环创建线程.
dispatch_root_queues_init()
进入 _dispatch_root_queues_init
源码实现,发现是一个 dispatch_once_f
单例),其中传入的 func
是_dispatch_root_queues_init_once
。
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,_dispatch_root_queues_init_once);
}
_dispatch_root_queues_init_once
进入_dispatch_root_queues_init_once的源码,其内部不同事务的调用句柄都是 _dispatch_worker_thread2
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
//...
cfg->workloop_cb = _dispatch_worker_thread2
//...
}
_dispatch_worker_thread2
前往 libpthread开源库 中,搜索 _pthread_workqueue_setup
,也可以发现该函数中 将 _dispatch_worker_thread2
存储为 __libdispatch_workloopfunction
。
int
pthread_workqueue_setup(struct pthread_workqueue_config *cfg, size_t cfg_size)
{
//...
__libdispatch_workloopfunction = cfg->workloop_cb;
//...
}
搜索 __libdispatch_workloopfunction
可以发现确实在 _pthread_wqthread
有调用执行,通过注释也可以在 pthread_asm 文件中找到 _start_wqthread
与 _pthread_wqthread
的先后调用。
// workqueue entry point from kernel
void
_pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr,
void *keventlist, int flags, int nkevents)
{
//...代码省略
(*__libdispatch_workloopfunction)(kqidptr, &self->arg, &self->wq_nevents);
//...代码省略
/*
* 52858993: we should never return but the compiler insists on outlining,
* so the __builtin_trap() is in _start_wqthread in pthread_asm.s
*/
}
总结
所以,综上所述,异步函数的底层分析如下
-
_dispatch_continuation_init
处理block
,func
为_dispatch_call_block_and_release
,ctxt
为block
; -
dispatch_continuation_async
处理信息,通过 dx_push 递归,重定向到 dispatch_root_queue_push; - 通过
pthread_creat
创建线程 -
start_wqthread
->_pthread_wqthread
->_dispatch_worker_thread2
->_dispatch_root_queue_drain
- 通过 dx_invoke 找到
_dispatch_async_redirect_invoke
->_dispatch_continuation_pop
->_dispatch_client_callout
->_dispatch_call_block_and_release
调用执行 block
网友评论