美文网首页
GCD ⑦ dispatch_apply

GCD ⑦ dispatch_apply

作者: _涼城 | 来源:发表于2022-04-15 09:37 被阅读0次

dispatch_apply(批量处理)

    dispatch_apply 函数按指定的次数将指定的 Block 追加到指定的 Dispatch Queue 中,并等待全部处理执行结束。
    因为在 Global Dispatch Queue 中执行处理,所以各个处理的执行时间不定。但是输出结果中最后的 done 必定在最后的位置上。这是因为 dispatch_apply 函数会等待全部处理执行结束。

// Run the block 10 times on the default-priority global queue. dispatch_apply
// waits for every iteration to finish, so "done" is always logged last, while
// the order of the index lines is not fixed.
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_apply(10, queue, ^(size_t index) {
    // index is a size_t, so it is formatted with %zu rather than %ld.
    NSLog(@"%zu", index);
});
NSLog(@"done");

dispatch_apply 实现

/*
 * Block-based entry point. Forwards to dispatch_apply_f, passing the block
 * itself as the context and the function pointer obtained from
 * _dispatch_Block_invoke(work) as the per-iteration function.
 */
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
 dispatch_apply_function_t invoke =
   (dispatch_apply_function_t)_dispatch_Block_invoke(work);

 dispatch_apply_f(iterations, dq, work, invoke);
}

/*
 * Function-pointer entry point. Forwards to _dispatch_apply_with_attr_f with
 * a NULL attribute and the DA_FLAG_APPLY flag.
 *
 * When reached through dispatch_apply, func is
 * (dispatch_apply_function_t)_dispatch_Block_invoke(work) and ctxt is the
 * block itself.
 */
void
dispatch_apply_f(size_t iterations, dispatch_queue_t _dq, void *ctxt,
  void (*func)(void *, size_t))
{
 dispatch_function_t apply_func = (dispatch_function_t)func;

 _dispatch_apply_with_attr_f(iterations, NULL, _dq, ctxt, apply_func,
   DA_FLAG_APPLY);
}

     dispatch_apply 调用 dispatch_apply_f,并传递 _dispatch_Block_invoke 的函数指针;dispatch_apply_f 调用 _dispatch_apply_with_attr_f,并将 da_flags 配置为 DA_FLAG_APPLY。

_dispatch_apply_with_attr_f

/*
 * Packages the caller's function, context, queue and flags into a
 * continuation (dc), allocates and initialises the shared apply state (da),
 * attaches dc to da, and hands da to _dispatch_apply_f together with
 * _dispatch_apply_invoke as the worker function.
 *
 * NOTE: this is an abridged excerpt. thr_cnt, dq and new_nested are set up in
 * the elided sections marked with "...".
 */
static void
_dispatch_apply_with_attr_f(size_t iterations, dispatch_apply_attr_t attr,
  dispatch_queue_t _dq, void *ctxt, dispatch_function_t func, uintptr_t da_flags)
{
  // Setup and argument validation (elided in this excerpt) ...
  // Cap thr_cnt at the number of iterations.
  if (iterations < thr_cnt) {
  thr_cnt = iterations;
  }
 struct dispatch_continuation_s dc = {
  // On the dispatch_apply path, func is
  // (dispatch_apply_function_t)_dispatch_Block_invoke(work).
  .dc_func = (void*)func,
  // Caller context (the block, on the dispatch_apply path).
  .dc_ctxt = ctxt,
  // Target queue.
  .dc_other = dq,
  // Apply flags (DA_FLAG_APPLY on the dispatch_apply_f path).
  .dc_data = (void *)da_flags,
 };
 // Allocate the dispatch_apply_t state object da.
 dispatch_apply_t da = _dispatch_apply_alloc();
 // da_index starts at 0.
 os_atomic_init(&da->da_index, 0);
 // da_todo starts at iterations.
 os_atomic_init(&da->da_todo, iterations);
 da->da_iterations = iterations;
 da->da_nested = new_nested;
 da->da_thr_cnt = (int32_t)thr_cnt;
 os_atomic_init(&da->da_worker_index, 0);
 _dispatch_apply_da_copy_attr(da, attr);
#if DISPATCH_INTROSPECTION
 // Introspection builds copy dc into a separately allocated continuation
 // stored on da.
 da->da_dc = _dispatch_continuation_alloc();
 da->da_dc->dc_func = (void *) dc.dc_func;
 da->da_dc->dc_ctxt = dc.dc_ctxt;
 da->da_dc->dc_other = dc.dc_other;
 da->da_dc->dc_data = dc.dc_data;

 da->da_dc->dc_flags = DC_FLAG_ALLOCATED;
#else
 // Otherwise da points directly at the local dc declared above.
 da->da_dc = &dc;
#endif
 da->da_flags = 0;

 // ... (elided in this excerpt)
 _dispatch_apply_f(upcast(dq)._dgq, da, _dispatch_apply_invoke);
 // ... (elided in this excerpt)
}

_dispatch_apply_with_attr_f 函数流程:

  1. 校验参数,并将参数进行包装 dispatch_continuation_s dc
  2. 创建初始化 dispatch_apply_t 对象 da,并将任务包装 dc 保存至 da
  3. 调用 _dispatch_apply_f 函数 ,传递 dq 、da 及 _dispatch_apply_invoke 函数指针

_dispatch_apply_f

/*
 * Builds a singly linked list of (da_thr_cnt - 1) continuations that all
 * share da as their context and func as their function, pushes the list onto
 * dq, and then calls _dispatch_apply_invoke_and_wait(da) on the current
 * thread.
 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f(dispatch_queue_global_t dq, dispatch_apply_t da,
  dispatch_function_t func)
{
 // Local state: loop counter, list head/tail, and the current priority.
 int32_t i = 0;
 dispatch_continuation_t head = NULL, tail = NULL;
 pthread_priority_t pp = _dispatch_get_priority();

 // The current thread does not need a continuation
 // (so the count is da_thr_cnt - 1, not iterations - 1)
 int32_t continuation_cnt = da->da_thr_cnt - 1;

 dispatch_assert(continuation_cnt);

 // Allocate and link one continuation per additional worker.
 for (i = 0; i < continuation_cnt; i++) {
  // Allocate the continuation object.
  dispatch_continuation_t next = _dispatch_continuation_alloc();
  uintptr_t dc_flags = DC_FLAG_CONSUME;
  // Initialise next with queue dq, context da, function func (the caller
  // passes _dispatch_apply_invoke) and flags dc_flags (DC_FLAG_CONSUME).
  _dispatch_continuation_init_f(next, dq, da, func,
    DISPATCH_BLOCK_HAS_PRIORITY, dc_flags);
  next->dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
  // Prepend: the new node points at the current head...
  next->do_next = head;
  // ...and becomes the new head.
  head = next;
  // The first node allocated stays at the end of the list, so it is the tail.
  if (!tail) {
   tail = next;
  }
 }

 _dispatch_thread_event_init(&da->da_event);
 // FIXME: dq may not be the right queue for the priority of `head`
 _dispatch_trace_item_push_list(dq, head, tail);
 // Push the whole list onto the root queue
 // (article author's note: same push path as dispatch_async).
 _dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt);
 // Call the first element directly
 _dispatch_apply_invoke_and_wait(da);
}

_dispatch_apply_f 函数流程如下:

  1. 声明 head、tail 指针
  2. for 循环遍历,创建 dispatch_continuation_t next ,对 next 进行初始化
  3. 通过 do_next 保存节点,类似链表,tail 为 尾节点, head 为头节点
  4. 通过 _dispatch_root_queue_push_inline 压栈保存,和 dispatch_async 类似流程,最终通过 _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_client_callout -> _dispatch_apply_invoke 调用

_dispatch_apply_invoke

/*
 * Worker entry point used for the queued apply continuations: runs
 * _dispatch_apply_invoke2 on the given context with no invoke flags set.
 */
DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
 const long no_invoke_flags = 0;

 _dispatch_apply_invoke2(ctxt, no_invoke_flags);
}

/*
 * Worker body shared by every thread taking part in one dispatch_apply.
 *
 * Each worker repeatedly claims the next iteration index by atomically
 * incrementing da->da_index, and runs the client function for that index
 * until the claimed index reaches da->da_iterations. It then subtracts the
 * number of iterations it completed from da->da_todo; the worker that brings
 * da_todo to zero signals da->da_event. Finally it decrements da->da_thr_cnt,
 * and the worker that brings that to zero destroys da.
 *
 * da:           shared apply state.
 * invoke_flags: DISPATCH_APPLY_INVOKE_REDIRECT pushes a thread frame and base
 *               priority for the queue in da_dc->dc_other around the loop;
 *               DISPATCH_APPLY_INVOKE_WAIT makes this worker wait on, then
 *               destroy, da->da_event before releasing its da_thr_cnt count.
 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(dispatch_apply_t da, long invoke_flags)
{
 size_t const iter = da->da_iterations;
 size_t idx, done = 0;

 /* workers start over time but never quit until the job is done, so
  * we can allocate an index simply by incrementing
  */
 uint32_t worker_index = 0;
 worker_index = os_atomic_inc_orig2o(da, da_worker_index, relaxed);

 _dispatch_apply_set_attr_behavior(da->da_attr, worker_index);

 // Claim the first iteration index; if every index has already been
 // claimed, skip straight to cleanup without touching da_dc.
 idx = os_atomic_inc_orig2o(da, da_index, acquire);
 if (unlikely(idx >= iter)) goto out;
 /*
  * da_dc is only safe to access once the 'index lock' has been acquired
  * because it lives on the stack of the thread calling dispatch_apply.
  *
  * da lives until the last worker thread has finished (protected by
  * da_thr_cnt), but da_dc only lives until the calling thread returns
  * after the last work item is complete, which may be sooner than that.
  * (In fact, the calling thread could do all the workitems itself and
  * return before the worker threads even start.)
  *
  * Therefore the increment (reserving a valid workitem index from
  * da_index) protects our access to da_dc.
  *
  * We also need an acquire barrier, and this is a good place to have one.
  */
 dispatch_function_t const func = da->da_dc->dc_func;
 void *const da_ctxt = da->da_dc->dc_ctxt;
 uintptr_t apply_flags = (uintptr_t)da->da_dc->dc_data;

 _dispatch_perfmon_workitem_dec(); // this unit executes many items

 // Handle nested dispatch_apply rdar://problem/9294578
 dispatch_thread_context_s apply_ctxt = {
  .dtc_key = _dispatch_apply_key,
  .dtc_apply_nesting = da->da_nested,
 };
 _dispatch_thread_context_push(&apply_ctxt);

 dispatch_thread_frame_s dtf;
 dispatch_priority_t old_dbp = 0;
 if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
  dispatch_queue_t dq = da->da_dc->dc_other;
  _dispatch_thread_frame_push(&dtf, dq);
  old_dbp = _dispatch_set_basepri(dq->dq_priority);
 }
 dispatch_invoke_flags_t flags = da->da_flags;

 // Striding is the responsibility of the caller.
 // Each pass runs one iteration for the claimed idx, counts it in done,
 // then claims the next index.
 do {
  dispatch_invoke_with_autoreleasepool(flags, {
   // apply_flags selects the callout shape: (ctxt, idx) for
   // DA_FLAG_APPLY, (ctxt, idx, worker_index) for
   // DA_FLAG_APPLY_WITH_ATTR; anything else is an internal crash.
   if (apply_flags & DA_FLAG_APPLY) {
    _dispatch_client_callout2(da_ctxt, idx, (dispatch_apply_function_t)func);
   } else if (apply_flags & DA_FLAG_APPLY_WITH_ATTR) {
    _dispatch_client_callout3_a(da_ctxt, idx, worker_index, (dispatch_apply_attr_function_t)func);
   } else {
    DISPATCH_INTERNAL_CRASH(apply_flags, "apply continuation has invalid flags");
   }
   _dispatch_perfmon_workitem_inc();
   done++;
   idx = os_atomic_inc_orig2o(da, da_index, relaxed);
  });
 } while (likely(idx < iter));

 // Undo the frame/priority push done above, in reverse order.
 if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
  _dispatch_reset_basepri(old_dbp);
  _dispatch_thread_frame_pop(&dtf);
 }

 _dispatch_thread_context_pop(&apply_ctxt);

 /* The thread that finished the last workitem wakes up the possibly waiting
  * thread that called dispatch_apply. They could be one and the same.
  */
 if (os_atomic_sub2o(da, da_todo, done, release) == 0) {
  _dispatch_thread_event_signal(&da->da_event);
 }
out:
 _dispatch_apply_clear_attr_behavior(da->da_attr, worker_index);

 // With DISPATCH_APPLY_INVOKE_WAIT, block on da_event and then destroy it.
 if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) {
  _dispatch_thread_event_wait(&da->da_event);
  _dispatch_thread_event_destroy(&da->da_event);
 }
 // Release this worker's count on da; the last worker out destroys da.
 if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
  _dispatch_apply_destroy(da);
 }
}

    _dispatch_apply_invoke 函数调用 _dispatch_apply_invoke2;_dispatch_apply_invoke2 取出 block 任务,通过 _dispatch_client_callout2 调用执行。

相关文章

  • GCD

    GCD之dispatch_apply dispatch_apply的作用是快速迭代 dispatch_apply替...

  • GCD 队列组 常用函数

    队列组 GCD : dispatch_barrier_async GCD : dispatch_apply G...

  • GCD学习(七) dispatch_apply

    GCD学习(七) dispatch_apply dispathc_apply 是dispatch_sync 和di...

  • GCD相关方法

    1.gcd栅栏函数 2.gcd快速迭代方法(dispatch_apply)同for循环做比较。 案例:将文件夹fr...

  • GCD ⑦ dispatch_apply

    dispatch_apply(批量处理) dispatch_apply 函数按指定的次数将指定的 Block 追加...

  • GCD之dispatch_apply

    dispatch_apply该函数按指定的次数将指定的block追加到指定的dispatch queue中,并等待...

  • GCD学习 dispatch_apply

    dispathc_apply 是dispatch_sync 和dispatch_group的关联API. 它以指定...

  • GCD学习 dispatch_apply

    dispathc_apply 是dispatch_sync 和dispatch_group的关联API. 它以指定...

  • GCD(五)-dispatch_apply

    这个函数很有意思.文档注释解释说:Submits a block to a dispatch queue for ...

  • GCD中dispatch_apply使用

    dispatch_apply使用说明 dispatch_apply类似一个for循环,会在指定的dispatch ...

网友评论

      本文标题:GCD ⑦ dispatch_apply

      本文链接:https://www.haomeiwen.com/subject/sgufertx.html