dispatch_apply(批量处理)
dispatch_apply 函数按指定的次数将指定的 Block 追加到指定的 Dispatch Queue 中,并等待全部处理执行结束。
因为在 Global Dispatch Queue 中执行处理,所以各个处理的执行顺序和时间不定。但是输出结果中最后的 done 必定在最后的位置上,这是因为 dispatch_apply 函数会等待全部处理执行结束。
// Fetch the default-priority global queue.
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
// Submit the block 10 times; dispatch_apply waits until every iteration has
// finished before it returns.
dispatch_apply(10, queue, ^(size_t index) {
    // index is a size_t, so it must be printed with %zu; %ld expects a
    // signed long and is a format/argument mismatch.
    NSLog(@"%zu", index);
});
// Logged after all iterations, because dispatch_apply has already waited.
NSLog(@"done");
dispatch_apply 实现
/*
 * Block-based entry point. Extracts the block's invoke function pointer and
 * forwards to dispatch_apply_f, passing the block itself as the context.
 *
 * iterations: number of times the block is to be invoked.
 * dq:         queue the work is submitted to.
 * work:       block receiving the iteration index.
 */
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_apply_function_t invoke =
			(dispatch_apply_function_t)_dispatch_Block_invoke(work);

	dispatch_apply_f(iterations, dq, work, invoke);
}
/*
 * Function-pointer flavour of dispatch_apply. Forwards to
 * _dispatch_apply_with_attr_f with a NULL attribute and the DA_FLAG_APPLY
 * flag.
 *
 * When reached from dispatch_apply, func is the block's invoke pointer
 * (obtained via _dispatch_Block_invoke) and ctxt is the block itself.
 */
void
dispatch_apply_f(size_t iterations, dispatch_queue_t _dq, void *ctxt,
		void (*func)(void *, size_t))
{
	dispatch_function_t erased_func = (dispatch_function_t)func;

	_dispatch_apply_with_attr_f(iterations, NULL, _dq, ctxt, erased_func,
			DA_FLAG_APPLY);
}
dispatch_apply 调用 dispatch_apply_f,并传递通过 _dispatch_Block_invoke 取得的 block 函数指针;dispatch_apply_f 调用 _dispatch_apply_with_attr_f,并将 da_flags 配置为 DA_FLAG_APPLY。
_dispatch_apply_with_attr_f
/*
 * Shared implementation behind dispatch_apply_f (abridged excerpt: the
 * setup/validation code and the code around the final call are elided).
 *
 * Packs the work function, its context, the queue and the apply flags into a
 * stack-local continuation `dc`, allocates and initialises a dispatch_apply_t
 * `da` that references it, and hands `da` to _dispatch_apply_f together with
 * _dispatch_apply_invoke.
 */
static void
_dispatch_apply_with_attr_f(size_t iterations, dispatch_apply_attr_t attr,
dispatch_queue_t _dq, void *ctxt, dispatch_function_t func, uintptr_t da_flags)
{
// Configuration and validation code elided ...
// NOTE(review): thr_cnt, dq and new_nested are not declared in this excerpt;
// presumably they are set up in the elided code above.
// Cap the thread count at the number of iterations.
if (iterations < thr_cnt) {
thr_cnt = iterations;
}
struct dispatch_continuation_s dc = {
// The work function; when reached from dispatch_apply this is the block's
// invoke pointer obtained via _dispatch_Block_invoke(work).
.dc_func = (void*)func,
// The context passed to the work function (the block, for dispatch_apply).
.dc_ctxt = ctxt,
// The queue.
.dc_other = dq,
// The apply flags (DA_FLAG_APPLY when called from dispatch_apply_f).
.dc_data = (void *)da_flags,
};
// Allocate the dispatch_apply_t object da.
dispatch_apply_t da = _dispatch_apply_alloc();
// da_index starts at 0.
os_atomic_init(&da->da_index, 0);
// da_todo starts at iterations.
os_atomic_init(&da->da_todo, iterations);
da->da_iterations = iterations;
da->da_nested = new_nested;
da->da_thr_cnt = (int32_t)thr_cnt;
os_atomic_init(&da->da_worker_index, 0);
_dispatch_apply_da_copy_attr(da, attr);
#if DISPATCH_INTROSPECTION
// With introspection enabled, da gets its own allocated continuation holding
// a field-by-field copy of dc.
da->da_dc = _dispatch_continuation_alloc();
da->da_dc->dc_func = (void *) dc.dc_func;
da->da_dc->dc_ctxt = dc.dc_ctxt;
da->da_dc->dc_other = dc.dc_other;
da->da_dc->dc_data = dc.dc_data;
da->da_dc->dc_flags = DC_FLAG_ALLOCATED;
#else
// Otherwise da simply points at the stack-local dc.
da->da_dc = &dc;
#endif
da->da_flags = 0;
//...
// Hand da to _dispatch_apply_f with _dispatch_apply_invoke as the function.
_dispatch_apply_f(upcast(dq)._dgq, da, _dispatch_apply_invoke);
//...
}
_dispatch_apply_with_attr_f
函数流程:
- 校验参数,并将参数包装为 dispatch_continuation_s dc
- 创建并初始化 dispatch_apply_t 对象 da,并将包装好的任务 dc 保存至 da
- 调用 _dispatch_apply_f 函数,传递 dq、da 及 _dispatch_apply_invoke 函数指针
_dispatch_apply_f
/*
 * Builds a linked list of (da_thr_cnt - 1) continuations that each run `func`
 * with `da` as context, pushes the whole list onto the root queue `dq`, and
 * then runs the apply work on the calling thread via
 * _dispatch_apply_invoke_and_wait.
 *
 * dq:   root queue the continuations are pushed to.
 * da:   the apply object shared by all workers.
 * func: function stored in every continuation.
 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f(dispatch_queue_global_t dq, dispatch_apply_t da,
dispatch_function_t func)
{
// Locals: loop counter, list head/tail, and the current priority.
int32_t i = 0;
dispatch_continuation_t head = NULL, tail = NULL;
pthread_priority_t pp = _dispatch_get_priority();
// The current thread does not need a continuation
// so the number of continuations is da_thr_cnt - 1 (the thread count, not
// the iteration count).
int32_t continuation_cnt = da->da_thr_cnt - 1;
// presumably asserts that continuation_cnt is non-zero
dispatch_assert(continuation_cnt);
// Create one continuation per additional worker.
for (i = 0; i < continuation_cnt; i++) {
// Allocate the continuation.
dispatch_continuation_t next = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
// Initialise next with queue dq, context da, function func, and the
// DC_FLAG_CONSUME flags.
// NOTE(review): which fields the initialiser sets is not visible here;
// presumably dc_ctxt = da and dc_func = func.
_dispatch_continuation_init_f(next, dq, da, func,
DISPATCH_BLOCK_HAS_PRIORITY, dc_flags);
next->dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
// Link the new node in front of the current head ...
next->do_next = head;
// ... and make it the new head.
head = next;
// The first node created stays at the end of the list, so it is the tail.
if (!tail) {
tail = next;
}
}
_dispatch_thread_event_init(&da->da_event);
// FIXME: dq may not be the right queue for the priority of `head`
_dispatch_trace_item_push_list(dq, head, tail);
// Push the whole list onto the root queue (per the article, the same path
// dispatch_async takes).
_dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt);
// Call the first element directly
_dispatch_apply_invoke_and_wait(da);
}
_dispatch_apply_f
函数流程如下:
- 声明 head、tail 指针
- for 循环遍历(共 da_thr_cnt - 1 次,当前线程不需要 continuation),创建 dispatch_continuation_t next,并对 next 进行初始化
- 通过 do_next 保存节点,类似链表,tail 为尾节点,head 为头节点
- 通过 _dispatch_root_queue_push_inline 压栈保存,和 dispatch_async 流程类似,最终通过 _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_client_callout -> _dispatch_apply_invoke 调用
- 当前线程直接调用 _dispatch_apply_invoke_and_wait(da),自己也参与执行任务并等待全部任务完成
_dispatch_apply_invoke
/*
 * Function installed in the continuations created by _dispatch_apply_f.
 * Runs the apply worker loop on ctxt with no invoke flags set.
 */
DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	const long no_invoke_flags = 0;

	_dispatch_apply_invoke2(ctxt, no_invoke_flags);
}
/*
 * Worker loop shared by every thread taking part in a dispatch_apply.
 *
 * Repeatedly reserves the next iteration index by atomically incrementing
 * da_index and calls the work function for it, until the reserved index
 * reaches da_iterations. Afterwards it subtracts the number of iterations it
 * completed from da_todo and signals da_event when that reaches zero.
 *
 * invoke_flags:
 *   DISPATCH_APPLY_INVOKE_REDIRECT - push a thread frame for the queue stored
 *       in da_dc->dc_other and set the base priority from it while running.
 *   DISPATCH_APPLY_INVOKE_WAIT     - wait on da_event before returning, then
 *       destroy the event.
 *
 * The thread whose decrement brings da_thr_cnt to zero destroys da.
 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(dispatch_apply_t da, long invoke_flags)
{
size_t const iter = da->da_iterations;
// idx: the iteration index currently reserved by this worker;
// done: how many iterations this worker has completed.
size_t idx, done = 0;
/* workers start over time but never quit until the job is done, so
* we can allocate an index simply by incrementing
*/
uint32_t worker_index = 0;
worker_index = os_atomic_inc_orig2o(da, da_worker_index, relaxed);
_dispatch_apply_set_attr_behavior(da->da_attr, worker_index);
// Reserve the first iteration index; if none are left, skip straight to the
// cleanup at `out` without touching da_dc.
idx = os_atomic_inc_orig2o(da, da_index, acquire);
if (unlikely(idx >= iter)) goto out;
/*
* da_dc is only safe to access once the 'index lock' has been acquired
* because it lives on the stack of the thread calling dispatch_apply.
*
* da lives until the last worker thread has finished (protected by
* da_thr_cnt), but da_dc only lives until the calling thread returns
* after the last work item is complete, which may be sooner than that.
* (In fact, the calling thread could do all the workitems itself and
* return before the worker threads even start.)
*
* Therefore the increment (reserving a valid workitem index from
* da_index) protects our access to da_dc.
*
* We also need an acquire barrier, and this is a good place to have one.
*/
// Copy everything needed out of da_dc into locals.
dispatch_function_t const func = da->da_dc->dc_func;
void *const da_ctxt = da->da_dc->dc_ctxt;
uintptr_t apply_flags = (uintptr_t)da->da_dc->dc_data;
_dispatch_perfmon_workitem_dec(); // this unit executes many items
// Handle nested dispatch_apply rdar://problem/9294578
dispatch_thread_context_s apply_ctxt = {
.dtc_key = _dispatch_apply_key,
.dtc_apply_nesting = da->da_nested,
};
_dispatch_thread_context_push(&apply_ctxt);
dispatch_thread_frame_s dtf;
dispatch_priority_t old_dbp = 0;
if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
dispatch_queue_t dq = da->da_dc->dc_other;
_dispatch_thread_frame_push(&dtf, dq);
old_dbp = _dispatch_set_basepri(dq->dq_priority);
}
dispatch_invoke_flags_t flags = da->da_flags;
// Striding is the responsibility of the caller.
do {
dispatch_invoke_with_autoreleasepool(flags, {
// Pick the callout that matches how the apply was started.
if (apply_flags & DA_FLAG_APPLY) {
_dispatch_client_callout2(da_ctxt, idx, (dispatch_apply_function_t)func);
} else if (apply_flags & DA_FLAG_APPLY_WITH_ATTR) {
_dispatch_client_callout3_a(da_ctxt, idx, worker_index, (dispatch_apply_attr_function_t)func);
} else {
DISPATCH_INTERNAL_CRASH(apply_flags, "apply continuation has invalid flags");
}
_dispatch_perfmon_workitem_inc();
done++;
// Reserve the next index; the loop ends once it is out of range.
idx = os_atomic_inc_orig2o(da, da_index, relaxed);
});
} while (likely(idx < iter));
// Undo the frame/priority changes made above, in reverse order.
if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
_dispatch_reset_basepri(old_dbp);
_dispatch_thread_frame_pop(&dtf);
}
_dispatch_thread_context_pop(&apply_ctxt);
/* The thread that finished the last workitem wakes up the possibly waiting
* thread that called dispatch_apply. They could be one and the same.
*/
if (os_atomic_sub2o(da, da_todo, done, release) == 0) {
_dispatch_thread_event_signal(&da->da_event);
}
out:
_dispatch_apply_clear_attr_behavior(da->da_attr, worker_index);
// With DISPATCH_APPLY_INVOKE_WAIT set, wait on da_event before returning,
// then destroy the event.
if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) {
_dispatch_thread_event_wait(&da->da_event);
_dispatch_thread_event_destroy(&da->da_event);
}
// The thread whose decrement brings da_thr_cnt to zero destroys da.
if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
_dispatch_apply_destroy(da);
}
}
_dispatch_apply_invoke 函数调用 _dispatch_apply_invoke2,_dispatch_apply_invoke2 取出 block 任务,通过 _dispatch_client_callout2 调用执行。
网友评论