在上一篇文章iOS-底层原理24:GCD 之 函数与队列中从函数和队列去认识GCD,本文将更深入的通过源码去了解GCD底层原理。
1 查找GCD源码
step1:
用一个案例去寻找GCD的源码,在此处打上断点
dispatch_queue_t conque = dispatch_queue_create("lbh", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(conque, ^{
NSLog(@"异步函数");
});
等程序运行到此处,Debug
--> Debug Workflow
勾上Always show Disassembly
step2:
程序会自动跳转到汇编代码
可以看到一些熟悉的符号
step3:
给程序打上符号断点
step4:
继续运行,发现程序程序自动跳转到汇编
已经定位到对应的库libdispatch
step5:
去苹果开源地址搜索libdispatch
找到一个相关的库将它下载下来
2 队列
队列类型: dispatch_queue_t
队列的创建方法:dispatch_queue_create
2.1 队列类型 dispatch_queue_t
step1:
查找dispatch_queue_t
的定义
typedef struct dispatch_queue_s *dispatch_queue_t;
可以看到dispatch_queue_t
本身只是dispatch_queue_s
这个结构体指针
step2:
全局搜索dispatch_queue_s {
,查找dispatch_queue_s
定义
struct dispatch_queue_s {
DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
/* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;
step3:
查找DISPATCH_QUEUE_CLASS_HEADER
#define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
_DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
/* LP64 global queue cacheline boundary */ \
unsigned long dq_serialnum; \// queue的编号
const char *dq_label; \ //标签
DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
const uint16_t dq_width, \
const uint16_t __dq_opaque2 \
); \
dispatch_priority_t dq_priority; \//优先级
union { \
struct dispatch_queue_specific_head_s *dq_specific_head; \
struct dispatch_source_refs_s *ds_refs; \
struct dispatch_timer_source_refs_s *ds_timer_refs; \
struct dispatch_mach_recv_refs_s *dm_recv_refs; \
struct dispatch_channel_callbacks_s const *dch_callbacks; \
}; \
int volatile dq_sref_cnt
在这个宏里我们找到相关的方法_DISPATCH_QUEUE_CLASS_HEADER(x, pointer_sized_field);
step4:
继续展开搜索查看里面的内容如下:
// 展开_DISPATCH_QUEUE_CLASS_HEADER
#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
DISPATCH_OBJECT_HEADER(x); \
DISPATCH_UNION_LE(uint64_t volatile dq_state, \
dispatch_lock dq_state_lock, \
uint32_t dq_state_bits \
); \
// 持续展开DISPATCH_OBJECT_HEADER
#define DISPATCH_OBJECT_HEADER(x) \
struct dispatch_object_s _as_do[0]; \
_DISPATCH_OBJECT_HEADER(x)
// 进一步查看 _DISPATCH_OBJECT_HEADER
#define _DISPATCH_OBJECT_HEADER(x) \
struct _os_object_s _as_os_obj[0]; \
OS_OBJECT_STRUCT_HEADER(dispatch_##x); \ // 这个宏,可以理解为dispatch_object_s继承自_os_object_s
struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \
void *do_ctxt; \
void *do_finalizer
step5:
查看OS_OBJECT_STRUCT_HEADER
#define OS_OBJECT_STRUCT_HEADER(x) \
_OS_OBJECT_HEADER(\
const void *_objc_isa, \
do_ref_cnt, \
do_xref_cnt); \
// 注意这个成员变量,后面将任务Push到队列就是通过这个变量
const struct x##_vtable_s *do_vtable
来到OS_OBJECT_STRUCT_HEADER
之后,我们需要注意一个成员变量,记住这个成员变量名字叫做do_vtable
step6:
查看_OS_OBJECT_HEADER
// 进一步查看 _OS_OBJECT_HEADER
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
isa; /* must be pointer-sized */ \ // isa指针
int volatile ref_cnt; \ // gcd对象内部引用计数
int volatile xref_cnt// gcd对象外部引用计数(内外部都要减到0时,对象会被释放)
_OS_OBJECT_HEADER
包含isa指针
和引用计数
等信息。
【总结】
dispatch_queue_t
的本质是一个结构体指针对象,指向一个dispatch_queue_s
类型的结构体,里面包含了label
(标签)、priority
(优先级)等一些信息。
GCD源码中的数据结构为dispatch_object_t
联合抽象类
typedef union {
struct _os_object_s *_os_obj;// 基类
struct dispatch_object_s *_do;// 基类继承os_object
struct dispatch_queue_s *_dq;// 队列结构
struct dispatch_queue_attr_s *_dqa;// 队列相关属性
struct dispatch_group_s *_dg;// group结构
struct dispatch_source_s *_ds;
struct dispatch_channel_s *_dch;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_semaphore_s *_dsema;// 信号量
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
2.2 创建队列 dispatch_queue_create
我们知道队列的创建是通过dispatch_queue_create
,让我们看下它在源码中是如何创建的
step1:
打开源码,全局搜索dispatch_queue_create
,在queue.c
文件中找到源码
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_lane_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
label
: 标签,我们平时传入队列的名字
attr
:我们知道创建队列时, attr 属性有三个值可选,nil
、DISPATCH_QUEUE_SERIAL
(实际上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT
step2:
搜索_dispatch_lane_create_with_target
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// dqai 创建 -
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
//第一步:规范化参数,例如qos, overcommit, tq
...
//拼接队列名称
const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) { //vtable表示类的类型
// OS_dispatch_queue_concurrent
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
....
//创建队列,并初始化
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s)); // alloc
//根据dqai.dqai_concurrent的值,就能判断队列 是 串行 还是并发
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
//设置队列label标识符
dq->dq_label = label;//label赋值
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos, dqai.dqai_relpri);//优先级处理
...
//类似于类与元类的绑定,不是直接的继承关系,而是类似于模型与模板的关系
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq;//研究dq
}
2.2.1 _dispatch_lane_create_with_target
分析
part1:
全局搜索_dispatch_queue_attr_to_info
通过_dispatch_queue_attr_to_info
方法传入dqa
(即队列类型,串行、并发
等)创建dispatch_queue_attr_info_t
类型的对象dqai
,用于存储队列的相关属性信息
part2:
设置队列相关联的属性,例如服务质量qos等
part3:
通过DISPATCH_VTABLE
拼接队列名称,即vtable
,其中DISPATCH_VTABLE
是宏定义,如下所示,所以队列的类型是通过OS_dispatch_
+队列类型
(queue_concurrent
or queue_serial
)拼接而成的
//object_internal.h
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
⬇️
#define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name))
⬇️
#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class
⬇️
#define DISPATCH_CLASS(name) OS_dispatch_##name
part4:
通过alloc+init
初始化队列,即dq
,其中在_dispatch_queue_init
传参中根据dqai.dqai_concurrent
的布尔值,就能判断队列 是 串行 还是并发,而 vtable表示队列的类型,说明队列也是对象
-
part4.1:
进入_dispatch_object_alloc
-->_os_object_alloc_realized
方法中设置了isa的指向
,从这里可以验证队列也是对象的说法
-
part4.2:
进入_dispatch_queue_init
方法,队列类型是dispatch_queue_t
,并设置队列的相关属性
part5:
通过_dispatch_trace_queue_create
对创建的队列进行处理,其中_dispatch_trace_queue_create
是_dispatch_introspection_queue_create
封装的宏定义,最后会返回处理过的_dq
-
part5.1:
_dispatch_trace_queue_create
part5.2:
-
part5.3:
进入_dispatch_introspection_queue_create_hook
-->dispatch_introspection_queue_get_info
-->_dispatch_introspection_lane_get_info
中可以看出,与我们自定义的类还是有所区别的,创建队列在底层的实现是通过模板创建的
【总结】
-
队列
queue
也是一个对象,也需要底层通过alloc + init
创建,并且在alloc中也有一个class
,这个class是通过宏定义拼接而成,并且同时会指定isa的指向
-
创建队列在底层的处理是通过模板创建的,其类型是
dispatch_introspection_queue_s
结构体
dispatch_queue_create
底层分析流程如下图所示:
3 函数 底层原理
主要是分析 异步函数dispatch_async
和 同步函数dispatch_sync
3.1 异步函数 dispatch_async
进入dispatch_async
函数源码
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)//work 任务
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
// 任务包装器(work在这里才有使用) - 接受work - 保存work - 并函数式编程
// 保存 block
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
//并发处理
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
_dispatch_continuation_init
:任务包装函数
_dispatch_continuation_async
:并发处理函数
3.1.1 _dispatch_continuation_init 任务包装
进入_dispatch_continuation_init
源码实现,主要是包装任务,并设置线程的回调函数,相当于初始化
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
void *ctxt = _dispatch_Block_copy(work);//拷贝任务
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;//赋值
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
dispatch_function_t func = _dispatch_Block_invoke(work);//封装work - 异步回调
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release;//回调函数赋值 - 同步回调
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
主要有以下几步:
step1:
通过_dispatch_Block_copy
拷贝任务
step2:
通过_dispatch_Block_invoke
封装任务,其中_dispatch_Block_invoke
是个宏定义,根据以上分析得知是异步回调
#define _dispatch_Block_invoke(bb) \
((dispatch_function_t)((struct Block_layout *)bb)->invoke)
step3:
如果是同步的,则回调函数赋值为_dispatch_call_block_and_release
step4:
通过_dispatch_continuation_init_f
方法将回调函数赋值,即f就是func,将其保存在属性中
3.1.2 _dispatch_continuation_async 并发处理
这个函数中,主要是执行block回调
step1:
进入_dispatch_continuation_async
源码
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);//跟踪日志
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);//与dx_invoke一样,都是宏
}
step2:
关键代码是dx_push(dqu._dq, dc, qos)
,dx_push
是宏定义
,如下所示
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
step3:
查看dq_push
需要根据队列的类型,执行不同的函数
3.1.2.1 符号断点调试执行函数
part1:
添加如下测试代码,并打上断点
dispatch_queue_t conque = dispatch_queue_create("lbh", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(conque, ^{
NSLog(@"异步函数");
});
截屏2021-01-15 上午11.08.07.png
part2:
运行到断点处,加上符号断点_dispatch_lane_concurrent_push
和 _dispatch_lane_push
part3:
继续运行
跳转到汇编代码,走的是_dispatch_lane_concurrent_push
part3:
进入_dispatch_lane_concurrent_push
函数源码
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
有两个重要的函数_dispatch_continuation_redirect_push
和_dispatch_lane_push
part4:
将_dispatch_continuation_redirect_push
打上符号断点,_dispatch_lane_push
的符号断点已经存在,继续执行
走的是_dispatch_continuation_redirect_push
part5:
进入_dispatch_continuation_redirect_push
源码
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos); //递归
}
发现又调用了dx_push
,即递归了,综合前面队列创建时可知,队列也是一个对象,有父类、根类,所以会递归执行到根类的方法
part6:
将根类_dispatch_root_queue_push
打上符号断点,来验证猜想是否正确
从运行结果看,猜想是正确的,队列是一个对象,递归会执行到根类
part7:
进入源码_dispatch_root_queue_push
--> _dispatch_root_queue_push_inline
--> _dispatch_root_queue_poke
--> _dispatch_root_queue_poke_slow
,将_dispatch_root_queue_poke_slow
打上符号断点,继续运行
part8:
进入_dispatch_root_queue_poke_slow
源码
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
_dispatch_root_queues_init();//重点
...
//do-while循环创建线程
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
_dispatch_root_queue_poke_slow
源码中主要有两步操作:
-
1、通过
_dispatch_root_queues_init
方法注册回调 -
2、通过
do-while
循环创建线程,使用pthread_create
方法
3.1.2.2 _dispatch_root_queues_init
part1:
进入_dispatch_root_queues_init
源码
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}
dispatch_once_f
是个单例(后面会对单例底层进行分析)
part2:
进入_dispatch_root_queues_init_once
源码
内部不同事务的调用句柄都是_dispatch_worker_thread2
part3:
可运行案例打印堆栈信息
_dispatch_root_queues_init
回调路径: _dispatch_worker_thread2
--> _dispatch_root_queue_drain
--> _dispatch_async_redirect_invoke
--> _dispatch_continuation_pop
--> _dispatch_client_callout
--> _dispatch_call_block_and_release
说明
单例的block回调
和异步函数的block回调
是不同
的
- 单例的block回调中的func是
_dispatch_Block_invoke(block)
- 异步函数的block回调中的func是
dispatch_call_block_and_release
【总结
】
所以,综上所述,异步函数的底层分析如下
*【准备工作】:首先,将
异步任务拷贝并封装
,并设置回调函数func
*【block回调】:底层通过
dx_push递归
,会重定向到根队列
,然后通过pthread_creat创建线程
,最后通过dx_invoke执行block回调
(注意dx_push 和 dx_invoke 是成对的)
异步函数的底层分析流程如图所示
3.2 同步函数 dispatch_sync
step1:
进入dispatch_sync
源码实现,其底层的实现是通过栅栏函数实现
的(后续会进行分析)
DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
step2:
进入_dispatch_sync_f
源码
DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
step3:
进入_dispatch_sync_f_inline
源码
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {//表示是串行队列
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);//栅栏
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);//处理当前信息
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));//block执行并释放
}
_dispatch_sync_f_inline
源码中有两个重要的函数:
-
栅栏
:_dispatch_barrier_sync_f
(可以通过后文的栅栏函数底层分析解释),可以得出同步函数的底层
实现其实是同步栅栏函数
-
死锁
:_dispatch_sync_f_slow
,如果存在相互等待的情况,就会造成死锁
3.2.1 死锁 _dispatch_sync_f_slow
part1:
进入_dispatch_sync_f_slow
源码
part2:
进入_dispatch_trace_item_push
源码
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
{
if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
_dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
}
_dispatch_trace_item_push_inline(dqu._dq, _tail._do);
_dispatch_introspection_queue_push(dqu, _tail);
}
part3:
进入__DISPATCH_WAIT_FOR_QUEUE__
,判断dq是否为正在等待的队列,然后给出一个状态state
,然后将dq的状态和当前任务依赖的队列进行匹配
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// 判断qd是否为正在等待的主队列
uint64_t dq_state = _dispatch_wait_prepare(dq);
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor();
_dispatch_thread_event_init(&dsc->dsc_event);
}
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
_dispatch_trace_runtime_event(sync_wait, dq, 0);
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_wait(&dsc->dsc_event); // acquire
} else {
_dispatch_event_loop_wait_for_ownership(dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}
part4:
进入_dq_state_drain_locked_by
--> _dispatch_lock_is_locked_by
源码
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
//异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
//即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的解释就是 执行和等待的是否在同一队列
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
如果当前等待的和正在执行的是同一个队列,即判断线程ID是否相乘,如果相等,则会造成死锁
同步函数 + 并发队列 顺序执行的原因
在_dispatch_sync_invoke_and_complete
--> _dispatch_sync_function_invoke_inline
源码中,主要有三个步骤:
- 将任务压入队列: _dispatch_thread_frame_push
- 执行任务的block回调: _dispatch_client_callout
- 将任务出队:_dispatch_thread_frame_pop
从实现中可以看出,是先将任务push队列中,然后执行block回调,在将任务pop,所以任务是顺序执行的。
总结
同步函数的底层实现如下:
同步函数的底层实现实际是同步栅栏函数
同步函数中如果当前正在执行的队列和等待的是同一个队列,形成相互等待的局面,则会造成死锁
所以,综上所述,同步函数的底层实现流程如图所示
4 单例
在日常开发中,我们一般使用GCD的dispatch_once来创建单例,如下所示:
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
NSLog(@"单例应用");
});
首先对于单例,我们需要了解两点:
【
执行一次的原因
】单例的流程只执行一次,底层是如何控制的,即为什么只能执行一次?【
block调用时机
】单例的block是在什么时候进行调用的?
4.1 单例 底层分析
step1:
dispatch_once
有两个参数:
-
参数1:
onceToken
,它是一个静态变量,由于不同位置定义的静态变量是不同的,所以静态变量具有唯一性 -
参数2:
block
回调
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
进入dispatch_once
源码,底层是通过dispatch_once_f
实现的,
step2:
进入dispatch_once_f
源码
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);//load
if (likely(v == DLOCK_ONCE_DONE)) {//已经执行过了,直接返回
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {//尝试进入
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);//无限次等待
}
其中的val
是外界传入的onceToken
静态变量,而func
是_dispatch_Block_invoke(block)
,其中单例的底层主要分为以下几步:
- 将
val
,也就是静态变量
转换为dispatch_once_gate_t
类型的变量l
- 通过
os_atomic_load
获取此时的任务的标识符v
如果
v等于DLOCK_ONCE_DONE
,表示任务已经执行过了
,直接return
如果任务执行后,加锁失败了,则走到
_dispatch_once_mark_done_if_quiesced
函数,再次进行存储,将标识符置为DLOCK_ONCE_DONE
反之,则通过
_dispatch_once_gate_tryenter
尝试进入任务,即解锁,然后执行_dispatch_once_callout执行block回调
- 如果此时有任务正在执行,再次进来一个任务2,则通过
_dispatch_once_wait
函数让任务2进入无限次等待
4.1.1 _dispatch_once_gate_tryenter 解锁
查看其源码,主要是通过底层os_atomic_cmpxchg
方法进行对比,如果比较没有问题,则进行加锁,即任务的标识符置为DLOCK_ONCE_UNLOCKED
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);//首先对比,然后进行改变
}
4.1.2 _dispatch_once_callout 回调
step1:
进入_dispatch_once_callout
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
_dispatch_client_callout(ctxt, func);//block调用执行
_dispatch_once_gate_broadcast(l);//进行广播:告诉别人有了归属,不要找我了
主要就两步:
_dispatch_client_callout
:block回调执行
_dispatch_once_gate_broadcast
:进行广播
step2:
进入_dispatch_client_callout
源码,主要就是执行block
回调,其中的f
等于_dispatch_Block_invoke(block)
,即异步回调
#undef _dispatch_client_callout
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
@try {
return f(ctxt);
}
@catch (...) {
objc_terminate();
}
}
step3:
进入 _dispatch_once_gate_broadcast
--> _dispatch_once_mark_done
源码
DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
//如果不相同,直接改为相同,然后上锁 -- DLOCK_ONCE_DONE
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
主要就是给dgo->dgo_once
一个值,然后将任务的标识符为DLOCK_ONCE_DONE
,即解锁
总结
针对单例的底层实现,主要说明如下:
【
单例只执行一次的原理
】:GCD单例中,有两个重要参数,onceToken
和block
,其中onceToken
是静态变量,具有唯一性,在底层被封装成了dispatch_once_gate_t类型的变量l
,l主要是用来获取底层原子封装性的关联,即变量v,通过v来查询任务的状态,如果此时v等于DLOCK_ONCE_DONE
,说明任务已经处理过一次
了,直接return【
block调用时机
】:如果此时任务没有执行过,则会在底层通过C++函数的比较,将任务进行加锁,即任务状态置为DLOCK_ONCE_UNLOCK
,目的是为了保证当前任务执行的唯一性,防止在其他地方有多次定义。加锁之后进行block回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为DLOCK_ONCE_DONE
,在下次进来时,就不会在执行,会直接返回【
多线程影响
】:如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的
单例的底层流程分析如下如所示:
5 栅栏函数
GCD中常用的栅栏函数,主要有两种:
名称 | 作用 | 缺点 |
---|---|---|
同步栅栏函数dispatch_barrier_sync ,在主线程中执行 |
前面的任务执行完毕才会来到这里 | 堵塞线程 |
异步栅栏函数dispatch_barrier_async
|
前面的任务执行完毕才会来到这里 | 堵塞队列 |
栅栏函数最直接的作用就是:控制任务执行顺序,使同步执行
栅栏函数需要注意一下几点:
栅栏函数
只能控制同一并发队列
同步栅栏添加进入队列
的时候,当前线程会被锁死
,直到同步栅栏之前的任务和同步栅栏任务本身执行完毕时
,当前线程才会打开然后继续执行下一句代码。在
使用栅栏函数时,使用自定义队列才有意义
,如果用的是串行队列
或者系统提供的全局并发队列
,这个栅栏函数的作用等同于一个同步函数的作用,没有任何意义
5.1 代码调试
5.1.1 异步栅栏函数
- 异步栅栏函数 不会阻塞主线程 ,异步
堵塞的是队列
- (void)wbinterDemo1{
dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(queue1, ^{
NSLog(@"1-%@",[NSThread currentThread]);
});
dispatch_async(queue1, ^{
// sleep(1);
NSLog(@"2-%@",[NSThread currentThread]);
});
dispatch_barrier_async(queue1, ^{
// sleep(2);
NSLog(@"3-%@",[NSThread currentThread]);
});
dispatch_async(queue1, ^{
NSLog(@"4-%@",[NSThread currentThread]);
});
}
运行结果
分析
5.1.2 同步栅栏函数
- 同步栅栏函数 会
堵塞主线程
- (void)wbinterDemo2{
dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(queue1, ^{
// sleep(2);
NSLog(@"1=%@=%@",[NSThread currentThread],[NSDate date]);
});
dispatch_async(queue1, ^{
NSLog(@"2=%@=%@",[NSThread currentThread],[NSDate date]);
});
dispatch_barrier_sync(queue1, ^{
NSLog(@"3=%@=%@",[NSThread currentThread],[NSDate date]);
});
dispatch_async(queue1, ^{
NSLog(@"4=%@=%@",[NSThread currentThread],[NSDate date]);
});
}
运行结果
5.2 使用问题
代码
- (void)interDemo3
{
dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
NSMutableArray *array = [NSMutableArray array];
for (int i = 0; i < 10000; i++) {
dispatch_async(queue1, ^{
[array addObject:[NSString stringWithFormat:@"%d",i]];
});
}
}
运行结果
问题
: 为什么会崩溃?
分析
在objc源码
中找到addObject:
源码
- (id)addObject:anObject
{
return [self insertObject:anObject at:numElements];
}
⬇️
- (id)insertObject:anObject at:(unsigned)index
{
register id *this, *last, *prev;
if (! anObject) return nil;
if (index > numElements)
return nil;
if ((numElements + 1) > maxElements) {
volatile id *tempDataPtr;
/* we double the capacity, also a good size for malloc */
// 这里在数组超过一定的空间之后就进行了双倍的扩容
maxElements += maxElements + 1;
// 这里数组tempDataPtr 进行了realloc操作 所以在多个线程同时访问的时候就会出现问题
tempDataPtr = (id *) realloc (dataPtr, DATASIZE(maxElements));
dataPtr = (id*)tempDataPtr;
}
this = dataPtr + numElements;
prev = this - 1;
last = dataPtr + index;
while (this > last)
*this-- = *prev--;
*last = anObject;
numElements++;
return self;
}
可以看到,当数组的容量超过
maxElements
的时候就会maxElements += maxElements + 1;
,并且进行realloc
重新创建了一个新的数组的操作,在多线程的操作,如果数组添加的元素太多就会出现给旧数组添加元素的时候,旧的数组其实已经被替代的情况,这样就出现了崩溃
解决方法
1、数组初始化时给足够大的空间
2、利用栅栏函数
问题
:为什么不能使用同步栅栏函数?
3、使用互斥锁 @synchronized (self) {}
5.3 底层原理
5.3.1 异步栅栏函数 底层原理
进入dispatch_barrier_async
源码实现
#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif
在分析dispatch_async
的底层实现时,已经知道dispatch_async
的本质其实就是dispatch_barrier_async
,这里就不在进行分析
5.3.2 同步栅栏函数 底层原理
进入dispatch_barrier_sync
源码,实现如下
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
5.3.2.1 _dispatch_barrier_sync_f_inline
进入 _dispatch_barrier_sync_f
--> _dispatch_barrier_sync_f_inline
源码
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();//获取线程的id,即线程的唯一标识
...
//判断线程状态,需不需要等待,是否回收
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {//栅栏函数也会死锁
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,//没有回收
DC_FLAG_BARRIER | dc_flags);
}
//验证target是否存在,如果存在,加入栅栏函数的递归查找 是否等待
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));//执行
}
源码主要有分为以下几部分:
part1:
通过_dispatch_tid_self
获取线程ID
part2:
通过_dispatch_queue_try_acquire_barrier_sync
判断线程状态
part2.1:
进入_dispatch_queue_try_acquire_barrier_sync_and_suspend
源码
part3:
通过_dispatch_sync_recurse
递归查找栅栏函数的target
part4:
通过_dispatch_introspection_sync_begin
对向前信息进行处理
part5:
通过_dispatch_lane_barrier_sync_invoke_and_complete
执行block并释放
6 信号量 dispatch_semaphore_t
信号量的作用一般是用来使任务同步执行
,类似于互斥锁,用户可以根据需要控制GCD最大并发数,一般是这样使用的
//信号量
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
dispatch_semaphore_signal(sem);
6.1 底层原理
选择几个重要的函数,对其底层进行分析
6.1.1 dispatch_semaphore_create 创建
该函数的底层实现如下,主要是初始化信号量
,并设置GCD的最大并发数
,其最大并发数必须大于0
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
6.1.2 dispatch_semaphore_wait 加锁
step1:
进入dispatch_semaphore_wait
源码
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
// dsema_value 进行 -- 操作
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {//表示执行操作无效,即执行成功
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);//长等待
}
该函数的源码实现如下,其主要作用是对信号量dsema
通过os_atomic_dec2o
进行了--操作,其内部是执行的C++的atomic_fetch_sub_explicit
方法
如果
value 大于等于0
,表示操作无效,即执行成功如果
value 等于LONG_MIN
,系统会抛出一个crash如果
value 小于0
,则进入长等待
其中os_atomic_dec2o
的宏定义转换如下
#define os_atomic_inc2o(p, f, m) \
os_atomic_add2o(p, f, 1, m)
👇
#define os_atomic_add2o(p, f, v, m) \
os_atomic_add(&(p)->f, (v), m)
👇
#define os_atomic_add(p, v, m) \
_os_atomic_c11_op((p), (v), m, add, +)
👇
#define _os_atomic_c11_op(p, v, m, o, op) \
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })
将具体的值代入为
网友评论