栅栏函数的应用
栅栏函数
作用: 控制任务执行顺序,同步
-
dispatch_barrier_async
前面的任务执行完毕才会来到这里 -
dispatch_barrier_sync
作用相同,但是这个会堵塞线程,影响后面的任务执行
栅栏函数注意事项
- 栅栏函数只能控制
同一并发队列
-
同步栅栏函数
添加队列,当前线程会被锁死
,直到栅栏之前的任务和栅栏本身的任务执行完毕,当前线程才会继续执行; - 只能是
自定义并发队列
,而不是全局并发队列。全局并发队列不支持栅栏函数,因为可能会干扰系统级的任务执行; - 如果是
串行队列
,使用栅栏函数的作用等同于一个同步函数,没有任何意义;
并发栅栏函数异步
- (void)demo2{
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
/* 1.异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"123");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"456");
});
/* 2. 栅栏函数 */ // - dispatch_barrier_async
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"----%@-----",[NSThread currentThread]);
});
/* 3. 异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"加载那么多,喘口气!!!");
});
// 4
NSLog(@"**********起来干!!");
}
// 控制台打印如下
2021-09-15 23:28:03.304006+0800 004--GCD进阶使用[77046:18719514] 123
2021-09-15 23:28:03.304019+0800 004--GCD进阶使用[77046:18719440] **********起来干!!
2021-09-15 23:28:03.304025+0800 004--GCD进阶使用[77046:18719515] 456
2021-09-15 23:28:03.304306+0800 004--GCD进阶使用[77046:18719514] ----<NSThread: 0x600002b3dd40>{number = 6, name = (null)}-----
2021-09-15 23:28:03.304475+0800 004--GCD进阶使用[77046:18719515] 加载那么多,喘口气!!!
成功达到了执行完123 456
之后,再执行栅栏函数
以及并发队列
中的其他函数。这在开发中是比较实用的场景。
并发栅栏函数同步
- (void)demo2{
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
/* 1.异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"123");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"456");
});
/* 2. 栅栏函数 */ // - dispatch_barrier_sync
dispatch_barrier_sync(concurrentQueue, ^{
NSLog(@"----%@-----",[NSThread currentThread]);
});
/* 3. 异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"加载那么多,喘口气!!!");
});
// 4
NSLog(@"**********起来干!!");
}
// 控制台打印如下
2021-09-15 23:33:59.188968+0800 004--GCD进阶使用[77072:18724006] 456
2021-09-15 23:33:59.188968+0800 004--GCD进阶使用[77072:18724000] 123
2021-09-15 23:33:59.189320+0800 004--GCD进阶使用[77072:18723924] ----<NSThread: 0x60000306c180>{number = 1, name = main}-----
2021-09-15 23:33:59.189484+0800 004--GCD进阶使用[77072:18723924] **********起来干!!
2021-09-15 23:33:59.189500+0800 004--GCD进阶使用[77072:18724000] 加载那么多,喘口气!!!
栅栏函数堵塞了代码块4
的执行,只有当代码块1
、代码块2
都执行完了,才会执行代码块3
、代码块4
全局队列栅栏函数异步
- (void)demo2{
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
/* 1.异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"123");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"456");
});
/* 2. 栅栏函数 */
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"----%@-----",[NSThread currentThread]);
});
/* 3. 异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"加载那么多,喘口气!!!");
});
// 4
NSLog(@"**********起来干!!");
}
// 控制台打印如下
2021-09-16 21:57:48.718673+0800 004--GCD进阶使用[80623:19179460] 456
2021-09-16 21:57:48.718685+0800 004--GCD进阶使用[80623:19179463] 123
2021-09-16 21:57:48.718696+0800 004--GCD进阶使用[80623:19179265] **********起来干!!
2021-09-16 21:57:48.718881+0800 004--GCD进阶使用[80623:19179463] 加载那么多,喘口气!!!
2021-09-16 21:57:48.718841+0800 004--GCD进阶使用[80623:19179458] ----<NSThread: 0x600001d6b880>{number = 7, name = (null)}-----
运行打印结果并发无序
,不可使用全局队列
全局队列栅栏函数同步
- (void)demo2{
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
/* 1.异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"123");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"456");
});
/* 2. 栅栏函数 */
dispatch_barrier_sync(concurrentQueue, ^{
NSLog(@"----%@-----",[NSThread currentThread]);
});
/* 3. 异步函数 */
dispatch_async(concurrentQueue, ^{
NSLog(@"加载那么多,喘口气!!!");
});
// 4
NSLog(@"**********起来干!!");
}
// 控制台打印如下
2021-09-16 22:01:44.248399+0800 004--GCD进阶使用[80682:19184579] 123
2021-09-16 22:01:44.248399+0800 004--GCD进阶使用[80682:19184581] 456
2021-09-16 22:01:44.248441+0800 004--GCD进阶使用[80682:19184501] ----<NSThread: 0x600001fb81c0>{number = 1, name = main}-----
2021-09-16 22:01:44.248572+0800 004--GCD进阶使用[80682:19184501] **********起来干!!
2021-09-16 22:01:44.248584+0800 004--GCD进阶使用[80682:19184579] 加载那么多,喘口气!!!
串行栅栏函数异步
、串行栅栏函数同步
自行打印探索
可变数组线程不安全问题
/**
可变数组 线程不安全 解决办法
*/
- (void)demo3{
// 可变数组线程安全?
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
// 多线程 操作marray
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
// self.mArray 多线程 地址永远是一个
// self.mArray 0 - 1 - 2 变化
// name = kc getter - setter (retain release)
// self.mArray 读 - 写 self.mArray = newaRRAY (1 2)
// 多线程 同时 写 1: (1,2) 2: (1,2,3) 3: (1,2,4)
// 同一时间对同一片内存空间进行操作 不安全,添加栅栏函数保证线程安全
dispatch_barrier_async(concurrentQueue , ^{
[self.mArray addObject:image];
});
});
}
}
// 控制台打印如下
2021-09-16 22:05:12.647804+0800 004--GCD进阶使用[80682:19184501] 数组的个数:1000
栅栏函数的底层原理
为什么栅栏函数
可以堵塞下面的代码并且控制流程
,为什么可以使用自定义并发队列
而不能使用全局队列
? 下面我们来探究原理
同步栅栏函数分析
- 查看
dispatch_barrier_sync
方法
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
- 查看
进入 _dispatch_barrier_sync_f
方法
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
- 查看
_dispatch_barrier_sync_f_inline
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
存在进入_dispatch_sync_f_slow
函数的代码,证明同步栅栏函数
也可能出现死锁
的情况。
- 查看
_dispatch_sync_recurse
方法
DISPATCH_NOINLINE
static void
_dispatch_sync_recurse(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
dispatch_queue_t tq = dq->do_targetq;
do {
// 串行
if (likely(tq->dq_width == 1)) {
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq,
DC_FLAG_BARRIER);
}
} else {
// 并发
dispatch_queue_concurrent_t dl = upcast(tq)._dl;
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq, 0);
}
}
tq = tq->do_targetq;
} while (unlikely(tq->do_targetq));
_dispatch_introspection_sync_begin(dq);
_dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags)));
}
要想dispatch_barrier_sync
的block
回调,必须得把dispatch_barrier_sync
前面的任务全部执行完,再执行block
意味着当前队列无任务
。
- 查看
_dispatch_sync_invoke_and_complete_recurse
方法
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
void *ctxt, dispatch_function_t func, uintptr_t dc_flags
DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
_dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
}
- 查看
_dispatch_sync_complete_recurse
方法
DISPATCH_NOINLINE
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
uintptr_t dc_flags)
{
bool barrier = (dc_flags & DC_FLAG_BARRIER);
do {
if (dq == stop_dq) return;
if (barrier) {
// 存在栅栏函数,就唤醒线程
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
} else {
// 不存在栅栏函数,就往下执行
_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
}
dq = dq->do_targetq;
barrier = (dq->dq_width == 1);
} while (unlikely(dq->do_targetq));
}
-
dx_wakeup
的宏定义
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
- 查看
dq_wakeup
image.png
串行和并发 dq_wakeup->_dispatch_lane_wakeup
全局队列 dq_wakeup->_dispatch_root_queue_wakeup
- 查看
_dispatch_lane_wakeup
方法
DISPATCH_NOINLINE
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
// 如果有栅栏函数barrier,进入_dispatch_lane_barrier_complete
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
- 查看
_dispatch_lane_barrier_complete
方法
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
dispatch_lane_t dq = dqu._dl;
if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
// 如果是串行或者barrier就等待,执行_dispatch_lane_drain_barrier_waiter
if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
if (_dispatch_object_is_waiter(dc)) {
return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
}
// 并发队列,把所有的barrier里面的任务执行完
} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
return _dispatch_lane_drain_non_barriers(dq, dc, flags);
}
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
进入_dispatch_lane_class_barrier_complete
方法,把所有状态清空
- 全局队列
dq_wakeup
->_dispatch_root_queue_wakeup
这里面没有对栅栏处理
DISPATCH_NOINLINE
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
DISPATCH_INTERNAL_CRASH(dq->dq_priority,
"Don't try to wake up or override a root queue");
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
}
异步栅栏函数分析
- 查看
dispatch_barrier_async
方法
#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif
- 查看
_dispatch_continuation_async
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
- 进入
dx_push->dq_push
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

- 并发进入
_dispatch_lane_concurrent_push
方法
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
- 是栅栏函数
barrier
,进入_dispatch_lane_push
方法
DISPATCH_NOINLINE
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
dispatch_wakeup_flags_t flags = 0;
struct dispatch_object_s *prev;
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}
dispatch_assert(!_dispatch_object_is_global(dq));
qos = _dispatch_queue_push_qos(dq, qos);
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
if (unlikely(os_mpsc_push_was_empty(prev))) {
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2;
}
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
if (flags) {
return dx_wakeup(dq, qos, flags);
}
}
最后走同步的流程
进入dx_wakeup
-> dq_wakeup
信号量使用
信号量dispatch_semaphore_t
信号量的作用一般是用来使任务同步执行
,类似于互斥锁
,用户可以根据需要控制GCD最大并发数
-
dispatch_semaphore_create
创建信号量 -
dispatch_semaphore_wait
信号量等待 -
dispatch_semaphore_signal
信号量释放
案例一
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
//任务1
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
NSLog(@"执行任务1");
NSLog(@"任务1完成");
dispatch_semaphore_signal(sem); // 发信号
});
//任务2
dispatch_async(queue, ^{
sleep(2);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
NSLog(@"执行任务2");
NSLog(@"任务2完成");
dispatch_semaphore_signal(sem); // 发信号
});
//任务3
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(1);
NSLog(@"执行任务3");
NSLog(@"任务3完成");
dispatch_semaphore_signal(sem);
});
// 控制台打印如下
2021-09-16 23:06:01.395877+0800 006---GCD最大并发数[80966:19226077] 执行任务3
2021-09-16 23:06:01.396082+0800 006---GCD最大并发数[80966:19226077] 任务3完成
2021-09-16 23:06:01.396254+0800 006---GCD最大并发数[80966:19226075] 执行任务1
2021-09-16 23:06:01.396430+0800 006---GCD最大并发数[80966:19226075] 任务1完成
2021-09-16 23:06:02.360325+0800 006---GCD最大并发数[80966:19226072] 执行任务2
2021-09-16 23:06:02.360757+0800 006---GCD最大并发数[80966:19226072] 任务2完成
案例二
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(2);
//任务1
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
sleep(0.5);
NSLog(@"执行任务1");
NSLog(@"任务1完成");
dispatch_semaphore_signal(sem); // 发信号
});
//任务2
dispatch_async(queue, ^{
sleep(0.5);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
NSLog(@"执行任务2");
NSLog(@"任务2完成");
dispatch_semaphore_signal(sem); // 发信号
});
//任务3
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(0.5);
NSLog(@"执行任务3");
NSLog(@"任务3完成");
dispatch_semaphore_signal(sem);
});
//任务4
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(0.5);
NSLog(@"执行任务4");
NSLog(@"任务4完成");
dispatch_semaphore_signal(sem);
});
// 控制台打印如下
2021-09-16 23:11:40.027571+0800 006---GCD最大并发数[81011:19230316] 执行任务1
2021-09-16 23:11:40.027571+0800 006---GCD最大并发数[81011:19230319] 执行任务2
2021-09-16 23:11:40.027743+0800 006---GCD最大并发数[81011:19230316] 任务1完成
2021-09-16 23:11:40.027772+0800 006---GCD最大并发数[81011:19230319] 任务2完成
2021-09-16 23:11:40.027909+0800 006---GCD最大并发数[81011:19230318] 执行任务3
2021-09-16 23:11:40.027933+0800 006---GCD最大并发数[81011:19230323] 执行任务4
2021-09-16 23:11:40.028024+0800 006---GCD最大并发数[81011:19230318] 任务3完成
2021-09-16 23:11:40.028070+0800 006---GCD最大并发数[81011:19230323] 任务4完成
dispatch_semaphore_create
控制最大并发数
案例三
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_async(queue, ^{
dispatch_semaphore_signal(sem); // 发信号
NSLog(@"执行任务1");
NSLog(@"任务1完成");
});
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
sleep(2);
NSLog(@"执行任务2");
NSLog(@"任务2完成");
});
// 控制台打印如下
2021-09-16 23:25:01.768407+0800 006---GCD最大并发数[81107:19240717] 执行任务1
2021-09-16 23:25:01.768554+0800 006---GCD最大并发数[81107:19240717] 任务1完成
2021-09-16 23:25:03.773641+0800 006---GCD最大并发数[81107:19240715] 执行任务2
2021-09-16 23:25:03.774055+0800 006---GCD最大并发数[81107:19240715] 任务2完成
案例四
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
NSLog(@"执行任务1");
NSLog(@"任务1完成");
});
dispatch_async(queue, ^{
sleep(2);
NSLog(@"执行任务2");
NSLog(@"任务2完成");
dispatch_semaphore_signal(sem); // 发信号
});
// 控制台打印如下
2021-09-16 23:29:53.591144+0800 006---GCD最大并发数[81162:19245786] 执行任务2
2021-09-16 23:29:53.591648+0800 006---GCD最大并发数[81162:19245786] 任务2完成
2021-09-16 23:29:53.592009+0800 006---GCD最大并发数[81162:19245787] 执行任务1
2021-09-16 23:29:53.592322+0800 006---GCD最大并发数[81162:19245787] 任务1完成
信号量原理
dispatch_semaphore_wait
等待,dispatch_semaphore_signal
发信号,底层做了什么?
- 查看
dispatch_semaphore_create
方法
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
- 初始化信号量,设置
GCD最大并发数
; - 最大并发数必须
>= 0
;
- 查看
dispatch_semaphore_wait
方法
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
-
os_atomic_dec2o
宏,进行减1操作; - 若信号量>= 0,直接返回0,执行wait之后的代码;
- 若信号量< 0,将阻塞当前线程,进入
_dispatch_semaphore_wait_slow
函数;
如果创建的dispatch_semaphore_t sem = dispatch_semaphore_create(0)
,0-1 = -1 直接走 _dispatch_semaphore_wait_slow
- 查看
_dispatch_semaphore_wait_slow
方法
DISPATCH_NOINLINE
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
// 超时处理
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
- 查看
_dispatch_sema4_wait
方法
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
int ret = 0;
do {
ret = sem_wait(sema);
} while (ret == -1 && errno == EINTR);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
ret == -1
就一直dowhile
循环卡死这里
所以应该先执行另外的线程,先执行dispatch_semaphore_signal
再执行dispatch_semaphore_wait
如果创建的dispatch_semaphore_t sem = dispatch_semaphore_create(0)
,0+1 =1 直接return
,然后执行dispatch_semaphore_wait
1-1 = 0 直接return
- 查看
dispatch_semaphore_signal
方法
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
-
os_atomic_inc2o
宏,进行加1操作; - 若信号量> 0,直接返回0,继续执行后续代码;
- 若信号量等于
LONG_MIN
,抛出异常。这种情况表示wait操作过多,二者之间无法匹配。之后会调用_dispatch_semaphore_signal_slow
函数,进入延迟等待;
调度组的应用问题
调度组最直接的作⽤:控制任务执⾏顺序
-
dispatch_group_create
:创建组 -
dispatch_group_async
:进组任务 -
dispatch_group_notify
:进组任务执行完毕通知 -
dispatch_group_wait
:进组任务执行等待时间 -
dispatch_group_enter
:进组 -
dispatch_group_leave
:出组
使用场景
- 目前有两个任务,需要等待这
两个任务都执行完毕
,才会更新UI
,可以使用调度组
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
//创建调度组
[self.mArray addObject:@"1"];
});
dispatch_group_async(group, queue, ^{
[self.mArray addObject:@"2"];
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"更新UI");
NSLog(@"数组=%@",self.mArray);
});
// 控制台打印如下
2021-09-19 11:26:55.752125+0800 005---GCD进阶使用(下)[97084:20217983] 更新UI
2021-09-19 11:26:55.752397+0800 005---GCD进阶使用(下)[97084:20217983] 数组=(
2,
1
)
-
进组
、出组
要成对出现,否则会出现崩溃
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
//创建调度组
[self.mArray addObject:@"1"];
});
dispatch_group_async(group, queue, ^{
[self.mArray addObject:@"2"];
});
// 进组和出组 成对 先进后出
dispatch_group_enter(group);
dispatch_async(queue, ^{
[self.mArray addObject:@"3"];
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"数组=%@",self.mArray);
});
// 控制台打印如下
2021-09-19 11:39:55.506223+0800 005---GCD进阶使用(下)[97277:20233016] 数组=(
2,
1,
3
)
调度组的原理
鉴于上面调度组的使用,这里有几个疑问?
- 调度组是如何控制流程的?
- 为什么会出现
进组出组
的搭配崩溃? -
dispatch_group_async
为什么能够起到进组出组
一样的效果?dispatch_group_async
等价于dispatch_group_enter
+dispatch_group_leave
带着这三个疑问,我们进行调度组源码探索
创建组dispatch_group_create
- 查看
dispatch_group_create
方法
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
- 查看
_dispatch_group_create_with_count
方法,跟信号量的初始化不同(仿照信号量的写法)
DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
创建dispatch_group_t
结构体,参数n默认传入0。
进组dispatch_group_enter
- 查看
dispatch_group_enter
方法
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
- 使用
os_atomic_sub_orig2o
宏,对dg_bits
进行减1
操作; -
old_bits
只可能是-1
和0
两种可能; -
old_bits
和DISPATCH_GROUP_VALUE_MASK
进行&运算,将结果赋值给old_value
;
如果old_bits
为0,old_value
为0,调用_dispatch_retain
函数
如果old_bits
为-1,old_value
为DISPATCH_GROUP_VALUE_MASK
,表示进组和出组函数使用不平衡,报出异常
出组dispatch_group_leave
- 查看
dispatch_group_leave
方法
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
- 使用
os_atomic_add_orig2o
宏,进行加1
操作; -
&
运算后的旧值等于DISPATCH_GROUP_VALUE_1
,等待do...while
停止循环,调用_dispatch_group_wake
函数; -
DISPATCH_GROUP_VALUE_1
等同于DISPATCH_GROUP_VALUE_MASK
; - 如果
旧值为0
,表示进组和出组函数使用不平衡,报出异常;
- 查看
_dispatch_group_wake
方法
DISPATCH_NOINLINE
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
- 函数的作用,唤醒
dispatch_group_notify
函数; - 核心代码在
do...while
循环中,调用_dispatch_continuation_async
函数。
- 查看
dispatch_group_notify
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
- 判断状态为0,调用
_dispatch_group_wake
函数 - 所以通知并不需要一直等待,因为
dispatch_group_notify
和dispatch_group_leave
中,都有_dispatch_group_wake
函数的调用
- 查看
进入_dispatch_continuation_async
方法,最终会调用dx_push
方法
异步函数dispatch_group_async
- 查看
dispatch_group_async
方法
#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
#endif
- 查看
_dispatch_continuation_group_async
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
// 调用dispatch_group_enter函数,进行进组操作 0 -> -1 堵塞住
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
- 查看
_dispatch_continuation_async
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
- 查看
dx_push
->dq_push
方法
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
- 查看
_dispatch_root_queue_push
方法 - 查看
_dispatch_root_queue_push_inline
方法 - 查看
_dispatch_root_queue_poke
方法 - 查看
_dispatch_root_queue_poke_slow
->_dispatch_root_queues_init
->_dispatch_root_queues_init_once
->_dispatch_worker_thread2
->_dispatch_root_queue_drain
->_dispatch_continuation_pop_inline
->_dispatch_continuation_invoke_inline
->_dispatch_continuation_with_group_invoke
方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
调用dispatch_group_leave
函数,进行出组
操作
dispatch_source
基本介绍
dispatch_source
是基础数据类型,用于协调特定底层系统事件
的处理
dispatch_source
替代了异步回调函数
,来处理系统相关的事件。当配置一个dispatch
时,你需要指定监测的事件
、队列
、以及任务回调
。当事件发生时,dispatch source
会提交block或函数到指定的queue
去执行。
使用dispatch_source
代替dispatch_async
的原因在于联结的优势。
联结:在任一线程上调用它的一个函数dispatch_source_merge_data
后,会执行Dispatch Source
事先定义好的句柄(可以把句柄简单理解为一个block),这个过程叫Custom event
用户事件。是dispatch source
支持处理的一种事件。
句柄:是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有密切的关系,包含:
- 实例句柄
HINSTANCE
- 位图句柄
HBITMAP
- 设备表句柄
HDC
- 图标句柄
HICON
- 通用句柄
HANDLE
使用dispatch_source的优点:
- 其 CPU 负荷非常小,尽量不占用资源
- 联结的优势
常用函数
-
dispatch_source_create
创建源 -
dispatch_source_set_event_handler
设置源回调事件 -
dispatch_source_merge_data
源事件设置数据 -
dispatch_source_get_data
获取事件源数据 -
dispatch_resume
继续 -
dispatch_suspend
挂起
dispatch_source
封装的计时器,用来控制流程
__block int timeout=10; //倒计时时间
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,queue);
dispatch_source_set_timer(self.timer ,dispatch_walltime(NULL, 0),1.0*NSEC_PER_SEC, 0); //每秒执行
dispatch_source_set_event_handler(self.timer, ^{
if(timeout<=0){ //倒计时结束,关闭
dispatch_source_cancel(self.timer);
dispatch_async(dispatch_get_main_queue(), ^{
//设置界面的按钮显示 根据自己需求设置
NSLog(@"timer 停止");
});
}else{
int minutes = timeout / 60;
int seconds = timeout % 60;
NSString *strTime = [NSString stringWithFormat:@"%d分%.2d秒后重新获取验证码",minutes, seconds];
dispatch_async(dispatch_get_main_queue(), ^{
//设置界面的按钮显示 根据自己需求设置
NSLog(@"%@",strTime);
});
timeout--;
}
});
//恢复 启动
dispatch_resume(self.timer);
//挂起
dispatch_suspend(self.timer);
可变数组不安全分析
上面我们在学习栅栏函数
时,曾遇到多线程
操作可变数组
导致的崩溃
dispatch_queue_t concurrentQueue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
// 多线程 操作marray
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
[self.mArray addObject:image];
});
}
崩溃原因是线程不安全
,self.mArray
不断的读写
,写的操作是把旧值release新值retain,有可能一个线程正在release另外一个线程正在读,这时候读取的是野指针。同一时间对同一片内存空间进行操作 是不安全的,为了让其安全需要加锁
或者加dispatch_barrier_async
dispatch_queue_t concurrentQueue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
// 多线程 操作marray
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
// 添加栅栏函数保证线程安全
dispatch_barrier_async(concurrentQueue , ^{
[self.mArray addObject:image];
});
});
}
网友评论