美文网首页V10(逻辑教育)
iOS-底层探索23:GCD下

iOS-底层探索23:GCD下

作者: differ_iOSER | 来源:发表于2020-11-05 22:14 被阅读0次

iOS 底层探索 文章汇总

前言

上一篇文章我们介绍了异步函数中任务的包装和调用流程,这篇文章主要介绍异步函数中线程是如何开辟的,同步函数和栅栏函数的底层源码分析。

目录

一、异步函数中线程的开辟

1、异步函数底层分析:dispatch_async

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    // 任务包装器 - 保存 block
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
#endif

dispatch_async方法中主要调用两个方法:
_dispatch_continuation_init_dispatch_continuation_async
_dispatch_continuation_init方法已经在上一篇文章中分析了,这篇文章从_dispatch_continuation_async方法开始分析

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

dx_push是通过宏定义的

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

根据dx_vtable(x)的不同dq_push调用的方法也不同,其中并发队列对应的方法定义如下:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
    .do_dispose     = _dispatch_lane_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_lane_invoke,

    .dq_activate    = _dispatch_lane_activate,
    .dq_wakeup      = _dispatch_lane_wakeup,
    .dq_push        = _dispatch_lane_concurrent_push,//并发队列调用的dq_push方法
);
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // <rdar://problem/24738102&24743140> reserving non barrier width
    // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
    // width equivalent), so we have to check that this thread hasn't
    // enqueued anything ahead of this call or we can break ordering
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}

现在需要查看当前方法执行的是return _dispatch_continuation_redirect_push(dq, dou, qos);还是_dispatch_lane_push(dq, dou, qos);

添加三个符号断点并关闭断点:

运行代码并打开新添加的三个符号断点:

点击继续运行进入dispatch_async符号断点:

点击继续运行可以发现进入了_dispatch_continuation_redirect_push符号断点:

因此_dispatch_lane_concurrent_push方法中首先执行的是return _dispatch_continuation_redirect_push(dq, dou, qos);
通过相同的方法可以验证后面执行的方法为_dispatch_root_queue_push

2、异步函数底层线程的开辟

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
...
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    ...
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
   int remaining = n;
   int r = ENOSYS;

   _dispatch_root_queues_init();
   ...
   do {
       _dispatch_retain(dq); // released in _dispatch_worker_thread
       while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
           if (r != EAGAIN) {
               (void)dispatch_assume_zero(r);
           }
           _dispatch_temporary_resource_shortage();
       }
   } while (--remaining);

   ...
}

从源码中我们可以看到底层是通过pthread开辟的的线程

在前一篇文章中我们打印出了异步函数任务Block调用的堆栈信息

3、跨库调用Block

那么底层是在哪里从libsystem_pthread.dylib库调用到libdispatch.dylib库的_dispatch_worker_thread2方法呢?

上面的_dispatch_root_queue_poke方法中有这样一句代码:
_dispatch_root_queues_init();

static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,
            _dispatch_root_queues_init_once);
}
这里引出了单例的底层实现:
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {//执行了一次就直接返回
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {//获取一把锁
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);//获取不到锁的线程进入等待
}

这里的dispatch_once_f方法也是dispatch_once底层的实现

  • _dispatch_once_gate_tryenterr(l)保证只有一条线程能获取到这把锁,从而保证了多线程下的线程安全
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
  • _dispatch_once_gate_tryenterr(l)获取的锁和Block中_dispatch_once_gate_broadcast方法获取的是同一把锁。
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
});

单例执行一次的说明:
dispatch_once有两个参数:onceTokenBlock
我们主要关注有两点:第一点为什么单例只会执行一次,第二点Block为什么会被调用。因为onceToken会在底层封装为dispatch_once_gate_t l变量l会用来获取底层原子性的关联,并且onceToken是静态变量保证了唯一性。通过os_atomic_load方法判断Block执行的次数,如果执行了就直接返回。如果是第一次执行会获取一把锁,保证当前调用Block的唯一,当Block执行后就解除当前的锁。如果执行Block时还未解锁且有一条线程进入dispatch_once_f方法,那么就会进入等待状态。

Block调用:

static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_client_callout(ctxt, func);// 调用Block
    _dispatch_once_gate_broadcast(l);//对外广播已经执行了一次
}
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();
    uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    v = _dispatch_once_mark_quiescing(l);
#else
    v = _dispatch_once_mark_done(l);//将v标记为DLOCK_ONCE_DONE表示已经执行了一次
#endif
    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

跨库调用的方法绑定还是在_dispatch_root_queues_init();方法的_dispatch_root_queues_init_once参数中

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
...
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
        cfg.workq_cb = _dispatch_worker_thread2;
        r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
        r = _pthread_workqueue_init(_dispatch_worker_thread2,
                offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
}
...
} 

二、同步函数、栅栏函数底层分析

1、异步栅栏函数和同步栅栏函数的区别

    dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NANSLog(@"111");
    });
    /* 2. 异步栅栏函数 */
    dispatch_barrier_async(concurrentQueue, ^{
        NANSLog(@"%@",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NANSLog(@"222");
    });
    NANSLog(@"333");
333
111
---<NSThread: 0x600001883540>{number = 3, name = (null)}---
222
    dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);

    /* 1.异步函数 */
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NANSLog(@"111");
    });
    /* 2. 同步栅栏函数 */
    dispatch_barrier_sync(concurrentQueue, ^{
        NANSLog(@"%@",[NSThread currentThread]);
    });
    /* 3. 异步函数 */
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NANSLog(@"222");
    });
    NANSLog(@"333");
111
<NSThread: 0x6000026981c0>{number = 1, name = main}
333
222

区别:

  • 异步栅栏函数只会阻塞自定义的队列,不会阻塞外面的队列
  • 同步栅栏函数阻塞的是外面的队列,不会阻塞自定义的队列
  • 栅栏函数只能使用自定义的并发队列

2、栅栏函数使用案例

    dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);

    for (int i = 0; i<1000; i++) {
        dispatch_async(concurrentQueue, ^{
            NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
            NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
            NSData *data = [NSData dataWithContentsOfURL:url];
            UIImage *image = [UIImage imageWithData:data];

            [self.mArray addObject:image];
        });
    }

运行程序会Crash,并报如下错误:
error for object 0x7f8844860000: pointer being freed was not allocated
也就是多线程操作时,同名的变量retainrelease操作并未成对出现,所以导致数据读取的安全问题。
添加栅栏函数后就能解决问题

dispatch_barrier_async(concurrentQueue , ^{
     [self.mArray addObject:image];
});

通过加锁也能解决问题

@synchronized (self) {
    [self.mArray addObject:image];
}

3、同步函数、栅栏函数底层源码分析

同步函数:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) {// 串行队列
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);// 串行队列和barrier相同
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

_dispatch_barrier_sync_f方法调用_dispatch_barrier_sync_f_inline方法

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the thread
    // that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);//判断有无死锁
    }// 栅栏函数也会死锁,判断有没有阻塞当前队列

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);// 准备操作
      //  执行完成后对状态进行释放
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

三、调度组的使用

1、信号量控制最大并发数

    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);

    dispatch_semaphore_t sem = dispatch_semaphore_create(2);
    
    //任务1
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        NSLog(@"执行任务1");
        sleep(1);
        NSLog(@"任务1完成");
        dispatch_semaphore_signal(sem);
    });
    
    //任务2
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        NSLog(@"执行任务2");
        sleep(1);
        NSLog(@"任务2完成");
        dispatch_semaphore_signal(sem);
    });
    
    //任务3
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        NSLog(@"执行任务3");
        sleep(1);
        NSLog(@"任务3完成");
        dispatch_semaphore_signal(sem);
    });

这里设置的最大并发数 为2

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);//信号量--
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);//进入异常处理
}
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    long value = os_atomic_inc2o(dsema, dsema_value, release);//信号量++
    if (likely(value > 0)) {
        return 0;
    }
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);//进入异常等待
}

2、调度组

进组 出组搭配使用,不一起使用会Crash

dispatch_group_enter(group);
dispatch_group_leave(group);

dispatch_group_notify代替进组 出组

相关文章

网友评论

    本文标题:iOS-底层探索23:GCD下

    本文链接:https://www.haomeiwen.com/subject/ursnvktx.html