美文网首页底层原理
GCD 底层源码分析(三)

GCD 底层源码分析(三)

作者: 晨曦的简书 | 来源:发表于2021-08-18 17:12 被阅读0次

    栅栏函数

    栅栏函数最直接的作用就是控制任务的执行顺序,同步。

    • dispatch_barrier_async 前面的任务执行完毕才会来到这里
    • dispatch_barrier_sync 作用相同,但是这个会堵塞线程,影响后面的任务执行
    • 非常重要的一点: 栅栏函数只能控制同一并发队列

    栅栏函数的应用

    在开发过程中我们应该都会遇到一种情况,多个请求并发执行,在请求都完成的时候,我们统一处理一件事情。下面我们看几个栅栏函数应用的案例。

    • 案例 1


    • 案例 2


    • 案例 3


    • 案例 4

    /**
     可变数组 线程不安全 解决办法
     */
    - (void)demo3{
        // 可变数组线程安全?
        dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT);
        // 多线程 操作marray
        for (int i = 0; i<1000; i++) {
            dispatch_async(concurrentQueue, ^{
                NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
                NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
                NSData *data = [NSData dataWithContentsOfURL:url];
                UIImage *image = [UIImage imageWithData:data];
                dispatch_barrier_async(concurrentQueue , ^{
                    [self.mArray addObject:image];
                });
            });
        }
    }
    

    通过上面的案例案例我们可以看到,案例 1 使用 dispatch_barrier_async,只会对相同队列的任务产生阻隔,并不会影响任务 5 的执行,但是案例 2 使用 dispatch_barrier_sync 的时候会对后面所有的任务产生阻隔,包括任务 5。案例 3 当我们改为全局并发队列的时候并没有起到阻隔作用。案例 4,当多个线程操作 marray 会不安全,这是因为对 marray 添加元素的时候,其实就是对旧值的 release,对新值的 retain,多线程的情况下就会出现两个线程同时对 marray 进行了 release ,这个时候刚好又有一个线程对 marray 进行访问的时候,就会出现坏地址的访问,所以就会出错。用栅栏函数可以解决这个问题。最后我们带着两个问题来探究一下栅栏函数的底层实现原理。

    • 栅栏函数为什么能起到阻隔作用
    • 为什全局队列下栅栏函数没起作用

    栅栏函数的底层原理

    这里我们来看一下栅栏函数的底层逻辑,首先我们在 libdispatch.dylib 源码中搜索 dispatch_barrier_async,然后我们依次跟流程 dispatch_barrier_sync -> _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline ,最后会来到 _dispatch_barrier_sync_f_inline 函数。

    在这里我们可以看到,这里的代码跟我们前面看的同步函数很相似,也有可能会执行 _dispatch_sync_f_slow 函数,也会出现死锁现象。只是这里多了一个 DC_FLAG_BARRIER 的标识。接着我们看下 _dispatch_sync_recurse 函数的实现。

    我们在执行栅栏函数之前,有个前提会对队列中的任务进行清空,保证队列中没有其他任务。完成之后会执行 _dispatch_sync_invoke_and_complete_recurse 函数。_dispatch_sync_invoke_and_complete_recurse -> _dispatch_sync_complete_recurse,我们跟流程会来到 _dispatch_sync_complete_recurse 函数。

    _dispatch_lane_non_barrier_complete 函数会告诉系统完成,并对状态进行修复。这里我们来看一下 dx_wakeup


    这里我们会看到自定义串行队列, 并发队列的 dq_wakeup 赋值跟全局并发队列有些区别,这里我们先看下 _dispatch_root_queue_wakeup

    接着我们进入 _dispatch_lane_barrier_complete 函数。

    _dispatch_lane_class_barrier_complete 函数这里代表对栅栏进行清空,表示栅栏函数之后的任务可以执行了。

    到这里我们知道了栅栏函数能起到任务阻隔作用的原理,但是我们还有一个问题没解决,就是全局并发队列栅栏函数无效的原因,这里我们来看一下。

    当我们搜索 _dispatch_root_queue_wakeup 的时候可以看到这里并没有对 barrier 进行处理,也就是栅栏函数无效的原因。苹果这样设计的原因是全局并发队列是全局共用的,也有可能系统任务也会用到,如果栅栏函数能生效的话就会影响这些任务的执行。

    信号量

    • dispatch_semaphore_create 创建信号量
    • dispatch_semaphore_wait 信号量等待
    • dispatch_semaphore_signal 信号量释放

    同步->当锁,控制GCD最大并发数

    信号量的使用

    • 案例 1
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
        dispatch_semaphore_t sem = dispatch_semaphore_create(2);
        
        //任务1
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            sleep(2);
            NSLog(@"执行任务1");
            NSLog(@"任务1完成");
            dispatch_semaphore_signal(sem);
        });
        
        //任务2
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            sleep(2);
            NSLog(@"执行任务2");
            NSLog(@"任务2完成");
            dispatch_semaphore_signal(sem);
        });
        
        //任务3
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            sleep(2);
            NSLog(@"执行任务3");
            NSLog(@"任务3完成");
            dispatch_semaphore_signal(sem);
        });
    
        //任务4
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
            sleep(2);
            NSLog(@"执行任务4");
            NSLog(@"任务4完成");
            dispatch_semaphore_signal(sem);
        });
    

    通过案例 1 我们可以看到,我们通过信号量设置线程的并发数为 2,通过打印可以看到,同一时间只有任务 1, 任务 2 执行了,两秒之后才执行的任务 3, 任务 4。在我们平时的开发过程中,数据上传的时候可以采用这种方式。

    • 案例 2


    通过案例 2 我们可以看到,我们通过 dispatch_semaphore_waitdispatch_semaphore_signal 结合使用能达到同步的效果,只有任务 2 完成才能执行任务 1。

    信号量底层原理

    了解了信号量的作用,那么现在我们来看一下信号量的底层原理。

    • dispatch_semaphore_create

    通过注释我们可以看到,这里讲只有 value 大于等于 0 的时候才会起作用,小于 0 就会执行。

    • dispatch_semaphore_wait
    intptr_t
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) {
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    

    os_atomic_dec2o 函数代表减减操作,会对 dsema_value 值减 1,因为 dsema_value 初始值为 0,所以不会走到 if (likely(value >= 0)) 判断里面,所以就会执行 _dispatch_semaphore_wait_slow 函数。

    static intptr_t
    _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
            dispatch_time_t timeout)
    {
        long orig;
    
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        switch (timeout) {
        default:
            if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
                break;
            }
            // Fall through and try to undo what the fast path did to
            // dsema->dsema_value
        case DISPATCH_TIME_NOW:
            orig = dsema->dsema_value;
            while (orig < 0) {
                if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                        &orig, relaxed)) {
                    return _DSEMA4_TIMEOUT();
                }
            }
            // Another thread called semaphore_signal().
            // Fall through and drain the wakeup.
        case DISPATCH_TIME_FOREVER:
            _dispatch_sema4_wait(&dsema->dsema_sema);
            break;
        }
        return 0;
    }
    

    这里如果我们 dispatch_semaphore_wait 函数第二个参数设置的是 DISPATCH_TIME_NOW 这里就执行超时判断,如果是 DISPATCH_TIME_FOREVER 就会执行 _dispatch_sema4_wait 函数。我们常用的参数是 DISPATCH_TIME_FOREVER

    void
    _dispatch_sema4_wait(_dispatch_sema4_t *sema)
    {
        int ret = 0;
        do {
            ret = sem_wait(sema);
        } while (ret == -1 && errno == EINTR);
        DISPATCH_SEMAPHORE_VERIFY_RET(ret);
    }
    

    这里 sem_waitpthread 的下层封装,这里我们不需要太关注,到这来我们可以看到其实 dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER) 可以看做 do {} while 循环。

    • dispatch_semaphore_signal
    intptr_t
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {
            return 0;
        }
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        return _dispatch_semaphore_signal_slow(dsema);
    }
    

    这里 os_atomic_inc2o 相当于对 dsema_value 加 1,如果大于 0 就直接返回 0,如果 (likely(value > 0)) 的条件不成立,就会抛出异常,说明 dispatch_semaphore_wait 操作过多,没有跟 dispatch_semaphore_signal 一一对应。_dispatch_semaphore_signal_slow 会进行异常相关的处理,一直对 dsema_value 加 1,直到大于 0。

    调度组

    调度组最直接的作用: 控制任务执行顺序

    • dispatch_group_create 创建组
    • dispatch_group_async 进组任务
    • dispatch_group_notify 进组任务执行完毕通知
    • dispatch_group_wait 进组任务执行等待时间
    • dispatch_group_enter 进组
    • dispatch_group_leave 出组 注意搭配使用

    调度组的应用

    • 案例


    例如上面案例,我们并发请求多张图片,并把异步任务添加到调度组,在调度组内的任务都完成之后,会调用 dispatch_group_notify,我们在此进行多张图片的合成。

    调度组的底层原理

    这里我们需要带着几个问题来探究一下调度组的底层原理。

    • 进组与出组为什么要搭配使用,要先进后出

    • 调度组是如何进行流程控制的

    • 如案例所示,为什么 dispatch_group_async = dispatch_group_enter + dispatch_group_leave

    • dispatch_group_create

    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    

    通过源码我们可以看到调度组的创建跟信号量的写法类似,按照信号量创建的写法又写了一套,通过 os_atomic_store2o 的形式来保存 n

    • dispatch_group_enter
    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    

    这里可以看到也跟信号量类似,会对初始值进行减减,把 信号量的值变为 -1,与信号量不同的是这里没有 _dispatch_sema4_wait 函数。

    • dispatch_group_leave
    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            return _dispatch_group_wake(dg, old_state, true);
        }
    
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    

    这里就会对原始值加 1,old_state 等于 0,然后就会走到 if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) 判断里面,然后执行 _dispatch_group_wake,这里有个细节,在这里执行 _dispatch_group_wake 函数的原因是可能异步任务比较耗时,在 dispatch_group_leave(group) 前就执行了 dispatch_group_notify,这里就是为了消除多线程的影响。

    • dispatch_group_notify
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
    
        dsn->dc_data = dq;
        _dispatch_retain(dq);
    
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) {
                    os_atomic_rmw_loop_give_up({
                        return _dispatch_group_wake(dg, new_state, false);
                    });
                }
            });
        }
    }
    

    这里会层层判断,当 old_state == 0 的时候会执行 _dispatch_group_wake 函数,也就是同步函数或者异步函数的 weak_up 流程(block callout 流程)。

    • dispatch_group_async
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
    
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    

    追踪流程来到这里会执行 dispatch_group_enter(dg),那么 dispatch_group_leave 函数在哪里执行呢?我们继续跟流程。

    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    
    #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    
    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
        .do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
        .do_dispose     = _dispatch_object_no_dispose,
        .do_debug       = _dispatch_queue_debug,
        .do_invoke      = _dispatch_object_no_invoke,
    
        .dq_activate    = _dispatch_queue_no_activate,
        .dq_wakeup      = _dispatch_root_queue_wakeup,
        .dq_push        = _dispatch_root_queue_push,
    );
    

    _dispatch_root_queue_push -> _dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow -> _dispatch_root_queues_init -> _dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline -> _dispatch_continuation_with_group_invoke。我们一直跟流程来到 _dispatch_continuation_with_group_invoke 函数的实现。

    _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
    {
        struct dispatch_object_s *dou = dc->dc_data;
        unsigned long type = dx_type(dou);
        if (type == DISPATCH_GROUP_TYPE) {
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_trace_item_complete(dc);
            dispatch_group_leave((dispatch_group_t)dou);
        } else {
            DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
        }
    }
    

    在这里可以看到 dispatch_group_leave((dispatch_group_t)dou) 的调用,这也是为什么 dispatch_group_async = dispatch_group_enter + dispatch_group_leave 的原因。

    Dispatch_Source

    • 其 CPU 负荷非常小,尽量不占用资源
    • 联结的优势

    在任一线程上调用它的的一个函数 dispatch_source_merge_data 后,会执行 Dispatch Source 事先定义好的句柄(可以把句柄简单理解为一个 block ) 这个过程叫 Custom event ,用户事件。是 dispatch source 支持处理的一种事件。

    句柄是一种指向指针的指针 它指向的就是一个类或者结构,它和系统有很密切的关系 HINSTANCE(实例句柄),HBITMAP(位图句柄),HDC(设备表述句柄),HICON(图标句柄)等。这当中还有一个通用的句柄,就是HANDLE。

    • dispatch_source 常用函数:

    • dispatch_source_create 创建源

    • dispatch_source_set_event_handler 设置源事件回调

    • dispatch_source_merge_data 源事件设置数据
      dispatch_source_get_data 获取源事件数据

    • dispatch_resume 继续

    • dispatch_suspend 挂起

    • 案例

    - (void)viewDidLoad {
        [super viewDidLoad];
        self.totalComplete = 0;
        
        self.queue = dispatch_queue_create("chenxi.com", NULL);
        // 这里创建一个事件源
        self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
        // 这里设置一个事件句柄,相当于回调函数
        dispatch_source_set_event_handler(self.source, ^{
            
            NSLog(@"%@",[NSThread currentThread]);
            
            NSUInteger value = dispatch_source_get_data(self.source);
            self.totalComplete += value;
            NSLog(@"进度: %.2f",self.totalComplete/100.0);
            self.progressView.progress = self.totalComplete/100.0;
        });
        
        self.isRunning = YES;
        dispatch_resume(self.source);
    }
    
    // 这里通过点击事件控件 totalComplete 加操作
    - (IBAction)didClickStartOrPauseAction:(id)sender {
       
        if (self.isRunning) {
            dispatch_suspend(self.source);
            dispatch_suspend(self.queue);
            NSLog(@"已经暂停");
            self.isRunning = NO;
            [sender setTitle:@"暂停中.." forState:UIControlStateNormal];
        }else{
            dispatch_resume(self.source);
            dispatch_resume(self.queue);
            NSLog(@"已经执行了");
            self.isRunning = YES;
            [sender setTitle:@"暂停中.." forState:UIControlStateNormal];
        }
    }
    
    /**
     在这里控制循环 100 次,每次执行 dispatch_source_merge_data,事件句柄中
     dispatch_source_get_data(self.source) 能获取到 1 的值,对 self.totalComplete 进行加操作
     */
    - (void)touchesBegan:(NSSet<UITouch *> *)touches withEvent:(UIEvent *)event{
        NSLog(@"开始了");
        for (int i= 0; i<100; i++) {
            dispatch_async(self.queue, ^{
                if (!self.isRunning) {
                    NSLog(@"已经暂停");
                    return;
                }
                sleep(1);
                dispatch_source_merge_data(self.source, 1);
            });
        }
    }
    

    在这里我们可以看到 dispatch_source 的使用案例。 dispatch_source 的好处就是它是基于 pthread 的封装,不受 runloop 的影响。

    补充:可变数组不安全原因分析

    // 可变数组线程安全?
        dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT);
        // 多线程 操作marray
        for (int i = 0; i<1000; i++) {
            dispatch_async(concurrentQueue, ^{
                NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
                NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
                NSData *data = [NSData dataWithContentsOfURL:url];
                UIImage *image = [UIImage imageWithData:data];
                [self.mArray addObject:image];
            });
        }
    

    例如这段代码,我们执行的时候出现错误 ,原因就是:假如当前 mArray 的数据为 @[@(1)],当我们同时有多个线程对当前数组的第二个位置进行插入操作,就会出现同一时间对同一片内存空间进行操作的情况,就会导致错误。

    相关文章

      网友评论

        本文标题:GCD 底层源码分析(三)

        本文链接:https://www.haomeiwen.com/subject/jgimbltx.html