美文网首页iOS
dispatch_group

dispatch_group

作者: 码代码的鱼 | 来源:发表于2022-03-10 14:55 被阅读0次

    源码解析:https://www.jianshu.com/p/e93fd15d93d3?ivk_sa=1024320u

    常用方法

    1dispatch_group_async

    //将block加入到queue中,并和group相关联

    void dispatch_group_async(dispatch_group_t group,

                              dispatch_queue_t queue,

                              dispatch_block_t block); 

    如果提交到dispatch_queue_t queue中与group关联的任务全都执行完毕会调用dispatch_group_notify并且dispatch_group_wait会停止等待;

    2dispatch_group_enter & dispatch_group_leave

    //将代码块加入到group中,与dispatch_group_async功能类似

    //开始一个任务

    void dispatch_group_enter(dispatch_group_t group);

    //结束的任务

    void dispatch_group_leave(dispatch_group_t group);

    当调用enter时信号量-1,调用leave时信号量+1,当计数回复原值时会调用dispatch_group_notify并且dispatch_group_wait会停止等待;

    注意点: dispatch_group_enter 和 dispatch_group_leave 必须成对出现

    3dispatch_group_wait

    //阻塞当前线程(不能放在主线程)等待group任务的完成,可以设置超时时间

    long dispatch_group_wait(dispatch_group_t group, 

                              dispatch_time_t timeout);

    4 dispatch_group_notify

    /*  dispatch_group_notify会把block加入到queue中, 

      *  当dispatch_group_t中的任务执行完成时执行

      *  或者在dispatch_group_enter和dispatch_group_leave存在的情况下,计数为0的时候执行

    */

    void dispatch_group_notify(dispatch_group_t group,

                                dispatch_queue_t queue,

                                dispatch_block_t block);

    dipatch_group_notify和dispatch_group_wait都能实现等待group中的任务执行完之后再进行其他操作;不同的是dispatch_group_notify可以将要执行的任务放到block中,不会阻塞当前线程,dispatch_group_wait会阻塞当前线程

    5、dispatch_apply:需要反复执行某个任务时使用,会阻塞当前线程

    需要注意的点⚠️

    1、dispatch_group_enter 和 dispatch_group_leave 必须成对出现

    如果dispatch_group_enter比dispatch_group_leave多,不会走到唤醒逻辑,dispatch_group_notify中的任务无法执行或者dispatch_group_wait收不到信号而卡住线程。如果是dispatch_group_leave多,则会引起崩溃。

    2、 dispatch_group_wait(group, DISPATCH_TIME_FOREVER);

    dispatch_group_wait 会阻塞当前线程,不要放在主线程使用

    3、dispatch_group_async内部会加dispatch_group_enter 和dispatch_group_leave,但是block内部进行异步操作,会出现notify没有在想要的时机执行的问题

    如下面两张图,图1正常,但图2时机错误

    可以改用dispatch_group_enter & dispatch_group_leave


    下面看下dispatch_group的实现原理

    dispatch_group的实现是基于dispatch_semaphore实现的

    dispatch_group_create

    dispatch_group_t

    dispatch_group_create(void)

    {

        return (dispatch_group_t)dispatch_semaphore_create(LONG_MAX);

    }

    dispatch_group_create其实就是创建了一个value为LONG_MAX的dispatch_semaphore信号量

    dispatch_group_async

    void

    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,

            dispatch_block_t db)

    {

        dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),

                _dispatch_call_block_and_release);

    }

    dispatch_group_async只是dispatch_group_async_f的封装

    dispatch_group_async_f

    void

    dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,

            dispatch_function_t func)

    {

        dispatch_continuation_t dc;

        _dispatch_retain(dg);

        dispatch_group_enter(dg);

        dc = fastpath(_dispatch_continuation_alloc_cacheonly());

        if (!dc) {

            dc = _dispatch_continuation_alloc_from_heap();

        }

        dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);

        dc->dc_func = func;

        dc->dc_ctxt = ctxt;

        dc->dc_group = dg;

        // No fastpath/slowpath hint because we simply don't know

        if (dq->dq_width != 1 && dq->do_targetq) {

            return _dispatch_async_f2(dq, dc);

        }

        _dispatch_queue_push(dq, dc);

    }

    从上面的代码我们可以看出dispatch_group_async_f和dispatch_async_f相似。dispatch_group_async_f多了dispatch_group_enter(dg);,另外在do_vtable的赋值中dispatch_group_async_f多了一个DISPATCH_OBJ_GROUP_BIT的标记符。既然添加了dispatch_group_enter必定会存在dispatch_group_leave。_dispatch_continuation_pop函数的源码中有一段代码如下:

        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);

        if (dg) {

            //group需要进行调用dispatch_group_leave并释放信号

            dispatch_group_leave(dg);

            _dispatch_release(dg);

        }

    所以dispatch_group_async_f函数中的dispatch_group_leave是在_dispatch_continuation_pop函数中调用的。

    这里概括一下dispatch_group_async_f的工作流程:

    调用dispatch_group_enter;

    将block和queue等信息记录到dispatch_continuation_t结构体中,并将它加入到group的链表中;

    _dispatch_continuation_pop执行时会判断任务是否为group,是的话执行完任务再调用dispatch_group_leave以达到信号量的平衡。

    dispatch_group_async_f内部会加dispatch_group_enter 和dispatch_group_leave,但是内部进行异步操作,会出现notify没有在想要的时机执行的问题

    dispatch_group_enter

    void

    dispatch_group_enter(dispatch_group_t dg)

    {

        dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

        (void)dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);

    }

    dispatch_group_enter将dispatch_group_t转换成dispatch_semaphore_t,并调用dispatch_semaphore_wait,原子性减1后,进入等待状态直到有信号唤醒。所以说dispatch_group_enter就是对dispatch_semaphore_wait的封装。

    dispatch_group_leave

    void

    dispatch_group_leave(dispatch_group_t dg)

    {

        dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

        dispatch_atomic_release_barrier();

        long value = dispatch_atomic_inc2o(dsema, dsema_value);//dsema_value原子性加1

        if (slowpath(value == LONG_MIN)) {//内存溢出,由于dispatch_group_leave在dispatch_group_enter之前调用

            DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_group_leave()");

        }

        if (slowpath(value == dsema->dsema_orig)) {//表示所有任务已经完成,唤醒group

            (void)_dispatch_group_wake(dsema);

        }

    }

    从上面的源代码中我们看到dispatch_group_leave将dispatch_group_t转换成dispatch_semaphore_t后将dsema_value的值原子性加1。如果value为LONG_MIN程序crash;如果value等于dsema_orig表示所有任务已完成,调用_dispatch_group_wake唤醒group(_dispatch_group_wake的用于和notify有关,我们会在后面介绍)。因为在enter的时候进行了原子性减1操作。所以在leave的时候需要原子性加1。

    这里先说明一下enter和leave之间的关系:

    dispatch_group_leavedispatch_group_enter配对使用。当调用了dispatch_group_enter而没有调用dispatch_group_leave时,由于value不等于dsema_orig不会走到唤醒逻辑,dispatch_group_notify中的任务无法执行或者dispatch_group_wait收不到信号而卡住线程。

    dispatch_group_enter必须在dispatch_group_leave之前出现。当dispatch_group_leave比dispatch_group_enter多调用了一次或者说在dispatch_group_enter之前被调用的时候,dispatch_group_leave进行原子性加1操作,相当于value为LONGMAX+1,发生数据长度溢出,变成LONG_MIN,由于value == LONG_MIN成立,程序发生crash。

    dispatch_group_notify

    void

    dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,

            dispatch_block_t db)

    {

        dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db),

                _dispatch_call_block_and_release);

    }

    dispatch_group_notify是dispatch_group_notify_f的封装,具体实现在后者。

    dispatch_group_notify_f

    void

    dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,

            void (*func)(void *))

    {

        dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

        struct dispatch_sema_notify_s *dsn, *prev;

        //封装dispatch_continuation_t结构体

        // FIXME -- this should be updated to use the continuation cache

        while (!(dsn = calloc(1, sizeof(*dsn)))) {

            sleep(1);

        }

        dsn->dsn_queue = dq;

        dsn->dsn_ctxt = ctxt;

        dsn->dsn_func = func;

        _dispatch_retain(dq);

        dispatch_atomic_store_barrier();

        //将结构体放到链表尾部,如果链表为空同时设置链表头部节点并唤醒group

        prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn);

        if (fastpath(prev)) {

            prev->dsn_next = dsn;

        } else {

            _dispatch_retain(dg);

            (void)dispatch_atomic_xchg2o(dsema, dsema_notify_head, dsn);

            if (dsema->dsema_value == dsema->dsema_orig) {//任务已经完成,唤醒group

                _dispatch_group_wake(dsema);

            }

        }

    }

    所以dispatch_group_notify函数只是用链表把所有回调通知保存起来,等待调用。

    _dispatch_group_wake

    static long

    _dispatch_group_wake(dispatch_semaphore_t dsema)

    {

        struct dispatch_sema_notify_s *next, *head, *tail = NULL;

        long rval;

        //将dsema的dsema_notify_head赋值为NULL,同时将之前的内容赋给head

        head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL);

        if (head) {

            // snapshot before anything is notified/woken

            //将dsema的dsema_notify_tail赋值为NULL,同时将之前的内容赋给tail

            tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL);

        }

        //将dsema的dsema_group_waiters设置为0,并返回原来的值

        rval = dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0);

        if (rval) {

            //循环调用semaphore_signal唤醒当初等待group的信号量,使得dispatch_group_wait函数返回。

            // wake group waiters

    #if USE_MACH_SEM

            _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);

            do {

                kern_return_t kr = semaphore_signal(dsema->dsema_waiter_port);

                DISPATCH_SEMAPHORE_VERIFY_KR(kr);

            } while (--rval);

    #elif USE_POSIX_SEM

            do {

                int ret = sem_post(&dsema->dsema_sem);

                DISPATCH_SEMAPHORE_VERIFY_RET(ret);

            } while (--rval);

    #endif

        }

        if (head) {

            //获取链表,依次调用dispatch_async_f异步执行在notify函数中的任务即Block。

            // async group notify blocks

            do {

                dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);

                _dispatch_release(head->dsn_queue);

                next = fastpath(head->dsn_next);

                if (!next && head != tail) {

                    while (!(next = fastpath(head->dsn_next))) {

                        _dispatch_hardware_pause();

                    }

                }

                free(head);

            } while ((head = next));

            _dispatch_release(dsema);

        }

        return 0;

    }

    _dispatch_group_wake主要的作用有两个:

    调用semaphore_signal唤醒当初等待group的信号量,使得dispatch_group_wait函数返回。

    获取链表,依次调用dispatch_async_f异步执行在notify函数中的任务即Block。

    dispatch_group_wait

    long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)

    {

        dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

        if (dsema->dsema_value == dsema->dsema_orig) {//没有需要执行的任务

            return 0;

        }

        if (timeout == 0) {//返回超时

    #if USE_MACH_SEM

            return KERN_OPERATION_TIMED_OUT;

    #elif USE_POSIX_SEM

            errno = ETIMEDOUT;

            return (-1);

    #endif

        }

        return _dispatch_group_wait_slow(dsema, timeout);

    }

    dispatch_group_wait用于等待group中的任务完成。

    _dispatch_group_wait_slow

    static long

    _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)

    {

        long orig;

    again:

        // check before we cause another signal to be sent by incrementing

        // dsema->dsema_group_waiters

        if (dsema->dsema_value == dsema->dsema_orig) {

            return _dispatch_group_wake(dsema);

        }

        // Mach semaphores appear to sometimes spuriously wake up. Therefore,

        // we keep a parallel count of the number of times a Mach semaphore is

        // signaled (6880961).

        (void)dispatch_atomic_inc2o(dsema, dsema_group_waiters);

        // check the values again in case we need to wake any threads

        if (dsema->dsema_value == dsema->dsema_orig) {

            return _dispatch_group_wake(dsema);

        }

    #if USE_MACH_SEM

        mach_timespec_t _timeout;

        kern_return_t kr;

        _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);

        // From xnu/osfmk/kern/sync_sema.c:

        // wait_semaphore->count = -1; /* we don't keep an actual count */

        //

        // The code above does not match the documentation, and that fact is

        // not surprising. The documented semantics are clumsy to use in any

        // practical way. The above hack effectively tricks the rest of the

        // Mach semaphore logic to behave like the libdispatch algorithm.

        switch (timeout) {

        default:

            do {

                uint64_t nsec = _dispatch_timeout(timeout);

                _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);

                _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);

                kr = slowpath(semaphore_timedwait(dsema->dsema_waiter_port,

                        _timeout));

            } while (kr == KERN_ABORTED);

            if (kr != KERN_OPERATION_TIMED_OUT) {

                DISPATCH_SEMAPHORE_VERIFY_KR(kr);

                break;

            }

            // Fall through and try to undo the earlier change to

            // dsema->dsema_group_waiters

        case DISPATCH_TIME_NOW:

            while ((orig = dsema->dsema_group_waiters)) {

                if (dispatch_atomic_cmpxchg2o(dsema, dsema_group_waiters, orig,

                        orig - 1)) {

                    return KERN_OPERATION_TIMED_OUT;

                }

            }

            // Another thread called semaphore_signal().

            // Fall through and drain the wakeup.

        case DISPATCH_TIME_FOREVER:

            do {

                kr = semaphore_wait(dsema->dsema_waiter_port);

            } while (kr == KERN_ABORTED);

            DISPATCH_SEMAPHORE_VERIFY_KR(kr);

            break;

        }

    #elif USE_POSIX_SEM

    //这部分代码省略

    #endif

        goto again;

    }

    从上面的代码我们发现_dispatch_group_wait_slow和_dispatch_semaphore_wait_slow的逻辑很接近。都利用mach内核的semaphore进行信号的发送。区别在于_dispatch_semaphore_wait_slow在等待结束后是return,而_dispatch_group_wait_slow在等待结束是调用_dispatch_group_wake去唤醒这个group。

    相关文章

      网友评论

        本文标题:dispatch_group

        本文链接:https://www.haomeiwen.com/subject/fabprrtx.html