Grand Central Dispatch (GCD)
转载自:https://blog.boolchow.com/2018/04/06/iOS-Concurrency-Programming/
GCD 是基于 C 实现的一套 API,而且是开源的,如果有兴趣,可以在 这里 down 一份源码研究一下。GCD 是由系统帮我们处理多线程调度,很是方便,也是使用频率最高的。这一章节主要讲解一下 GCD 的原理和使用。
在讲解之前,我们先有个概览,看一下 GCD 为我们提供了那些东西:
GCD总体结构系统所提供的 API,完全可以满足我们日常开发需求了。下面就根据这些模块分别讲解一下。
1. Dispatch Queue
GCD 为我们提供了两类队列,串行队列 和 并行队列。两者的区别是:
- 串行队列中,按照 FIFO 的顺序执行任务,前面一个任务执行完,后面一个才开始执行。
- 并行队列中,也是按照 FIFO 的顺序执行任务,只要前一个被拿去执行,继而后面一个就开始执行,后面的任务无需等到前面的任务执行完再开始执行。
除此之外,还要解释一个容易混淆的概念,并发和并行:
- 并发:是指单独部分可以同时执行,但是需要系统决定怎样发生。
- 并行:两个任务互不干扰,同时执行。单核设备,系统需要通过切换上下文来实现并发;多核设备,系统可以通过并行来执行并发任务。
最后,还有一个概念,同步和异步:
- 同步 : 同步执行的任务会阻塞当前线程。
- 异步 : 异步执行的任务不会阻塞当前线程。是否开启新的线程,由系统管理。如果当前有空闲的线程,使用当前线程执行这个异步任务;如果没有空闲的线程,而且线程数量没有达到系统最大,则开启新的线程;如果线程数量已经达到系统最大,则需要等待其他线程中任务执行完毕。
队列
我们使用时,一般使用这几个队列:
- 主队列 - dispatch_get_main_queue :一个特殊的串行队列。在 GCD 中,方法主队列中的任务都是在主线程执行。当我们更新 UI 时想 dispatch 到主线程,可以使用这个队列。
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_async(dispatch_get_main_queue(), ^{
// UI 相关操作
});
}
-
全局并行队列 - dispatch_get_global_queue : 系统提供的一个全局并行队列,我们可以通过指定参数,来获取不同优先级的队列。系统提供了四个优先级,所以也可以认为系统为我们提供了四个并行队列,分别为 :
- DISPATCH_QUEUE_PRIORITY_HIGH
- DISPATCH_QUEUE_PRIORITY_DEFAULT
- DISPATCH_QUEUE_PRIORITY_LOW
- DISPATCH_QUEUE_PRIORITY_BACKGROUND
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
dispatch_async(queue, ^{
// 相关操作
});
- 自定义队列 :你可以自己定义串行或者并行队列,来执行一些相关的任务,平时开发中也建议用自定义队列。创建自定义队列时,需要两个参数。一个是队列的名字,方便我们再调试时查找队列使用,命名方式采用的是反向 DNS 命名规则;一个是队列类型,传 NULL 或者 DISPATCH_QUEUE_SERIAL 代表串行队列,传 DISPATCH_QUEUE_CONCURRENT 代表并行队列,通常情况下,不要传 NULL,会降低可读性。
DISPATCH_QUEUE_SERIAL_INACTIVE 代表串行不活跃队列,DISPATCH_QUEUE_CONCURRENT_INACTIVE 代表并行不活跃队列,在执行 block 任务时,需要被激活。
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch",DISPATCH_QUEUE_SERIAL);
- 你可以使用
dispatch_queue_set_specific
、dispatch_queue_get_specific
和dispatch_get_specific
方法,为 queue 设置关联的 key 或者根据 key 找到关联对象等操作。
可以说,系统为我们提供了 5 中不同的队列,运行在主线程中的 main queue;3 个不同优先级的 global queue; 一个优先级更低的 background queue。除此之外,开发者可以自定义一些串行和并行队列,这些自定义队列中被调度的所有 block 最终都会被放到系统全局队列和线程池中,后面会讲这部分原理。盗用一张经典图:
gcd-queues同步 VS 异步
我们大多数情况下,都是使用 dispatch_asyn()
做异步操作,因为程序本来就是顺序执行,很少用到同步操作。有时候我们会把 dispatch_syn()
当做锁来用,以达到保护的作用。
系统维护的是一个队列,根据 FIFO 的规则,将 dispatch 到队列中的任务一一执行。有时候我们想把一些任务延后执行以下,例如 App 启动时,我想让主线程中一个耗时的工作放在后,可以尝试用一下 dispatch_asyn()
,相当于把任务重新追加到了队尾。
dispatch_async(dispatch_get_main_queue(), ^{
// 想要延后的任务
});
通常情况下,我们使用 dispatch_asyn()
是不会造成死锁的。死锁一般出现在使用 dispatch_syn()
的时候。例如:
dispatch_sync(dispatch_get_main_queue(), ^{
NSLog(@"dead lock");
});
想上面这样写,启动就会报错误。以下情况也如此:
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn");
dispatch_sync(queue, ^{
NSLog(@"dispatch asyn -> dispatch syn");
});
});
在上面的代码中,dispatch_asyn()
整个 block(称作 blcok_asyn) 当做一个任务追加到串行队列队尾,然后开始执行。在 block_asyn 内部中,又进行了 dispatch_syn()
,想想要执行 block_syn。因为是串行队列,需要前一个执行完(block_asyn),再执行后面一个(block_syn);但是要执行完 block_asyn,需要执行内部的 block_syn。互相等待,形成死锁。
现实开发中,还有更复杂的死锁场景。不过现在编译器很友好,我们能在编译执行时就检测到了。
基本原理
针对下面这几行代码,我们分析一下它的底层过程:
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn test");
});
}
创建队列
源码很长,但实际只有一个方法,逻辑比较清晰,如下:
/** 开发者调用的方法 */
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
/** 内部实际调用方法 */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// 1.初步判断
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
// 2.配置队列参数
dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}
if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}
// 3. 初始化队列
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}
const void *vtable;
dispatch_queue_flags_t dqf = 0;
if (legacy) {
vtable = DISPATCH_VTABLE(queue);
} else if (dqa->dqa_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqa->dqa_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (legacy) {
dqf |= DQF_LEGACY;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = dqa->dqa_qos_and_relpri;
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
#endif
_dispatch_retain(tq);
if (qos == QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}
根据代码生成的流程图,不想看代码直接看图,下同:
Create_Queue根据流程图,这个方法的步骤如下:
- 开发者调用
dispatch_queue_create()
方法之后,内部会调用_dispatch_queue_create_with_target()
方法。 - 然后进行初步判断,多数情况下,我们是不会传队列类型的,都是穿 NULL,所以这里是个 slowpath。如果传了参数,但是不是规定的队列类型,系统会认为你是个智障,并抛出错误。
- 然后初始化一些配置项。主要是 target_queue,overcommit 项和 qos。target_queue 是依赖的目标队列,像任何队列提交的任务(block),最终都会放到目标队列中执行;支持 overcommit 时,每当想队列提交一个任务时,都会开一个新的线程处理,这样是为了避免单一线程任务太多而过载;qos 是队列优先级,之前已经说过。
- 然后进入判断分支。普通的串行队列的目标队列,就是一个支持 overcommit 的全局队列(对应 else 分支);当前 tq 对象的引用计数为 DISPATCH_OBJECT_GLOBAL_REFCNT (永远不会释放)时,且还没有目标队列时,才可以设置 overcommit 项,而且当优先级为 DISPATCH_QOS_UNSPECIFIED 时,需要重置 tq (对应 if 分支);其他情况(else if 分支)。
- 然后配置队列的标识,以方便在调试时找到自己的那个队列。
- 使用
_dispatch_object_alloc
方法申请一个 dispatch_queue_t 对象空间,dq。 - 根据传入的信息(并行 or 串行;活跃 or 非活跃)来初始化这个队列。并行队列的 width 会设置为
DISPATCH_QUEUE_WIDTH_MAX
即最大,不设限;串行的会设置为 1。 - 将上面获得配置项,目标队列,是否支持 overcommit,优先级和 dq 绑定。
- 返回这个队列。返回去还输出了一句信息,便于调试。
异步执行
这个版本异步执行的代码,因为方法拆分很多,所以显得很乱。源码如下:
/** 开发者调用 */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_async(dq, dc);
}
/** 内部调用,包一层,再深入调用 */
DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
_dispatch_continuation_async2(dq, dc,
dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}
/** 根据 barrier 关键字区别串行还是并行,分两支 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
bool barrier)
{
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
// 串行
return _dispatch_continuation_push(dq, dc);
}
// 并行
return _dispatch_async_f2(dq, dc);
}
/** 并行又多了一层调用,就是这个方法 */
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
if (slowpath(dq->dq_items_tail)) {// 少路径
return _dispatch_continuation_push(dq, dc);
}
if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {// 少路径
return _dispatch_continuation_push(dq, dc);
}
// 多路径
return _dispatch_async_f_redirect(dq, dc,
_dispatch_continuation_override_qos(dq, dc));
}
/** 主要用来重定向 */
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
}
dq = dq->do_targetq;
// Find the queue to redirect to
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
break;
}
if (!dou._dc->dc_ctxt) {
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
}
dq = dq->do_targetq;
}
// 同步异步最终都是调用的这个方法,将任务追加到队列中
dx_push(dq, dou, qos);
}
... 省略一些调用层级,
/** 核心方法,通过 dc_flags 参数区分了是 group,还是串行,还是并行 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
dispatch_invoke_flags_t flags)
{
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_invoke_with_autoreleasepool(flags, {
uintptr_t dc_flags = dc->dc_flags;
_dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 并行
dc1 = _dispatch_continuation_free_cacheonly(dc);
} else {
dc1 = NULL;
}
if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
_dispatch_continuation_with_group_invoke(dc);
} else { // 串行
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_introspection_queue_item_complete(dou);
}
if (unlikely(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
});
_dispatch_perfmon_workitem_inc();
}
不想看代码,直接看图:
Dispatch_Asyn根据流程图描述一下过程:
- 首先开发者调用
dispatch_async()
方法,然后内部创建了一个_dispatch_continuation_init
队列,将 queue、block 这些信息和这个 dc 绑定起来。这过程中 copy 了 block。 - 然后经过了几个层次的调用,主要为了区分并行还是串行。
- 如果是串行(这种情况比较常见,所以是 fastpath),直接就 dx_push 了,其实就是讲任务追加到一个链表里面。
- 如果是并行,需要做重定向。之前我们说过,放到队列中的任务,最终都会以各种形式追加到目标队列里面。在
_dispatch_async_f_redirect
方法中,重新寻找依赖目标队列,然后追加过去。 - 经过一系列调用,我们会在
_dispatch_continuation_invoke_inline
方法里区分串行还是并行。因为这个方法会被频繁调用,所以定义成了内联函数。对于串行队列,我们使用信号量控制,执行前信号量置为 wait,执行完毕后发送 singal;对于调度组,我们会在执行完之后调用dispatch_group_leave
。 - 底层的线程池,是使用 pthread 维护的,所以最终都会使用 pthread 来处理这些任务。
同步执行
同步执行,相对来说比较简单,源码如下 :
/** 开发者调用 */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
/** 内部调用 */
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
if (likely(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
return _dispatch_sync_f_slow(dq, ctxt, func, 0);
}
_dispatch_introspection_sync_begin(dq);
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dq, ctxt, func, 0);
}
_dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
同步执行,相对来说简单些,大体逻辑差不多。偷懒一下,就不画图了,直接描述:
- 开发者使用
dispatch_sync()
方法,大多数路径,都会调用dispatch_sync_f()
方法。 - 如果是串行队列,则通过
dispatch_barrier_sync_f()
方法来保证原子操作。 - 如果不是串行的(一般很少),我们使用
_dispatch_introspection_sync_begin
和_dispatch_sync_invoke_and_complete
来保证同步。
dispatch_after
dispatch_after 一般用于延后执行一些任务,可以用来代替 NSTimer,因为有时候 NSTimer 问题太多了。在后面的一章里,我会总体讲一下多线程中的问题,这里就不详细说了。一般我们这样来使用 dispatch_after :
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)),queue, ^{
// 2.0 second execute
});
}
在做页面过渡时,刚进入到新的页面我们并不会立即更新一些 view,为了引起用户注意,我们会过会儿再进行更新,可以中此 API 来完成。
源码如下:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
void *ctxt, void *handler, bool block)
{
dispatch_timer_source_refs_t dt;
dispatch_source_t ds;
uint64_t leeway, delta;
if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
return;
}
delta = _dispatch_timeout(when);
if (delta == 0) {
if (block) {
return dispatch_async(queue, handler);
}
return dispatch_async_f(queue, ctxt, handler);
}
leeway = delta / 10; // <rdar://problem/13447496>
if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
// this function can and should be optimized to not use a dispatch source
ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
dt = ds->ds_timer_refs;
dispatch_continuation_t dc = _dispatch_continuation_alloc();
if (block) {
_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
} else {
_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
}
// reference `ds` so that it doesn't show up as a leak
dc->dc_data = ds;
_dispatch_trace_continuation_push(ds->_as_dq, dc);
os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
if ((int64_t)when < 0) {
// wall clock
when = (dispatch_time_t)-((int64_t)when);
} else {
// absolute clock
dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
leeway = _dispatch_time_nano2mach(leeway);
}
dt->dt_timer.target = when;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_timer.deadline = when + leeway;
dispatch_activate(ds);
}
dispatch_after()
内部会调用 _dispatch_after()
方法,然后先判断延迟时间。如果为 DISPATCH_TIME_FOREVER
(永远不执行),则会出现异常;如果为 0 则立即执行;否则的话会创建一个 dispatch_timer_source_refs_t 结构体指针,将上下文相关信息与之关联。然后使用 dispatch_source 相关方法,将定时器和 block 任务关联起来。定时器时间到时,取出 block 任务开始执行。
dispatch_once
如果我们有一段代码,在 App 生命周期内最好只初始化一次,这时候使用 dispatch_once 最好不过了。例如我们单例中经常这样用:
+ (instancetype)sharedManager {
static BLDispatchManager *sharedInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sharedInstance = [[BLDispatchManager alloc] initPrivate];
});
return sharedInstance;
}
还有在定义 NSDateFormatter
时使用:
- (NSString *)todayDateString {
static NSDateFormatter *formatter = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
formatter = [NSDateFormatter new];
formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
formatter.dateFormat = @"yyyyMMdd";
});
return [formatter stringFromDate:[NSDate date]];
}
因为这是很常用的一个代码片段,所以被加在了 Xcode 的 code snippet 中。
它的源代码如下:
/** 一个结构体,里面为当前的信号量、线程端口和指向下一个节点的指针 */
typedef struct _dispatch_once_waiter_s {
volatile struct _dispatch_once_waiter_s *volatile dow_next;
dispatch_thread_event_s dow_event;
mach_port_t dow_thread;
} *_dispatch_once_waiter_t;
/** 我们调用的方法 */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
/** 实际执行的方法 */
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if !DISPATCH_ONCE_INLINE_FASTPATH
if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
return;
}
#endif // !DISPATCH_ONCE_INLINE_FASTPATH
return dispatch_once_f_slow(val, ctxt, func);
}
DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
if (_dispatch_once_gate_tryenter(l)) {
_dispatch_client_callout(ctxt, func);
_dispatch_once_gate_broadcast(l);
} else {
_dispatch_once_gate_wait(l);
}
#else
_dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
struct _dispatch_once_waiter_s dow = { };
_dispatch_once_waiter_t tail = &dow, next, tmp;
dispatch_thread_event_t event;
if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
dow.dow_thread = _dispatch_tid_self();
_dispatch_client_callout(ctxt, func);
next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
while (next != tail) {
tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
event = &next->dow_event;
next = tmp;
_dispatch_thread_event_signal(event);
}
} else {
_dispatch_thread_event_init(&dow.dow_event);
next = *vval;
for (;;) {
if (next == DISPATCH_ONCE_DONE) {
break;
}
if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
dow.dow_thread = next->dow_thread;
dow.dow_next = next;
if (dow.dow_thread) {
pthread_priority_t pp = _dispatch_get_priority();
_dispatch_thread_override_start(dow.dow_thread, pp, val);
}
_dispatch_thread_event_wait(&dow.dow_event);
if (dow.dow_thread) {
_dispatch_thread_override_end(dow.dow_thread, val);
}
break;
}
}
_dispatch_thread_event_destroy(&dow.dow_event);
}
#endif
}
不想看代码直接看图 (emmm… 根据逻辑画完图才发现,其实这个图也挺乱的,所以我将两个主分支用不同颜色标记处理):
Dispatch_Once根据这个图,我来表述一下主要过程:
- 我们调用
dispatch_once()
方法之后,内部多数情况下会调用dispatch_once_f_slow()
方法,这个方法才是真正的执行方法。 -
os_atomic_cmpxchg(vval, NULL, tail, acquire)
这个方法,执行过程实际是这个样子
if (*vval == NULL) {
*vval = tail = &dow;
return true;
} else {
return false
}
我们初始化的 once_token,也就是 *vval 实际是 0,所以第一次执行时是返回 true 的。if() 中的这个方法是原子操作,也就是说,如果多个线程同时调用这个方法,只有一个线程会进入 true 的分支,其他都进入 else 分支。
- 这里先说进入 true 分支。进入之后,会执行对应的 block,也就是对应的任务。然后 next 指向 *vval, *vval 标记为
DISPATCH_ONCE_DONE
,即执行的是这样一个过程:
next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
// 实际执行时这样的
next = *vval;
*vval = DISPATCH_ONCE_DONE;
-
然后
tail = &dow
。此时我们发现,原来的*vval = &dow -> next = *vval
,实际则是next = &dow
,如果没有其他线程(或者调用)进入 else 分支,&dow 实际没有改变,即tail == tmp
。此时while (tail != tmp)
是不会执行的,分支结束。 -
如果有其他线程(或者调用)进入了 else 分支,那么就已经生成了一个等待响应的链表。此时进入 &dow 已经改变,成为了链表尾部,*vval 是链表头部。进入 while 循环后,开始遍历链表,依次发送信号进行唤起。
-
然后说进入 else 分支的这些调用。进入分支后,随即进入一个死循环,直到发现 *vval 已经标记为了
DISPATCH_ONCE_DONE
才跳出循环。 -
发现 *vval 不是
DISPATCH_ONCE_DONE
之后,会将这个节点追加到链表尾部,并调用信号量的 wait 方法,等待被唤起。
以上为全部的执行过程。通过源码可以看出,使用的是 原子操作 + 信号量来保证 block 只会被执行多次,哪怕是在多线程情况下。
这样一个关于 dispatch_once
递归调用会产生死锁的现象,也就很好解释了。看下面代码:
- (void)dispatchOnceTest {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[self dispatchOnceTest];
});
}
通过上面分析,在 block 执行完,并将 *vval 置为 DISPATCH_ONCE_DONE
之前,其他的调用都会进入 else 分支。第二次递归调用,信号量处于等待状态,需要等到第一个 block 执行完才能被唤起;但是第一个 block 所执行的内容就是进行第二次调用,这个任务被 wait 了,也即是说 block 永远执行不完。死锁就这样发生了。
dispatch_apply
有时候没有时序性依赖的时候,我们会用 dispatch_apply
来代替 for loop
。例如我们下载一组图片:
/** 使用 for loop */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
for (NSURL *imageURL in imageURLs) {
[self downloadImageWithURL:imageURL];
}
}
/** dispatch_apply */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_queue_t downloadQueue = dispatch_queue_create("com.bool.download", DISPATCH_QUEUE_CONCURRENT);
dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
NSURL *imageURL = imageURLs[index];
[self downloadImageWithURL:imageURL];
});
}
进行替换是需要注意几个问题:
- 任务之间没有时序性依赖,谁先执行都可以。
- 一般在并发队列,并发执行任务时,才替换。串行队列替换没有意义。
- 如果数组中数据很少,或者每个任务执行时间很短,替换也没有意义。强行进行并发的消耗,可能比使用 for loop 还要多,并不能得到优化。
至于原理,就不大篇幅讲了。大概是这个样子:这个方法是同步的,会阻塞当前线程,直到所有的 block 任务都完成。如果提交到并发队列,每个任务执行顺序是不一定的。
更多时候,我们执行下载任务,并不希望阻塞当前线程,这时我们可以使用 dispatch_group
。
dispatch_group
当处理批量异步任务时,dispatch_group
是一个很好的选择。针对上面说的下载图片的例子,我们可以这样做:
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_group_t taskGroup = dispatch_group_create();
dispatch_queue_t queue = dispatch_queue_create("com.bool.group", DISPATCH_QUEUE_CONCURRENT);
for (NSURL *imageURL in imageURLs) {
dispatch_group_enter(taskGroup);
// 下载方法是异步的
[self downloadImageWithURL:imageURL withQueue:queue completeHandler:^{
dispatch_group_leave(taskGroup);
}];
}
dispatch_group_notify(taskGroup, queue, ^{
// all task finish
});
/** 如果使用这个方法,内部执行异步任务,会立即到 dispatch_group_notify 方法中,因为是异步,系统认为已经执行完了。所以这个方法使用不多。
*/
dispatch_group_async(taskGroup, queue, ^{
})
}
关于原理方面,和 dispatch_async()
方法类似,前面也提到。这里只说一段代码:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc);
}
这段代码中,调用了 dispatch_group_enter(dg)
方法进行标记,最终都会和 dispatch_async()
走到同样的方法里 _dispatch_continuation_invoke_inline()
。在里面判断类型为 group,执行 task,执行结束后调用 dispatch_group_leave((dispatch_group_t)dou)
,和之前的 enter 对应。
以上是 Dispatch Queues 内容的介绍,我们平时使用 GCD 的过程中,60% 都是使用的以上内容。
2. Dispatch Block
在 iOS 8 中,Apple 为我们提供了新的 API,Dispatch Block
相关。虽然之前我们可以向 dispatch 传递 block 参数,作为任务,但是这里和之前的不一样。之前经常说,使用 NSOperation
创建的任务可以 cancel,使用 GCD 不可以。但是在 iOS 8 之后,可以 cancel 任务了。
基本使用
- 创建一个 block 并执行。
- (void)dispatchBlockTest {
// 不指定优先级
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
// 指定优先级
dispatch_block_t dsQosBlock = dispatch_block_create_with_qos_class(0, QOS_CLASS_USER_INITIATED, -1, ^{
NSLog(@"test block");
});
dispatch_async(dispatch_get_main_queue(), dsBlock);
dispatch_async(dispatch_get_main_queue(), dsQosBlock);
// 直接创建并执行
dispatch_block_perform(0, ^{
NSLog(@"test block");
});
}
- 阻塞当前任务,等 block 执行完在继续执行。
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
dispatch_async(queue, dsBlock);
// 等到 block 执行完
dispatch_block_wait(dsBlock, DISPATCH_TIME_FOREVER);
NSLog(@"block was finished");
}
- block 执行完后,收到通知,执行其他任务
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock = dispatch_block_create(0, ^{
NSLog(@"test block");
});
dispatch_async(queue, dsBlock);
// block 执行完收到通知
dispatch_block_notify(dsBlock, queue, ^{
NSLog(@"block was finished,do other thing");
});
NSLog(@"execute first");
}
- 对 block 进行 cancel 操作
- (void)dispatchBlockTest {
dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
dispatch_block_t dsBlock1 = dispatch_block_create(0, ^{
NSLog(@"test block1");
});
dispatch_block_t dsBlock2 = dispatch_block_create(0, ^{
NSLog(@"test block2");
});
dispatch_async(queue, dsBlock1);
dispatch_async(queue, dsBlock2);
// 第二个 block 将会被 cancel,不执行
dispatch_block_cancel(dsBlock2);
}
3. Dispatch Barriers
Dispatch Barriers 可以理解为调度屏障,常用于多线程并发读写操作。例如:
@interface ViewController ()
@property (nonatomic, strong) dispatch_queue_t imageQueue;
@property (nonatomic, strong) NSMutableArray *imageArray;
@end
@implementation ViewController
- (void)viewDidLoad {
[super viewDidLoad];
self.imageQueue = dispatch_queue_create("com.bool.image", DISPATCH_QUEUE_CONCURRENT);
self.imageArray = [NSMutableArray array];
}
/** 保证写入时不会有其他操作,写完之后到主线程更新 UI */
- (void)addImage:(UIImage *)image {
dispatch_barrier_async(self.imageQueue, ^{
[self.imageArray addObject:image];
dispatch_async(dispatch_get_main_queue(), ^{
// update UI
});
});
}
/** 这里的 dispatch_sync 起到了 lock 的作用 */
- (NSArray <UIImage *> *)images {
__block NSArray *imagesArray = nil;
dispatch_sync(self.imageQueue, ^{
imagesArray = [self.imageArray mutableCopy];
});
return imagesArray;
}
@end
转化成图可能好理解一些:
Dispatch_Barrierdispatch_barrier_async()
的原理和 dispatch_async()
差不多,只不过设置的 flags 不一样:
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
// 在 dispatch_async() 中只设置了 DISPATCH_OBJ_CONSUME_BIT
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_push(dq, dc);
}
后面都是 push 到队列中,然后,获取任务时一个死循环,在从队列中获取任务一个一个执行,如果判断 flag 为 barrier,终止循环,则单独执行这个任务。它后面的任务放入一个队列,等它执行完了再开始执行。
DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
...
for (;;) {
...
first_iteration:
dq_state = os_atomic_load(&dq->dq_state, relaxed);
if (unlikely(_dq_state_is_suspended(dq_state))) {
break;
}
if (unlikely(orig_tq != dq->do_targetq)) {
break;
}
if (serial_drain || _dispatch_object_is_barrier(dc)) {
if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_IN_BARRIER;
}
next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned = 0;
dic->dic_deferred = dc;
goto out_with_deferred;
}
} else {
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
// we just ran barrier work items, we have to make their
// effect visible to other sync work items on other threads
// that may start coming in after this point, hence the
// release barrier
os_atomic_xor2o(dq, dq_state, owned, release);
owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
} else if (unlikely(owned == 0)) {
if (_dispatch_object_is_sync_waiter(dc)) {
// sync "readers" don't observe the limit
_dispatch_queue_reserve_sync_width(dq);
} else if (!_dispatch_queue_try_acquire_async(dq)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
}
next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
_dispatch_sync_waiter_redirect_or_wake(dq,
DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
continue;
}
...
}
4. Dispatch Source
关于 dispatch_source
我们使用的少之又少,他是 BSD 系统内核功能的包装,经常用来监测某些事件发生。例如监测断点的使用和取消。[这里][https://developer.apple.com/documentation/dispatch/dispatch_source_type_constants?language=objc] 介绍了可以监测的事件:
- DISPATCH_SOURCE_TYPE_DATA_ADD : 自定义事件
- DISPATCH_SOURCE_TYPE_DATA_OR : 自定义事件
- DISPATCH_SOURCE_TYPE_MACH_RECV : MACH 端口接收事件
- DISPATCH_SOURCE_TYPE_MACH_SEND : MACH 端口发送事件
- DISPATCH_SOURCE_TYPE_PROC : 进程相关事件
- DISPATCH_SOURCE_TYPE_READ : 文件读取事件
- DISPATCH_SOURCE_TYPE_SIGNAL : 信号相关事件
- DISPATCH_SOURCE_TYPE_TIMER : 定时器相关事件
- DISPATCH_SOURCE_TYPE_VNODE : 文件属性修改事件
- DISPATCH_SOURCE_TYPE_WRITE : 文件写入事件
- DISPATCH_SOURCE_TYPE_MEMORYPRESSURE : 内存压力事件
例如我们可以通过下面代码,来监测断点的使用和取消:
@interface ViewController ()
@property (nonatomic, strong) dispatch_source_t signalSource;
@property (nonatomic, assign) dispatch_once_t signalOnceToken;
@end
@implementation ViewController
- (void)viewDidLoad {
dispatch_once(&_signalOnceToken, ^{
dispatch_queue_t queue = dispatch_get_main_queue();
self.signalSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGSTOP, 0, queue);
if (self.signalSource) {
dispatch_source_set_event_handler(self.signalSource, ^{
// 点击一下断点,再取消断点,便会执行这里。
NSLog(@"debug test");
});
dispatch_resume(self.signalSource);
}
});
}
还有 diapatch_after()
就是依赖 dispatch_source()
来实现的。我们可以自己实现一个类似的定时器:
- (void)customTimer {
dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 5.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 5);
dispatch_source_set_event_handler(timerSource, ^{
NSLog(@"dispatch source timer");
});
self.signalSource = timerSource;
dispatch_resume(self.signalSource);
}
基本原理
使用 dispatch_source
时,大致过程是这样的:我们创建一个 source,然后加到队列中,并调用 dispatch_resume()
方法,便会从队列中唤起 source,执行对应的 block。下面是一个详细的流程图,我们结合这张图来说一下:
- 创建一个 source 对象,过程和创建 queue 类似,所以后面一些操作,和操作 queue 很类似。
dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
unsigned long mask, dispatch_queue_t dq)
{
dispatch_source_refs_t dr;
dispatch_source_t ds;
dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}
// 申请内存空间
ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
sizeof(struct dispatch_source_s));
// 初始化一个队列,然后配置参数,完全被当做一个 queue 来处理
_dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
ds->dq_label = "source";
ds->do_ref_cnt++; // the reference the manager queue holds
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);
if (slowpath(!dq)) {
dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
ds->do_targetq = dq;
if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
_dispatch_source_set_interval(ds, handle);
}
_dispatch_object_debug(ds, "%s", __func__);
return ds;
}
-
设置 event_handler。从源码中看出,用的是
dispatch_continuation_t
进行绑定,和之前绑定 queue 一样,将 block copy 了一份。后面执行的时候,拿出来用。然后将这个任务 push 到队列里。void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
dispatch_continuation_t dc;
// 这里实际就是在初始化 dispatch_continuation_t
dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
// 经过一顿操作,将任务 push 到队列中。
_dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
} -
调用 resume 方法,执行 source。一般新创建的都是暂停状态,这里判断是暂停状态,就开始唤起。
void
dispatch_resume(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
if (dx_vtable(dou._do)->do_suspend) {
dx_vtable(dou._do)->do_resume(dou._do, false);
}
}
- 最后一步,是最核心的异步,唤起任务开始执行。之前的 queue 最终也是走到这样类似的一步,可以看返回类型都是
dispatch_queue_wakeup_target_t
,基本是沿着 queue 的逻辑一路 copy 过来。这个方法,经过一系列判断,保证所有的 source 都会在正确的队列上面执行;如果队列和任务不对应,那么就返回正确的队列,重新派发让任务在正确的队列上执行。
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned)
{
dispatch_source_t ds = dou._ds;
dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
// 获取当前 queue
dispatch_queue_t dq = _dispatch_queue_get_current();
dispatch_source_refs_t dr = ds->ds_refs;
dispatch_queue_flags_t dqf;
...
// timer 事件处理
if (dr->du_is_timer &&
os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
// timer has to be configured on the kevent queue
if (dq != dkq) {
return dkq;
}
_dispatch_source_timer_configure(ds);
}
}
// 是否安装 source
if (!ds->ds_is_installed) {
// The source needs to be installed on the kevent queue.
if (dq != dkq) {
return dkq;
}
_dispatch_source_install(ds, _dispatch_get_wlh(),
_dispatch_get_basepri());
}
// 是否暂停,因为之前判断过,一般不可能走到这里
if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
// Source suspended by an item drained from the source queue.
return ds->do_targetq;
}
// 是否在
if (_dispatch_source_get_registration_handler(dr)) {
// The source has been registered and the registration handler needs
// to be delivered on the target queue.
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
// clears ds_registration_handler
_dispatch_source_registration_callout(ds, dq, flags);
}
...
if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
os_atomic_load2o(ds, ds_pending_data, relaxed)) {
// 有些 source 还有未完成的数据,需要通过目标队列上的回调进行传送;有些 source 则需要切换到管理队列上去。
if (dq == ds->do_targetq) {
_dispatch_source_latch_and_call(ds, dq, flags);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
prevent_starvation = dq->do_targetq ||
!(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
if (prevent_starvation &&
os_atomic_load2o(ds, ds_pending_data, relaxed)) {
retq = ds->do_targetq;
}
} else {
return ds->do_targetq;
}
}
if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
// 已经被取消的 source 需要从管理队列中卸载。卸载完成后,取消的 handler 需要交付到目标队列。
if (!(dqf & DSF_DELETED)) {
if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
// timers can cheat if not armed because there's nothing left
// to do on the manager queue and unregistration can happen
// on the regular target queue
} else if (dq != dkq) {
return dkq;
}
_dispatch_source_refs_unregister(ds, 0);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
if (!(dqf & DSF_ARMED)) {
goto unregister_event;
}
// we need to wait for the EV_DELETE
return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
}
}
if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
_dispatch_source_get_cancel_handler(dr) ||
_dispatch_source_get_registration_handler(dr))) {
retq = ds->do_targetq;
} else {
_dispatch_source_cancel_callout(ds, dq, flags);
dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
}
prevent_starvation = false;
}
if (_dispatch_unote_needs_rearm(dr) &&
!(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
// 需要在管理队列进行 rearm 的
if (dq != dkq) {
return dkq;
}
if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
// 如果我们可以直接注销,不需要 resume
goto unregister_event;
}
if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
// 如果 source 已经暂停,不需要在管理队列 rearm
return ds->do_targetq;
}
if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
return ds->do_targetq;
}
if (unlikely(!_dispatch_source_refs_resume(ds))) {
goto unregister_event;
}
if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
_dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
}
}
return retq;
}
5. Dispatch I/O
我们可以使用 Dispatch I/O 快速读取一些文件,例如这样 :
- (void)readFile {
NSString *filePath = @"/.../青花瓷.m";
dispatch_queue_t queue = dispatch_queue_create("com.bool.readfile", DISPATCH_QUEUE_SERIAL);
dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY,0);
dispatch_io_t fileChannel = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
close(fd);
});
NSMutableData *fileData = [NSMutableData new];
dispatch_io_set_low_water(fileChannel, SIZE_MAX);
dispatch_io_read(fileChannel, 0, SIZE_MAX, queue, ^(bool done, dispatch_data_t _Nullable data, int error) {
if (error == 0 && dispatch_data_get_size(data) > 0) {
[fileData appendData:(NSData *)data];
}
if (done) {
NSString *str = [[NSString alloc] initWithData:fileData encoding:NSUTF8StringEncoding];
NSLog(@"read file completed, string is :\n %@",str);
}
});
}
输出结果:
ConcurrencyTest[41479:5357296] read file completed, string is :
天青色等烟雨 而我在等你
月色被打捞起 晕开了结局
如果读取大文件,我们可以进行切片读取,将文件分割多个片,放在异步线程中并发执行,这样会比较快一些。
关于源码,简单看了一下,调度逻辑和之前的任务类似。然后读写操作,是调用的一些底层接口实现,这里就偷懒一下不详细说了。使用 Dispatch I/O,多数情况下是为了并发读取一个大文件,提高读取速度。
6. Other
上面已经讲了概览图中的大部分东西,还有一些未讲述,这里简单描述一下:
-
dispatch_object。GCD 用 C 函数实现的对象,不能通过集成 dispatch 类实现,也不能用 alloc 方法初始化。GCD 针对 dispatch_object 提供了一些接口,我们使用这些接口可以处理一些内存事件、取消和暂停操作、定义上下文和处理日志相关工作。dispatch_object 必须要手动管理内存,不遵循垃圾回收机制。
-
dispatch_time。在 GCD 中使用的时间对象,可以创建自定义时间,也可以使用
DISPATCH_TIME_NOW
、DISPATCH_TIME_FOREVER
这两个系统给出的时间。
以上为 GCD 相关知识,这次使用的源码版本为最新版本 —— 912.30.4.tar.gz,和之前看的版本代码差距很大,因为代码量的增加,新版本代码比较乱,不过基本原理还是差不多的。曾经我一度认为,最上面的是最新版本…
网友评论