GCD分析(下)

作者: 浅墨入画 | 来源:发表于2021-09-18 21:43 被阅读0次

栅栏函数的应用

栅栏函数

作用: 控制任务执行顺序,同步

  • dispatch_barrier_async 前面的任务执行完毕才会来到这里
  • dispatch_barrier_sync 作用相同,但是这个会堵塞线程,影响后面的任务执行

栅栏函数注意事项

  • 栅栏函数只能控制同一并发队列
  • 同步栅栏函数添加队列,当前线程会被锁死,直到栅栏之前的任务和栅栏本身的任务执行完毕,当前线程才会继续执行;
  • 只能是自定义并发队列,而不是全局并发队列。全局并发队列不支持栅栏函数,因为可能会干扰系统级的任务执行;
  • 如果是串行队列,使用栅栏函数的作用等同于一个同步函数,没有任何意义;
并发栅栏函数异步
- (void)demo2{
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    dispatch_async(concurrentQueue, ^{
        NSLog(@"456");
    });
    /* 2. 栅栏函数 */ // - dispatch_barrier_async
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"----%@-----",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"加载那么多,喘口气!!!");
    });
    // 4
    NSLog(@"**********起来干!!");
 }

// 控制台打印如下
2021-09-15 23:28:03.304006+0800 004--GCD进阶使用[77046:18719514] 123
2021-09-15 23:28:03.304019+0800 004--GCD进阶使用[77046:18719440] **********起来干!!
2021-09-15 23:28:03.304025+0800 004--GCD进阶使用[77046:18719515] 456
2021-09-15 23:28:03.304306+0800 004--GCD进阶使用[77046:18719514] ----<NSThread: 0x600002b3dd40>{number = 6, name = (null)}-----
2021-09-15 23:28:03.304475+0800 004--GCD进阶使用[77046:18719515] 加载那么多,喘口气!!!

成功达到了执行完123 456之后,再执行栅栏函数以及并发队列中的其他函数。这在开发中是比较实用的场景。

并发栅栏函数同步
- (void)demo2{
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    dispatch_async(concurrentQueue, ^{
        NSLog(@"456");
    });
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_sync(concurrentQueue, ^{
        NSLog(@"----%@-----",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"加载那么多,喘口气!!!");
    });
    // 4
    NSLog(@"**********起来干!!");
 }

// 控制台打印如下
2021-09-15 23:33:59.188968+0800 004--GCD进阶使用[77072:18724006] 456
2021-09-15 23:33:59.188968+0800 004--GCD进阶使用[77072:18724000] 123
2021-09-15 23:33:59.189320+0800 004--GCD进阶使用[77072:18723924] ----<NSThread: 0x60000306c180>{number = 1, name = main}-----
2021-09-15 23:33:59.189484+0800 004--GCD进阶使用[77072:18723924] **********起来干!!
2021-09-15 23:33:59.189500+0800 004--GCD进阶使用[77072:18724000] 加载那么多,喘口气!!!

栅栏函数堵塞了代码块4的执行,只有当代码块1代码块2都执行完了,才会执行代码块3代码块4

全局队列栅栏函数异步
- (void)demo2{
    dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    
    dispatch_async(concurrentQueue, ^{
        NSLog(@"456");
    });
    /* 2. 栅栏函数 */
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"----%@-----",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"加载那么多,喘口气!!!");
    });
    // 4
    NSLog(@"**********起来干!!");
}

// 控制台打印如下
2021-09-16 21:57:48.718673+0800 004--GCD进阶使用[80623:19179460] 456
2021-09-16 21:57:48.718685+0800 004--GCD进阶使用[80623:19179463] 123
2021-09-16 21:57:48.718696+0800 004--GCD进阶使用[80623:19179265] **********起来干!!
2021-09-16 21:57:48.718881+0800 004--GCD进阶使用[80623:19179463] 加载那么多,喘口气!!!
2021-09-16 21:57:48.718841+0800 004--GCD进阶使用[80623:19179458] ----<NSThread: 0x600001d6b880>{number = 7, name = (null)}-----

运行打印结果并发无序,不可使用全局队列

全局队列栅栏函数同步
- (void)demo2{
    dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    
    dispatch_async(concurrentQueue, ^{
        NSLog(@"456");
    });
    /* 2. 栅栏函数 */
    dispatch_barrier_sync(concurrentQueue, ^{
        NSLog(@"----%@-----",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"加载那么多,喘口气!!!");
    });
    // 4
    NSLog(@"**********起来干!!");
}

// 控制台打印如下
2021-09-16 22:01:44.248399+0800 004--GCD进阶使用[80682:19184579] 123
2021-09-16 22:01:44.248399+0800 004--GCD进阶使用[80682:19184581] 456
2021-09-16 22:01:44.248441+0800 004--GCD进阶使用[80682:19184501] ----<NSThread: 0x600001fb81c0>{number = 1, name = main}-----
2021-09-16 22:01:44.248572+0800 004--GCD进阶使用[80682:19184501] **********起来干!!
2021-09-16 22:01:44.248584+0800 004--GCD进阶使用[80682:19184579] 加载那么多,喘口气!!!

串行栅栏函数异步串行栅栏函数同步自行打印探索

可变数组线程不安全问题
/**
 可变数组 线程不安全 解决办法
 */
- (void)demo3{
    // 可变数组线程安全?
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
    // 多线程 操作marray
    for (int i = 0; i<1000; i++) {
        dispatch_async(concurrentQueue, ^{
            NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
            NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
            NSData *data = [NSData dataWithContentsOfURL:url];
            UIImage *image = [UIImage imageWithData:data];
            // self.mArray 多线程 地址永远是一个
            // self.mArray 0 - 1 - 2 变化
            // name = kc  getter - setter (retain release)
            // self.mArray 读 - 写  self.mArray = newaRRAY (1 2)
            // 多线程 同时  写  1: (1,2)  2: (1,2,3)  3: (1,2,4)
            // 同一时间对同一片内存空间进行操作 不安全,添加栅栏函数保证线程安全
            dispatch_barrier_async(concurrentQueue , ^{
                [self.mArray addObject:image];
            });
        });
    }
}

// 控制台打印如下
2021-09-16 22:05:12.647804+0800 004--GCD进阶使用[80682:19184501] 数组的个数:1000

栅栏函数的底层原理

为什么栅栏函数可以堵塞下面的代码并且控制流程,为什么可以使用自定义并发队列而不能使用全局队列? 下面我们来探究原理

同步栅栏函数分析
  • 查看dispatch_barrier_sync方法
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
  • 查看进入 _dispatch_barrier_sync_f方法
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
  • 查看_dispatch_barrier_sync_f_inline方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

存在进入_dispatch_sync_f_slow函数的代码,证明同步栅栏函数也可能出现死锁的情况。

  • 查看_dispatch_sync_recurse方法
DISPATCH_NOINLINE
static void
_dispatch_sync_recurse(dispatch_lane_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();
    dispatch_queue_t tq = dq->do_targetq;

    do {
        // 串行
        if (likely(tq->dq_width == 1)) {
            if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
                return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq,
                        DC_FLAG_BARRIER);
            }
        } else {
        // 并发
            dispatch_queue_concurrent_t dl = upcast(tq)._dl;
            if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
                return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq, 0);
            }
        }
        tq = tq->do_targetq;
    } while (unlikely(tq->do_targetq));

    _dispatch_introspection_sync_begin(dq);
    _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags)));
}

要想dispatch_barrier_syncblock回调,必须得把dispatch_barrier_sync前面的任务全部执行完,再执行block意味着当前队列无任务

  • 查看_dispatch_sync_invoke_and_complete_recurse方法
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
        void *ctxt, dispatch_function_t func, uintptr_t dc_flags
        DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
}
  • 查看_dispatch_sync_complete_recurse方法
DISPATCH_NOINLINE
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
        uintptr_t dc_flags)
{
    bool barrier = (dc_flags & DC_FLAG_BARRIER);
    do {
        if (dq == stop_dq) return;
        if (barrier) {
        // 存在栅栏函数,就唤醒线程
            dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
        } else {
        // 不存在栅栏函数,就往下执行
            _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
        }
        dq = dq->do_targetq;
        barrier = (dq->dq_width == 1);
    } while (unlikely(dq->do_targetq));
}
  • dx_wakeup的宏定义
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
  • 查看dq_wakeup
    image.png

串行和并发 dq_wakeup->_dispatch_lane_wakeup
全局队列 dq_wakeup->_dispatch_root_queue_wakeup

  • 查看_dispatch_lane_wakeup方法
DISPATCH_NOINLINE
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    // 如果有栅栏函数barrier,进入_dispatch_lane_barrier_complete
    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_lane_barrier_complete(dqu, qos, flags);
    }
    if (_dispatch_queue_class_probe(dqu)) {
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
  • 查看_dispatch_lane_barrier_complete方法
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    dispatch_lane_t dq = dqu._dl;

    if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
        struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        // 如果是串行或者barrier就等待,执行_dispatch_lane_drain_barrier_waiter
        if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
            if (_dispatch_object_is_waiter(dc)) {
                return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
            }
        // 并发队列,把所有的barrier里面的任务执行完
        } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
            return _dispatch_lane_drain_non_barriers(dq, dc, flags);
        }

        if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }

    uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
            dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}

进入_dispatch_lane_class_barrier_complete方法,把所有状态清空

  • 全局队列dq_wakeup ->_dispatch_root_queue_wakeup 这里面没有对栅栏处理
DISPATCH_NOINLINE
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
        DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
    if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
        DISPATCH_INTERNAL_CRASH(dq->dq_priority,
                "Don't try to wake up or override a root queue");
    }
    if (flags & DISPATCH_WAKEUP_CONSUME_2) {
        return _dispatch_release_2_tailcall(dq);
    }
}
异步栅栏函数分析
  • 查看dispatch_barrier_async方法
#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif
  • 查看_dispatch_continuation_async方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}
  • 进入dx_push->dq_push
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
image.png
  • 并发进入_dispatch_lane_concurrent_push方法
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}
  • 是栅栏函数barrier,进入_dispatch_lane_push方法
DISPATCH_NOINLINE
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    dispatch_wakeup_flags_t flags = 0;
    struct dispatch_object_s *prev;

    if (unlikely(_dispatch_object_is_waiter(dou))) {
        return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
    }

    dispatch_assert(!_dispatch_object_is_global(dq));
    qos = _dispatch_queue_push_qos(dq, qos);

    prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
    if (unlikely(os_mpsc_push_was_empty(prev))) {
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
    } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2;
    }
    os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
    if (flags) {
        return dx_wakeup(dq, qos, flags);
    }
}

最后走同步的流程进入dx_wakeup -> dq_wakeup

信号量使用

信号量dispatch_semaphore_t

信号量的作用一般是用来使任务同步执行,类似于互斥锁,用户可以根据需要控制GCD最大并发数

  • dispatch_semaphore_create 创建信号量
  • dispatch_semaphore_wait 信号量等待
  • dispatch_semaphore_signal 信号量释放

案例一

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(1);
    //任务1
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        NSLog(@"执行任务1");
        NSLog(@"任务1完成");
        dispatch_semaphore_signal(sem); // 发信号
    });

    //任务2
    dispatch_async(queue, ^{
        sleep(2);
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        NSLog(@"执行任务2");
        NSLog(@"任务2完成");
        dispatch_semaphore_signal(sem); // 发信号
    });

    //任务3
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        sleep(1);
        NSLog(@"执行任务3");
        NSLog(@"任务3完成");
        dispatch_semaphore_signal(sem);
    });
    
// 控制台打印如下
2021-09-16 23:06:01.395877+0800 006---GCD最大并发数[80966:19226077] 执行任务3
2021-09-16 23:06:01.396082+0800 006---GCD最大并发数[80966:19226077] 任务3完成
2021-09-16 23:06:01.396254+0800 006---GCD最大并发数[80966:19226075] 执行任务1
2021-09-16 23:06:01.396430+0800 006---GCD最大并发数[80966:19226075] 任务1完成
2021-09-16 23:06:02.360325+0800 006---GCD最大并发数[80966:19226072] 执行任务2
2021-09-16 23:06:02.360757+0800 006---GCD最大并发数[80966:19226072] 任务2完成

案例二

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(2);
    //任务1
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        sleep(0.5);
        NSLog(@"执行任务1");
        NSLog(@"任务1完成");
        dispatch_semaphore_signal(sem); // 发信号
    });
    //任务2
    dispatch_async(queue, ^{
        sleep(0.5);
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        NSLog(@"执行任务2");
        NSLog(@"任务2完成");
        dispatch_semaphore_signal(sem); // 发信号
    });
    //任务3
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        sleep(0.5);

        NSLog(@"执行任务3");
        NSLog(@"任务3完成");
        dispatch_semaphore_signal(sem);
    });
    //任务4
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        sleep(0.5);
        NSLog(@"执行任务4");
        NSLog(@"任务4完成");
        dispatch_semaphore_signal(sem);
    });

// 控制台打印如下
2021-09-16 23:11:40.027571+0800 006---GCD最大并发数[81011:19230316] 执行任务1
2021-09-16 23:11:40.027571+0800 006---GCD最大并发数[81011:19230319] 执行任务2
2021-09-16 23:11:40.027743+0800 006---GCD最大并发数[81011:19230316] 任务1完成
2021-09-16 23:11:40.027772+0800 006---GCD最大并发数[81011:19230319] 任务2完成
2021-09-16 23:11:40.027909+0800 006---GCD最大并发数[81011:19230318] 执行任务3
2021-09-16 23:11:40.027933+0800 006---GCD最大并发数[81011:19230323] 执行任务4
2021-09-16 23:11:40.028024+0800 006---GCD最大并发数[81011:19230318] 任务3完成
2021-09-16 23:11:40.028070+0800 006---GCD最大并发数[81011:19230323] 任务4完成

dispatch_semaphore_create控制最大并发数

案例三

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(1);

    dispatch_async(queue, ^{
        dispatch_semaphore_signal(sem); // 发信号
        NSLog(@"执行任务1");
        NSLog(@"任务1完成");
    });

    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        sleep(2);
        NSLog(@"执行任务2");
        NSLog(@"任务2完成");
    });

// 控制台打印如下
2021-09-16 23:25:01.768407+0800 006---GCD最大并发数[81107:19240717] 执行任务1
2021-09-16 23:25:01.768554+0800 006---GCD最大并发数[81107:19240717] 任务1完成
2021-09-16 23:25:03.773641+0800 006---GCD最大并发数[81107:19240715] 执行任务2
2021-09-16 23:25:03.774055+0800 006---GCD最大并发数[81107:19240715] 任务2完成

案例四

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(0);

    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
        NSLog(@"执行任务1");
        NSLog(@"任务1完成");
    });

    dispatch_async(queue, ^{
        sleep(2);
        NSLog(@"执行任务2");
        NSLog(@"任务2完成");
        dispatch_semaphore_signal(sem); // 发信号
    });

// 控制台打印如下
2021-09-16 23:29:53.591144+0800 006---GCD最大并发数[81162:19245786] 执行任务2
2021-09-16 23:29:53.591648+0800 006---GCD最大并发数[81162:19245786] 任务2完成
2021-09-16 23:29:53.592009+0800 006---GCD最大并发数[81162:19245787] 执行任务1
2021-09-16 23:29:53.592322+0800 006---GCD最大并发数[81162:19245787] 任务1完成

信号量原理

dispatch_semaphore_wait等待,dispatch_semaphore_signal发信号,底层做了什么?

  • 查看dispatch_semaphore_create方法
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
    dispatch_semaphore_t dsema;

    // If the internal value is negative, then the absolute of the value is
    // equal to the number of waiting threads. Therefore it is bogus to
    // initialize the semaphore with a negative value.
    if (value < 0) {
        return DISPATCH_BAD_INPUT;
    }

    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
            sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}
  1. 初始化信号量,设置GCD最大并发数
  2. 最大并发数必须>= 0
  • 查看dispatch_semaphore_wait方法
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}
  1. os_atomic_dec2o宏,进行减1操作;
  2. 若信号量>= 0,直接返回0,执行wait之后的代码;
  3. 若信号量< 0,将阻塞当前线程,进入_dispatch_semaphore_wait_slow函数;

如果创建的dispatch_semaphore_t sem = dispatch_semaphore_create(0),0-1 = -1 直接走 _dispatch_semaphore_wait_slow

  • 查看_dispatch_semaphore_wait_slow方法
DISPATCH_NOINLINE
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                // 超时处理
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}
  • 查看_dispatch_sema4_wait方法
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
    int ret = 0;
    do {
        ret = sem_wait(sema);
    } while (ret == -1 && errno == EINTR);
    DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

ret == -1就一直dowhile循环卡死这里
所以应该先执行另外的线程,先执行dispatch_semaphore_signal再执行dispatch_semaphore_wait
如果创建的dispatch_semaphore_t sem = dispatch_semaphore_create(0),0+1 =1 直接return,然后执行dispatch_semaphore_wait 1-1 = 0 直接return

  • 查看dispatch_semaphore_signal方法
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
        return 0;
    }
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}
  1. os_atomic_inc2o宏,进行加1操作;
  2. 若信号量> 0,直接返回0,继续执行后续代码;
  3. 若信号量等于LONG_MIN,抛出异常。这种情况表示wait操作过多,二者之间无法匹配。之后会调用_dispatch_semaphore_signal_slow函数,进入延迟等待;

调度组的应用问题

调度组最直接的作⽤:控制任务执⾏顺序

  • dispatch_group_create:创建组
  • dispatch_group_async:进组任务
  • dispatch_group_notify:进组任务执行完毕通知
  • dispatch_group_wait:进组任务执行等待时间
  • dispatch_group_enter:进组
  • dispatch_group_leave:出组
使用场景
  • 目前有两个任务,需要等待这两个任务都执行完毕,才会更新UI,可以使用调度组
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
    //创建调度组
    [self.mArray addObject:@"1"];
});
dispatch_group_async(group, queue, ^{
    [self.mArray addObject:@"2"];
    
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
    NSLog(@"更新UI");
    NSLog(@"数组=%@",self.mArray);
});

// 控制台打印如下
2021-09-19 11:26:55.752125+0800 005---GCD进阶使用(下)[97084:20217983] 更新UI
2021-09-19 11:26:55.752397+0800 005---GCD进阶使用(下)[97084:20217983] 数组=(
    2,
    1
)
  • 进组出组要成对出现,否则会出现崩溃
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
    //创建调度组
    [self.mArray addObject:@"1"];
});
dispatch_group_async(group, queue, ^{
    [self.mArray addObject:@"2"];
    
});
// 进组和出组 成对  先进后出
dispatch_group_enter(group);
dispatch_async(queue, ^{
    [self.mArray addObject:@"3"];
    dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
   NSLog(@"数组=%@",self.mArray);
});

// 控制台打印如下
2021-09-19 11:39:55.506223+0800 005---GCD进阶使用(下)[97277:20233016] 数组=(
    2,
    1,
    3
)

调度组的原理

鉴于上面调度组的使用,这里有几个疑问?
  • 调度组是如何控制流程的?
  • 为什么会出现进组出组的搭配崩溃?
  • dispatch_group_async为什么能够起到进组出组一样的效果?dispatch_group_async等价于dispatch_group_enter + dispatch_group_leave
带着这三个疑问,我们进行调度组源码探索
创建组dispatch_group_create
  • 查看dispatch_group_create方法
dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}
  • 查看_dispatch_group_create_with_count方法,跟信号量的初始化不同(仿照信号量的写法)
DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}

创建dispatch_group_t结构体,参数n默认传入0。

进组dispatch_group_enter
  • 查看dispatch_group_enter方法
void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}
  1. 使用os_atomic_sub_orig2o宏,对dg_bits进行减1操作;
  2. old_bits只可能是-10两种可能;
  3. old_bitsDISPATCH_GROUP_VALUE_MASK进行&运算,将结果赋值给old_value
    如果old_bits为0,old_value为0,调用_dispatch_retain函数
    如果old_bits为-1,old_valueDISPATCH_GROUP_VALUE_MASK,表示进组和出组函数使用不平衡,报出异常
出组dispatch_group_leave
  • 查看dispatch_group_leave方法
void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true);
    }

    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}
  1. 使用os_atomic_add_orig2o宏,进行加1操作;
  2. &运算后的旧值等于DISPATCH_GROUP_VALUE_1,等待do...while停止循环,调用_dispatch_group_wake函数;
  3. DISPATCH_GROUP_VALUE_1等同于DISPATCH_GROUP_VALUE_MASK;
  4. 如果旧值为0,表示进组和出组函数使用不平衡,报出异常;
  • 查看_dispatch_group_wake方法
DISPATCH_NOINLINE
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs);
}
  1. 函数的作用,唤醒dispatch_group_notify函数;
  2. 核心代码在do...while循环中,调用_dispatch_continuation_async函数。
  • 查看dispatch_group_notify方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}
  1. 判断状态为0,调用_dispatch_group_wake函数
  2. 所以通知并不需要一直等待,因为dispatch_group_notifydispatch_group_leave中,都有_dispatch_group_wake函数的调用
  • 查看进入_dispatch_continuation_async方法,最终会调用dx_push方法
异步函数dispatch_group_async
  • 查看dispatch_group_async方法
#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}
#endif
  • 查看_dispatch_continuation_group_async方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    // 调用dispatch_group_enter函数,进行进组操作   0 -> -1  堵塞住
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
  • 查看_dispatch_continuation_async方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}
  • 查看dx_push -> dq_push方法
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
    .do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
    .do_dispose     = _dispatch_object_no_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_object_no_invoke,

    .dq_activate    = _dispatch_queue_no_activate,
    .dq_wakeup      = _dispatch_root_queue_wakeup,
    .dq_push        = _dispatch_root_queue_push,
);
  • 查看_dispatch_root_queue_push方法
  • 查看_dispatch_root_queue_push_inline方法
  • 查看_dispatch_root_queue_poke方法
  • 查看_dispatch_root_queue_poke_slow -> _dispatch_root_queues_init -> _dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline -> _dispatch_continuation_with_group_invoke方法
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}

调用dispatch_group_leave函数,进行出组操作

dispatch_source

基本介绍

dispatch_source是基础数据类型,用于协调特定底层系统事件的处理

dispatch_source替代了异步回调函数,来处理系统相关的事件。当配置一个dispatch时,你需要指定监测的事件队列、以及任务回调。当事件发生时,dispatch source会提交block或函数到指定的queue去执行。

使用dispatch_source代替dispatch_async的原因在于联结的优势。
联结:在任一线程上调用它的一个函数dispatch_source_merge_data后,会执行Dispatch Source事先定义好的句柄(可以把句柄简单理解为一个block),这个过程叫Custom event用户事件。是dispatch source支持处理的一种事件。

句柄:是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有密切的关系,包含:

  • 实例句柄HINSTANCE
  • 位图句柄HBITMAP
  • 设备表句柄HDC
  • 图标句柄HICON
  • 通用句柄HANDLE

使用dispatch_source的优点:

  1. 其 CPU 负荷非常小,尽量不占用资源
  2. 联结的优势
常用函数
  • dispatch_source_create 创建源
  • dispatch_source_set_event_handler 设置源回调事件
  • dispatch_source_merge_data 源事件设置数据
  • dispatch_source_get_data 获取事件源数据
  • dispatch_resume 继续
  • dispatch_suspend 挂起
dispatch_source封装的计时器,用来控制流程
 __block int timeout=10; //倒计时时间
      dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
      self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,queue);
      dispatch_source_set_timer(self.timer ,dispatch_walltime(NULL, 0),1.0*NSEC_PER_SEC, 0); //每秒执行
      dispatch_source_set_event_handler(self.timer, ^{
          if(timeout<=0){ //倒计时结束,关闭
              dispatch_source_cancel(self.timer);
              dispatch_async(dispatch_get_main_queue(), ^{
                  //设置界面的按钮显示 根据自己需求设置
                  NSLog(@"timer 停止");
               
              });
          }else{
              int minutes = timeout / 60;
              int seconds = timeout % 60;
              NSString *strTime = [NSString stringWithFormat:@"%d分%.2d秒后重新获取验证码",minutes, seconds];
              dispatch_async(dispatch_get_main_queue(), ^{
                  //设置界面的按钮显示 根据自己需求设置
                  NSLog(@"%@",strTime);
              });
              timeout--;
                   
          }
      });
      
     //恢复 启动
    dispatch_resume(self.timer);
    //挂起
    dispatch_suspend(self.timer);

可变数组不安全分析

上面我们在学习栅栏函数时,曾遇到多线程操作可变数组导致的崩溃

dispatch_queue_t concurrentQueue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    // 多线程 操作marray
    for (int i = 0; i<1000; i++) {
        dispatch_async(concurrentQueue, ^{
            NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
            NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
            NSData *data = [NSData dataWithContentsOfURL:url];
            UIImage *image = [UIImage imageWithData:data];
            [self.mArray addObject:image];
        });
    }

崩溃原因是线程不安全self.mArray不断的读写,写的操作是把旧值release新值retain,有可能一个线程正在release另外一个线程正在读,这时候读取的是野指针。同一时间对同一片内存空间进行操作 是不安全的,为了让其安全需要加锁或者加dispatch_barrier_async

  dispatch_queue_t concurrentQueue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    // 多线程 操作marray
    for (int i = 0; i<1000; i++) {
        dispatch_async(concurrentQueue, ^{
            NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
            NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
            NSData *data = [NSData dataWithContentsOfURL:url];
            UIImage *image = [UIImage imageWithData:data];
            // 添加栅栏函数保证线程安全
            dispatch_barrier_async(concurrentQueue , ^{
                [self.mArray addObject:image];
            });
        });
    }

相关文章

网友评论

    本文标题:GCD分析(下)

    本文链接:https://www.haomeiwen.com/subject/euqxgltx.html