栅栏函数
栅栏函数最直接的作用就是控制任务的执行顺序,同步。
- dispatch_barrier_async 前面的任务执行完毕才会来到这里
- dispatch_barrier_sync 作用相同,但是这个会堵塞线程,影响后面的任务执行
- 非常重要的一点: 栅栏函数只能控制同一并发队列
栅栏函数的应用
在开发过程中我们应该都会遇到一种情况,多个请求并发执行,在请求都完成的时候,我们统一处理一件事情。下面我们看几个栅栏函数应用的案例。
-
案例 1
-
案例 2
-
案例 3
-
案例 4
/**
可变数组 线程不安全 解决办法
*/
- (void)demo3{
// 可变数组线程安全?
dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT);
// 多线程 操作marray
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
dispatch_barrier_async(concurrentQueue , ^{
[self.mArray addObject:image];
});
});
}
}
通过上面的案例案例我们可以看到,案例 1 使用 dispatch_barrier_async
,只会对相同队列的任务产生阻隔,并不会影响任务 5 的执行,但是案例 2 使用 dispatch_barrier_sync
的时候会对后面所有的任务产生阻隔,包括任务 5。案例 3 当我们改为全局并发队列的时候并没有起到阻隔作用。案例 4,当多个线程操作 marray
会不安全,这是因为对 marray
添加元素的时候,其实就是对旧值的 release
,对新值的 retain
,多线程的情况下就会出现两个线程同时对 marray
进行了 release
,这个时候刚好又有一个线程对 marray
进行访问的时候,就会出现坏地址的访问,所以就会出错。用栅栏函数可以解决这个问题。最后我们带着两个问题来探究一下栅栏函数的底层实现原理。
- 栅栏函数为什么能起到阻隔作用
- 为什全局队列下栅栏函数没起作用
栅栏函数的底层原理
这里我们来看一下栅栏函数的底层逻辑,首先我们在 libdispatch.dylib
源码中搜索 dispatch_barrier_async
,然后我们依次跟流程 dispatch_barrier_sync
-> _dispatch_barrier_sync_f
-> _dispatch_barrier_sync_f_inline
,最后会来到 _dispatch_barrier_sync_f_inline
函数。
在这里我们可以看到,这里的代码跟我们前面看的同步函数很相似,也有可能会执行 _dispatch_sync_f_slow
函数,也会出现死锁现象。只是这里多了一个 DC_FLAG_BARRIER
的标识。接着我们看下 _dispatch_sync_recurse
函数的实现。
我们在执行栅栏函数之前,有个前提会对队列中的任务进行清空,保证队列中没有其他任务。完成之后会执行 _dispatch_sync_invoke_and_complete_recurse
函数。_dispatch_sync_invoke_and_complete_recurse
-> _dispatch_sync_complete_recurse
,我们跟流程会来到 _dispatch_sync_complete_recurse
函数。
_dispatch_lane_non_barrier_complete
函数会告诉系统完成,并对状态进行修复。这里我们来看一下 dx_wakeup
。
这里我们会看到自定义串行队列, 并发队列的 dq_wakeup
赋值跟全局并发队列有些区别,这里我们先看下 _dispatch_root_queue_wakeup
。
接着我们进入 _dispatch_lane_barrier_complete
函数。
_dispatch_lane_class_barrier_complete
函数这里代表对栅栏进行清空,表示栅栏函数之后的任务可以执行了。
到这里我们知道了栅栏函数能起到任务阻隔作用的原理,但是我们还有一个问题没解决,就是全局并发队列栅栏函数无效的原因,这里我们来看一下。
当我们搜索 _dispatch_root_queue_wakeup
的时候可以看到这里并没有对 barrier
进行处理,也就是栅栏函数无效的原因。苹果这样设计的原因是全局并发队列是全局共用的,也有可能系统任务也会用到,如果栅栏函数能生效的话就会影响这些任务的执行。
信号量
- dispatch_semaphore_create 创建信号量
- dispatch_semaphore_wait 信号量等待
- dispatch_semaphore_signal 信号量释放
同步->当锁,控制GCD最大并发数
信号量的使用
- 案例 1
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(2);
//任务1
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(2);
NSLog(@"执行任务1");
NSLog(@"任务1完成");
dispatch_semaphore_signal(sem);
});
//任务2
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(2);
NSLog(@"执行任务2");
NSLog(@"任务2完成");
dispatch_semaphore_signal(sem);
});
//任务3
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(2);
NSLog(@"执行任务3");
NSLog(@"任务3完成");
dispatch_semaphore_signal(sem);
});
//任务4
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
sleep(2);
NSLog(@"执行任务4");
NSLog(@"任务4完成");
dispatch_semaphore_signal(sem);
});
通过案例 1 我们可以看到,我们通过信号量设置线程的并发数为 2,通过打印可以看到,同一时间只有任务 1, 任务 2 执行了,两秒之后才执行的任务 3, 任务 4。在我们平时的开发过程中,数据上传的时候可以采用这种方式。
-
案例 2
通过案例 2 我们可以看到,我们通过 dispatch_semaphore_wait
跟 dispatch_semaphore_signal
结合使用能达到同步的效果,只有任务 2 完成才能执行任务 1。
信号量底层原理
了解了信号量的作用,那么现在我们来看一下信号量的底层原理。
dispatch_semaphore_create
通过注释我们可以看到,这里讲只有 value
大于等于 0 的时候才会起作用,小于 0 就会执行。
dispatch_semaphore_wait
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
os_atomic_dec2o
函数代表减减操作,会对 dsema_value
值减 1,因为 dsema_value
初始值为 0,所以不会走到 if (likely(value >= 0))
判断里面,所以就会执行 _dispatch_semaphore_wait_slow
函数。
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
这里如果我们 dispatch_semaphore_wait
函数第二个参数设置的是 DISPATCH_TIME_NOW
这里就执行超时判断,如果是 DISPATCH_TIME_FOREVER
就会执行 _dispatch_sema4_wait
函数。我们常用的参数是 DISPATCH_TIME_FOREVER
。
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
int ret = 0;
do {
ret = sem_wait(sema);
} while (ret == -1 && errno == EINTR);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
这里 sem_wait
是 pthread
的下层封装,这里我们不需要太关注,到这来我们可以看到其实 dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER)
可以看做 do {} while
循环。
dispatch_semaphore_signal
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
这里 os_atomic_inc2o
相当于对 dsema_value
加 1,如果大于 0 就直接返回 0,如果 (likely(value > 0))
的条件不成立,就会抛出异常,说明 dispatch_semaphore_wait
操作过多,没有跟 dispatch_semaphore_signal
一一对应。_dispatch_semaphore_signal_slow
会进行异常相关的处理,一直对 dsema_value
加 1,直到大于 0。
调度组
调度组最直接的作用: 控制任务执行顺序
- dispatch_group_create 创建组
- dispatch_group_async 进组任务
- dispatch_group_notify 进组任务执行完毕通知
- dispatch_group_wait 进组任务执行等待时间
- dispatch_group_enter 进组
- dispatch_group_leave 出组 注意搭配使用
调度组的应用
-
案例
例如上面案例,我们并发请求多张图片,并把异步任务添加到调度组,在调度组内的任务都完成之后,会调用 dispatch_group_notify
,我们在此进行多张图片的合成。
调度组的底层原理
这里我们需要带着几个问题来探究一下调度组的底层原理。
-
进组与出组为什么要搭配使用,要先进后出
-
调度组是如何进行流程控制的
-
如案例所示,为什么
dispatch_group_async
=dispatch_group_enter
+dispatch_group_leave
-
dispatch_group_create
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
通过源码我们可以看到调度组的创建跟信号量的写法类似,按照信号量创建的写法又写了一套,通过 os_atomic_store2o
的形式来保存 n
。
dispatch_group_enter
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
这里可以看到也跟信号量类似,会对初始值进行减减,把 信号量的值变为 -1,与信号量不同的是这里没有 _dispatch_sema4_wait
函数。
dispatch_group_leave
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
这里就会对原始值加 1,old_state
等于 0,然后就会走到 if (unlikely(old_value == DISPATCH_GROUP_VALUE_1))
判断里面,然后执行 _dispatch_group_wake
,这里有个细节,在这里执行 _dispatch_group_wake
函数的原因是可能异步任务比较耗时,在 dispatch_group_leave(group)
前就执行了 dispatch_group_notify
,这里就是为了消除多线程的影响。
dispatch_group_notify
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
这里会层层判断,当 old_state == 0
的时候会执行 _dispatch_group_wake
函数,也就是同步函数或者异步函数的 weak_up
流程(block callout
流程)。
dispatch_group_async
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
追踪流程来到这里会执行 dispatch_group_enter(dg)
,那么 dispatch_group_leave
函数在哪里执行呢?我们继续跟流程。
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
_dispatch_root_queue_push
-> _dispatch_root_queue_push_inline
-> _dispatch_root_queue_poke
-> _dispatch_root_queue_poke_slow
-> _dispatch_root_queues_init
-> _dispatch_root_queues_init_once
-> _dispatch_worker_thread2
-> _dispatch_root_queue_drain
-> _dispatch_continuation_pop_inline
-> _dispatch_continuation_invoke_inline
-> _dispatch_continuation_with_group_invoke
。我们一直跟流程来到 _dispatch_continuation_with_group_invoke
函数的实现。
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
在这里可以看到 dispatch_group_leave((dispatch_group_t)dou)
的调用,这也是为什么 dispatch_group_async
= dispatch_group_enter
+ dispatch_group_leave
的原因。
Dispatch_Source
- 其 CPU 负荷非常小,尽量不占用资源
- 联结的优势
在任一线程上调用它的的一个函数 dispatch_source_merge_data 后,会执行 Dispatch Source 事先定义好的句柄(可以把句柄简单理解为一个 block ) 这个过程叫 Custom event ,用户事件。是 dispatch source 支持处理的一种事件。
句柄是一种指向指针的指针 它指向的就是一个类或者结构,它和系统有很密切的关系 HINSTANCE(实例句柄),HBITMAP(位图句柄),HDC(设备表述句柄),HICON(图标句柄)等。这当中还有一个通用的句柄,就是HANDLE。
-
dispatch_source
常用函数: -
dispatch_source_create 创建源
-
dispatch_source_set_event_handler 设置源事件回调
-
dispatch_source_merge_data 源事件设置数据
dispatch_source_get_data 获取源事件数据 -
dispatch_resume 继续
-
dispatch_suspend 挂起
-
案例
- (void)viewDidLoad {
[super viewDidLoad];
self.totalComplete = 0;
self.queue = dispatch_queue_create("chenxi.com", NULL);
// 这里创建一个事件源
self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
// 这里设置一个事件句柄,相当于回调函数
dispatch_source_set_event_handler(self.source, ^{
NSLog(@"%@",[NSThread currentThread]);
NSUInteger value = dispatch_source_get_data(self.source);
self.totalComplete += value;
NSLog(@"进度: %.2f",self.totalComplete/100.0);
self.progressView.progress = self.totalComplete/100.0;
});
self.isRunning = YES;
dispatch_resume(self.source);
}
// 这里通过点击事件控件 totalComplete 加操作
- (IBAction)didClickStartOrPauseAction:(id)sender {
if (self.isRunning) {
dispatch_suspend(self.source);
dispatch_suspend(self.queue);
NSLog(@"已经暂停");
self.isRunning = NO;
[sender setTitle:@"暂停中.." forState:UIControlStateNormal];
}else{
dispatch_resume(self.source);
dispatch_resume(self.queue);
NSLog(@"已经执行了");
self.isRunning = YES;
[sender setTitle:@"暂停中.." forState:UIControlStateNormal];
}
}
/**
在这里控制循环 100 次,每次执行 dispatch_source_merge_data,事件句柄中
dispatch_source_get_data(self.source) 能获取到 1 的值,对 self.totalComplete 进行加操作
*/
- (void)touchesBegan:(NSSet<UITouch *> *)touches withEvent:(UIEvent *)event{
NSLog(@"开始了");
for (int i= 0; i<100; i++) {
dispatch_async(self.queue, ^{
if (!self.isRunning) {
NSLog(@"已经暂停");
return;
}
sleep(1);
dispatch_source_merge_data(self.source, 1);
});
}
}
在这里我们可以看到 dispatch_source
的使用案例。 dispatch_source
的好处就是它是基于 pthread
的封装,不受 runloop
的影响。
补充:可变数组不安全原因分析
// 可变数组线程安全?
dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT);
// 多线程 操作marray
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
[self.mArray addObject:image];
});
}
例如这段代码,我们执行的时候出现错误 ,原因就是:假如当前 mArray
的数据为 @[@(1)]
,当我们同时有多个线程对当前数组的第二个位置进行插入操作,就会出现同一时间对同一片内存空间进行操作的情况,就会导致错误。
网友评论