美文网首页
GCD ② Dispatch Sync & Dispatch A

GCD ② Dispatch Sync & Dispatch A

作者: _涼城 | 来源:发表于2022-04-10 08:11 被阅读0次

dispatch_sync 同步函数

    dispatch sync 函数。它意味着 同步 ( synchronous).也就是将指定的Block 同步追加到指定的 Dispatch Queue 中。在追加 Block 结束之前,dispatch_sync函数会一直等待。

dispatch_sync函数的处理流程

一旦调用 dispatch_sync 函数,那么在指定的处理执行结束之前,该函数不会返冋,所以也容易引起 死锁 问题。

例如,在主线程中执行以下源代码就会 死锁

dispatch_queue_t queue = dispatch_get_main_queue(); 
dispatch_sync (queue,^{
    NSLog (@"Hello?");
});

该源代码在主线程中执行指定的 Block,并等待其执行结束。而其实在主线程中正在执行这些源代码,所以无法执行追加到主线程的 Block。当然串行队列也会引起相同的问题。

dispatch_queue_t queue = dispatch_queue_create("com.example.gcd.MySerialDispatchQueue",NULL); 
dispatch_async(queue, ^{
   dispatch_sync(queue, ^{
      NSLog (@"Hello");
   });
});

dispatch_async 异步函数

    dispatch_async 函数的 async 意味 非同步(asynchronous),就是将指定的 Block 非同步地追加到指定的 Dispatch Queue 中。dispatch_async 函数不做任何等待。

dispatch_async函数的处理流程
 dispatch_async(dispatch_get_global_queue(0, 0), ^{
        //执行耗时操作
        dispatch_async(dispatch_get_main_queue(), ^{
            //回到主线程进行UI操作
        });
  });

dispatch_sync 的实现

_dispatch_sync

libdispatch 中搜索 dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) 找到源码实现调用 _dispatch_sync_f 函数,继续搜索 _dispatch_sync_f 调用 _dispatch_sync_f_inline 函数:

DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
//...
 _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
  uintptr_t dc_flags)
{
 _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

_dispatch_sync_f_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
  dispatch_function_t func, uintptr_t dc_flags)
{
 if (likely(dq->dq_width == 1)) {
  return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
 }

 if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
  DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
 }

 dispatch_lane_t dl = upcast(dq)._dl;
 // Global concurrent queues and queues bound to non-dispatch threads
 // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
 if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
  return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
 }

 if (unlikely(dq->do_targetq->do_targetq)) {
  return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
 }
 _dispatch_introspection_sync_begin(dl);
 _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
   _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

_dispatch_sync_f_inline 内部实现分为两个流程:

  1. dq 为串行队列,调用 _dispatch_barrier_sync_f 函数
  2. 否则为并发队列,调用 _dispatch_sync_invoke_and_complete

_dispatch_sync_invoke_and_complete

DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
  dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
 _dispatch_trace_item_complete(dc);
 _dispatch_lane_non_barrier_complete(dq, 0);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
  dispatch_function_t func)
{
 dispatch_thread_frame_s dtf;
 _dispatch_thread_frame_push(&dtf, dq);
 _dispatch_client_callout(ctxt, func);
 _dispatch_perfmon_workitem_inc();
 _dispatch_thread_frame_pop(&dtf);
}

_dispatch_sync_invoke_and_complete 执行流程如下:

  1. _dispatch_thread_frame_push 任务压栈到队列
  2. _dispatch_client_callout 执行任务
  3. _dispatch_thread_frame_pop 出栈
  4. 调用 ·_dispatch_lane_non_barrier_complete` 完成

总结

  • 同步串行情况调用 _dispatch_barrier_sync_f 同步栅栏函数
  • 同步并发情况是先将任务 push 队列中,然后执行 block 回调,在将任务 pop,所以任务是顺序执行的。

dispatch_async 是怎样执行 block 任务的?

lldb 调试

尝试通过lldb查看堆栈信息如下:


并发线程Block任务执行堆栈查看.png

    通过上图可知, 在执行Block任务之前,是按照 start_wqthread -> _pthread_wqthread -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_async_redirect_invoke -> _dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release,最终进行调用执行 block。 那么在 dispatch_async 函数后,start_wqthread 之前,做了些什么呢?

dispatch_async 实现

    打开 libdispatch 工程后搜索 dispatch_async(dispatch_queue_t dq, dispatch_block_t work) 进行查看,可以看到整体流程分为两步:

  • _dispatch_continuation_init 任务包装函数,block 块的实参只在这里有使用;

  • dispatch_continuation_async 并发处理函数

  void
  dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
  {
      dispatch_continuation_t dc = _dispatch_continuation_alloc();
      uintptr_t dc_flags = DC_FLAG_CONSUME;
      dispatch_qos_t qos;
        //任务包装器
      qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
      // 并发处理函数
      _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
  }

_dispatch_continuation_init 任务的包装

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    void *ctxt = _dispatch_Block_copy(work);//拷贝任务

    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        // dc_flags = DC_FLAG_CONSUME
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;//赋值
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    dispatch_function_t func = _dispatch_Block_invoke(work);
    
    if (dc_flags & DC_FLAG_CONSUME) {
        //func =  _dispatch_call_block_and_release;
        func = _dispatch_call_block_and_release;
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
  dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
  dispatch_block_flags_t flags, uintptr_t dc_flags)
{
   pthread_priority_t pp = 0;
   dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
   dc->dc_func = f;
   dc->dc_ctxt = ctxt;
   // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
   // should not be propagated, only taken from the handler if it has one
   if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
    pp = _dispatch_priority_propagate();
   }
   _dispatch_continuation_voucher_set(dc, flags);
   return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

通过查看源代码得知 _dispatch_continuation_init 函数:

  • 通过 _dispatch_Block_copy 拷贝任务 , 得到 ctxt 函数指针,赋值给 dc;

  • 由于 dc_flags = DC_FLAG_CONSUME,因此 dc_flags & DC_FLAG_CONSUME 结果为真,func_dispatch_call_block_and_release

           void
        _dispatch_call_block_and_release(void *block)
        {
         void (^b)(void) = block;
         b();
         Block_release(b);
        }
    
    
  • 通过 _dispatch_continuation_init_f 方法将 最终得到的 func 以及 ctxt 进行存储。

_dispatch_continuation_async 并发处理函数

  static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
  dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
    #if DISPATCH_INTROSPECTION
     if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
      _dispatch_trace_item_push(dqu, dc);
     }
    #else
     (void)dc_flags;
    #endif
     return dx_push(dqu._dq, dc, qos);
}

     _dispatch_continuation_async 最终会调用 dx_push 函数,搜索查看 dx_push 是什么?

dx_push

    查看源码后发现最终调用 dx_push 通过工程中搜索 dx_push 其实是一个宏,实际上执行的 dq_push

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
 .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
 .do_dispose     = _dispatch_lane_dispose,
 .do_debug       = _dispatch_queue_debug,
 .do_invoke      = _dispatch_lane_invoke,

 .dq_activate    = _dispatch_lane_activate,
 .dq_wakeup      = _dispatch_lane_wakeup,
 .dq_push        = _dispatch_lane_concurrent_push,
);

     搜索 DISPATCH_VTABLE_SUBCLASS_INSTANCE 会发现 OS_OBJECT_VTABLE_SUBCLASS_INSTANCE 根据队列类型进行初始化。

#define DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, ctype, ...) \
  OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(dispatch_##name, dispatch_##ctype, \
    _dispatch_xref_dispose, _dispatch_dispose, \
    .do_kind = #name, __VA_ARGS__)

     因此得出结论 _dispatch_continuation_async 函数中 dx_push 最终调用到的是各个队列对象中的 dq_push 存储的函数,如下图。

dq_push.png

_dispatch_lane_concurrent_push

    继续以并发队列为例,在源码中搜索 _dispatch_lane_concurrent_push 找到下面代码:

void _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
  dispatch_qos_t qos)
{
 if (unlikely(_dispatch_queue_is_cooperative(dq))) {
  /* If we're here, means that we're in the simulator fallback case. We
   * still restrict what can target the cooperative thread pool */
  if (_dispatch_object_has_vtable(dou) &&
    dx_type(dou._do) != DISPATCH_SWIFT_JOB_TYPE) {
   DISPATCH_CLIENT_CRASH(dou._do, "Cannot target the cooperative global queue - not implemented");
  }
 }
 if (dq->dq_items_tail == NULL &&
   !_dispatch_object_is_waiter(dou) &&
   !_dispatch_object_is_barrier(dou) &&
   _dispatch_queue_try_acquire_async(dq)) {
  return _dispatch_continuation_redirect_push(dq, dou, qos);
 }

 _dispatch_lane_push(dq, dou, qos);
}

_dispatch_continuation_redirect_push

    在 _dispatch_lane_concurrent_push 中 会看到函数 _dispatch_continuation_redirect_push

DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
  dispatch_object_t dou, dispatch_qos_t qos)
{
   if (likely(!_dispatch_object_is_redirection(dou))) {
    dou._dc = _dispatchync_r_asedirect_wrap(dl, dou);
   } else if (!dou._dc->dc_ctxt) {
    // find first queue in descending target queue order that has
    // an autorelease frequency set, and use that as the frequency for
    // this continuation.
    dou._dc->dc_ctxt = (void *)
    (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
   }

   dispatch_queue_t dq = dl->do_targetq;
   if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
   dx_push(dq, dou, qos);
}

    通过上面源码我们可以发现,_dispatch_continuation_redirect_push 函数通过 dispatch_queue_t dq = dl->do_targetq 再次对新的队列调用 dx_push,那么 do_targetq 是什么呢?
    在 _dispatch_lane_create_with_target 函数中我们可以找到 do_targetq 是通过 _dispatch_get_root_queue 生成的,而这个函数返回的是 dispatch_queue_global_t 类型,根据 dq_push 存储的函数类型可知,调用的是 _dispatch_root_queue_push 函数。因此,dispatch_continuation_redirect_push 其实重定向到了 _dispatch_root_queue_push 函数中。

自定义队列非常强大,在自定义队列中被调度的所有 block 最终都将被放入到系统的全局队列中和线程池中。源自并发编程:API 及挑战

dispatch_root_queue_push

DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
  dispatch_qos_t qos)
{
  //...
 _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
  dispatch_object_t _head, dispatch_object_t _tail, int n)
{
  //...
  return _dispatch_root_queue_poke(dq, n, 0);
}

DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
  //...
 return _dispatch_root_queue_poke_slow(dq, n, floor);
}

通过查看 dispatch_root_queue_push 函数可以发现,执行函数顺序为:

  • _dispatch_root_queue_push
  • _dispatch_root_queue_push_inline
  • _dispatch_root_queue_poke
  • _dispatch_root_queue_poke_slow

_dispatch_root_queue_poke_slow

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
 int remaining = n;
#if !defined(_WIN32)
 int r = ENOSYS;
#endif
 //初始化
 _dispatch_root_queues_init();
 //...

//全局类型
 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
 {
  _dispatch_root_queue_debug("requesting new worker thread for global "
    "queue: %p", dq);
  r = _pthread_workqueue_addthreads(remaining,
    _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
  (void)dispatch_assume_zero(r);
  return;
 }
 //...

 do {
  _dispatch_retain(dq); // released in _dispatch_worker_thread
  while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
   if (r != EAGAIN) {
    (void)dispatch_assume_zero(r);
   }
   _dispatch_temporary_resource_shortage();
  }
 } while (--remaining);

 //...
}

_dispatch_root_queue_poke_slow 执行流程如下:

  1. 调用了 dispatch_root_queues_init()初始化 root_queue
  2. 判断如果是全局线程类型,调用 libpthread 中的 _workqueue_addthreads
  3. remaining 递减 ,通过 do-while 使用 pthread_create 方法循环创建线程.

dispatch_root_queues_init()

    进入 _dispatch_root_queues_init 源码实现,发现是一个 dispatch_once_f单例),其中传入的 func_dispatch_root_queues_init_once

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,_dispatch_root_queues_init_once);
}

_dispatch_root_queues_init_once

    进入_dispatch_root_queues_init_once的源码,其内部不同事务的调用句柄都是 _dispatch_worker_thread2

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{ 
  //...
  cfg->workloop_cb = _dispatch_worker_thread2
  //...

}

_dispatch_worker_thread2

    前往 libpthread开源库 中,搜索 _pthread_workqueue_setup,也可以发现该函数中 将 _dispatch_worker_thread2 存储为 __libdispatch_workloopfunction

int
pthread_workqueue_setup(struct pthread_workqueue_config *cfg, size_t cfg_size)
{
 //...
   __libdispatch_workloopfunction = cfg->workloop_cb;
  //...
}

    搜索 __libdispatch_workloopfunction 可以发现确实在 _pthread_wqthread 有调用执行,通过注释也可以在 pthread_asm 文件中找到 _start_wqthread_pthread_wqthread 的先后调用。

// workqueue entry point from kernel
void
_pthread_wqthread(pthread_t self, mach_port_t kport, void *stacklowaddr,
  void *keventlist, int flags, int nkevents)
{
  //...代码省略
  (*__libdispatch_workloopfunction)(kqidptr, &self->arg, &self->wq_nevents);
//...代码省略
/*
  * 52858993: we should never return but the compiler insists on outlining,
  * so the __builtin_trap() is in _start_wqthread in pthread_asm.s
  */
}

总结

所以,综上所述,异步函数的底层分析如下

  1. _dispatch_continuation_init 处理 blockfunc_dispatch_call_block_and_releasectxtblock;
  2. dispatch_continuation_async 处理信息,通过 dx_push 递归,重定向到 dispatch_root_queue_push;
  3. 通过 pthread_creat 创建线程
  4. start_wqthread -> _pthread_wqthread -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain
  5. 通过 dx_invoke 找到_dispatch_async_redirect_invoke -> _dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release 调用执行 block

相关文章

网友评论

      本文标题:GCD ② Dispatch Sync & Dispatch A

      本文链接:https://www.haomeiwen.com/subject/asxvsrtx.html