Preface
- In the previous article we reviewed the commonly used GCD APIs, which left us with a question: how does dispatch_async actually bring up a thread? With that question in mind, let's dig into the GCD source.
- Main topics: the nature of a queue, how queues are created, and the differences and underlying execution flow of serial vs. concurrent queues and sync vs. async execution.
Preparation
I. Finding the GCD source
In a project, jump to the definition of dispatch_async.
As shown below, it only exposes a public interface:
#ifdef __BLOCKS__
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
void
dispatch_async(dispatch_queue_t queue, dispatch_block_t block);
#endif
So let's turn to breakpoint debugging: set a breakpoint on dispatch_async and inspect dispatch_async in the call stack. That drops us into the assembly, where the stack frames land in libdispatch.dylib`dispatch_async:. Based on past experience, the GCD implementation therefore lives in libdispatch.dylib, so let's go look at its source (the download link was given earlier).
II. Analyzing the libdispatch source
When I first opened the source I found, first, that it cannot be built as-is and, second, that the names of its structs and functions are far from self-explanatory, with macros nested inside macros, so after a while it becomes hard to keep reading. Although we cannot verify things with LLDB the way we did when exploring the objc source, we can start from how we actually use GCD, follow that thread into the code, and slowly dig out and analyze the key paths.
So let's begin the first stage of the exploration.
The previous article described the two steps of using GCD:
- Step 1: create a queue (serial or concurrent).
- Step 2: append tasks to the queue's waiting list; the system then executes them according to how they were submitted (synchronously or asynchronously).
The queue type: dispatch_queue_t
Queues are created with dispatch_queue_create.
Creating a serial queue:
dispatch_queue_t queue =
dispatch_queue_create("com.lgcooci.seial", DISPATCH_QUEUE_SERIAL);
Creating a concurrent queue:
dispatch_queue_t queue =
dispatch_queue_create("com.lgcooci.concurrent", DISPATCH_QUEUE_CONCURRENT);
Let's start with a global search of the source to find out what dispatch_queue_t really is.
1. The queue type dispatch_queue_t
First, dispatch_queue_t itself is just a pointer to the struct dispatch_queue_s:
typedef struct dispatch_queue_s *dispatch_queue_t;
Next, look up the definition of dispatch_queue_s:
struct dispatch_queue_s {
DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
/* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;
Searching for DISPATCH_QUEUE_CLASS_HEADER, we find it is a macro:
#define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
_DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
/* LP64 global queue cacheline boundary */ \
unsigned long dq_serialnum; \ // queue serial number
const char *dq_label; \ // label
DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
const uint16_t dq_width, \
const uint16_t __dq_opaque2 \
); \
dispatch_priority_t dq_priority; \ // priority
union { \
struct dispatch_queue_specific_head_s *dq_specific_head; \
struct dispatch_source_refs_s *ds_refs; \
struct dispatch_timer_source_refs_s *ds_timer_refs; \
struct dispatch_mach_recv_refs_s *dm_recv_refs; \
struct dispatch_channel_callbacks_s const *dch_callbacks; \
}; \
int volatile dq_sref_cnt
Inside this macro we find _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); keep expanding it and its contents are as follows:
// Expanding _DISPATCH_QUEUE_CLASS_HEADER
#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
DISPATCH_OBJECT_HEADER(x); \
DISPATCH_UNION_LE(uint64_t volatile dq_state, \
dispatch_lock dq_state_lock, \
uint32_t dq_state_bits \
); \
// Keep expanding DISPATCH_OBJECT_HEADER
#define DISPATCH_OBJECT_HEADER(x) \
struct dispatch_object_s _as_do[0]; \
_DISPATCH_OBJECT_HEADER(x)
// One level deeper: _DISPATCH_OBJECT_HEADER
#define _DISPATCH_OBJECT_HEADER(x) \
struct _os_object_s _as_os_obj[0]; \
OS_OBJECT_STRUCT_HEADER(dispatch_##x); \ // this macro can be read as: dispatch_object_s "inherits" from _os_object_s
struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \
void *do_ctxt; \
void *do_finalizer
When we reach OS_OBJECT_STRUCT_HEADER, pay attention to one member in particular and remember its name: do_vtable. Unpacking _OS_OBJECT_HEADER further shows that it is essentially just an isa pointer plus some reference-count bookkeeping.
Now look at OS_OBJECT_STRUCT_HEADER:
#define OS_OBJECT_STRUCT_HEADER(x) \
_OS_OBJECT_HEADER(\
const void *_objc_isa, \
do_ref_cnt, \
do_xref_cnt); \
// Note this member: pushing work onto a queue later goes through this vtable
const struct x##_vtable_s *do_vtable
// Going further into _OS_OBJECT_HEADER
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
isa; /* must be pointer-sized */ \ // the isa pointer
int volatile ref_cnt; \ // internal reference count of the GCD object
int volatile xref_cnt // external reference count (the object is freed only when both counts drop to 0)
Following the code step by step we finally arrive at _OS_OBJECT_HEADER. The code is dense, but studying the source does not require understanding every line; what matters is the core point: dispatch_queue_t is essentially a pointer to a structure that carries a label, a priority, and some other bookkeeping. In other words, a dispatch_queue_t pointer points to a dispatch_queue_s structure.
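Two public accessors expose a couple of those fields directly, which makes for a quick sanity check; a minimal sketch (the label and the values shown in the comments are what I would expect, not something guaranteed by the source above):
#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    // dq_label and dq_priority from dispatch_queue_s are surfaced through
    // dispatch_queue_get_label and dispatch_queue_get_qos_class.
    dispatch_queue_t q = dispatch_queue_create("com.example.inspect",
            dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL,
                                                    QOS_CLASS_UTILITY, 0));
    int relpri = 0;
    printf("label: %s\n", dispatch_queue_get_label(q));                  // com.example.inspect
    printf("qos: 0x%x relpri: %d\n",
           (unsigned)dispatch_queue_get_qos_class(q, &relpri), relpri);  // 0x11 (QOS_CLASS_UTILITY), 0
    return 0;
}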
More generally, the data structures in the GCD source are unified by dispatch_object_t, a transparent union that serves as the abstract base type:
typedef union {
struct _os_object_s *_os_obj; // base
struct dispatch_object_s *_do; // base dispatch object, built on _os_object_s
struct dispatch_queue_s *_dq; // queue
struct dispatch_queue_attr_s *_dqa; // queue attributes
struct dispatch_group_s *_dg; // group
struct dispatch_source_s *_ds;
struct dispatch_channel_s *_dch;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_semaphore_s *_dsema; // semaphore
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
2. Creating a queue: dispatch_queue_create
The source looks like this:
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_lane_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
label: the label, i.e. the queue name we normally pass in.
attr: when creating a queue, attr can take one of three values: nil, DISPATCH_QUEUE_SERIAL (which is in fact nil), or DISPATCH_QUEUE_CONCURRENT.
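A quick illustration of that point (queue labels here are placeholders): since DISPATCH_QUEUE_SERIAL is defined as NULL, the first two calls below are equivalent, and only the third one picks the concurrent path.
#include <dispatch/dispatch.h>

static void make_queues(void) {
    // DISPATCH_QUEUE_SERIAL is #defined as NULL, so s1 and s2 are both serial queues.
    dispatch_queue_t s1 = dispatch_queue_create("com.example.s1", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t s2 = dispatch_queue_create("com.example.s2", NULL);
    // Only DISPATCH_QUEUE_CONCURRENT selects the concurrent vtable below.
    dispatch_queue_t c1 = dispatch_queue_create("com.example.c1", DISPATCH_QUEUE_CONCURRENT);
    (void)s1; (void)s2; (void)c1;
}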
dispatch_queue_create forwards to _dispatch_lane_create_with_target (named _dispatch_queue_create_with_target in older releases). Inside, a root queue is obtained and the newly created queue is bound to the corresponding root queue.
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
//
// Step 1: Normalize arguments (qos, overcommit, tq)
//
// Per the upstream comments, callers pass DISPATCH_QUEUE_SERIAL or nil far more often than DISPATCH_QUEUE_CONCURRENT, so a default attribute object is filled in here.
// Read this simply as if (!dqa).
if (!slowpath(dqa)) {
// _dispatch_get_default_queue_attr sets dqa_autorelease_frequency to DISPATCH_AUTORELEASE_FREQUENCY_INHERIT and leaves the queue active (inactive == false). We won't expand it here; just remember these defaults, they are used later.
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
// Extract the QoS
dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
// overcommit literally means "committing more than you have": it is a flag saying "even if the system is already heavily loaded, still bring up another thread to run my work".
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}
// If overcommit was not explicitly specified
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// overcommit therefore defaults to enabled for serial queues and disabled for concurrent ones
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
// dispatch_queue_create passed DISPATCH_TARGET_QUEUE_DEFAULT, i.e. NULL, for tq, so we enter this branch.
if (!tq) {
// Get the root queue that will manage this queue.
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}
// ----------- queue creation starts here -----------------
// legacy is true by default
if (legacy) {
// As noted above, dqa_autorelease_frequency defaults to DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, so this condition holds
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}
// vtable matters: it will later be stored into the do_vtable member of the queue structure mentioned earlier
const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) {
// Taken when DISPATCH_QUEUE_CONCURRENT was passed in
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
// Anything not explicitly concurrent (DISPATCH_QUEUE_SERIAL or nil) becomes a serial queue.
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqai.dqai_autorelease_frequency) {
.....
}
if (label) {
// If the label string passed in is mutable, copy it into an immutable one
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
// _dispatch_object_alloc is where vtable gets stored into do_vtable.
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
// The third argument is the queue width: 1 for a serial queue (one task at a time), DISPATCH_QUEUE_WIDTH_MAX (0x1000 - 2, i.e. 4094) for a concurrent queue.
// dqai_inactive is false for the default attributes, as noted earlier.
// DISPATCH_QUEUE_ROLE_INNER is 0, so for a plain serial queue the initial state bits here are 0.
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
dqai.dqai_relpri);
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
if (!dqai.dqai_inactive) {
_dispatch_queue_priority_inherit_from_target(dq, tq);
_dispatch_lane_inherit_wlh_from_target(dq, tq);
}
_dispatch_retain(tq);
// The custom queue's target queue is the root queue
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq;
}
Most of this code cannot be understood just from its names, but a few important functions stand out. The first is _dispatch_get_root_queue, which fetches a root queue:
static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
}
return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
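A worked example of that index formula, assuming the QoS constants in this drop of the source run from DISPATCH_QOS_MAINTENANCE = 1 up to DISPATCH_QOS_USER_INTERACTIVE = 6, with DISPATCH_QOS_DEFAULT = 4:
#include <stdio.h>

int main(void) {
    int qos = 4;         // DISPATCH_QOS_DEFAULT (assumed value, see lead-in)
    int overcommit = 1;  // serial queues default to an overcommit root queue
    // index = 2 * (qos - 1) + overcommit
    printf("index = %d\n", 2 * (qos - 1) + overcommit);
    // -> 7, which lands on "com.apple.root.default-qos.overcommit" in the array below;
    //    with overcommit = 0 the index would be 6, "com.apple.root.default-qos".
    return 0;
}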
Here is the _dispatch_root_queues array. Every QoS level has its own root queue, and each level comes in an overcommit and a non-overcommit variant.
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
struct dispatch_queue_global_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
.dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \
_dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \
_dispatch_priority_make(DISPATCH_QOS_##n, 0)), \
__VA_ARGS__ \
}
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};
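These root queues are exactly what dispatch_get_global_queue hands back, so printing a couple of labels is an easy cross-check; a small sketch (the expected output follows from the dq_label values above):
#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    dispatch_queue_t def = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
    dispatch_queue_t bg  = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
    printf("%s\n", dispatch_queue_get_label(def)); // expected: com.apple.root.default-qos
    printf("%s\n", dispatch_queue_get_label(bg));  // expected: com.apple.root.background-qos
    return 0;
}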
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global) ultimately expands to a vtable symbol of the form OS_dispatch_##name##_class; the corresponding vtable instance specifies the function behind each operation of the root queues.
Then come the familiar alloc and init steps:
_dispatch_object_alloc: allocates memory of the corresponding type;
_dispatch_queue_init: initializes it.
Let's look at the former first:
// Note: on iOS, OS_OBJECT_HAVE_OBJC1 is not satisfied, so the #else branch runs
void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
const struct dispatch_object_vtable_s *_vtable = vtable;
dispatch_object_t dou;
dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
dou._do->do_vtable = vtable;
return dou._do;
#else
return _os_object_alloc_realized(vtable, size);
#endif
}
// Next, _os_object_alloc_realized
inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
_os_object_t obj;
dispatch_assert(size >= sizeof(struct _os_object_s));
while (unlikely(!(obj = calloc(1u, size)))) {
_dispatch_temporary_resource_shortage();
}
obj->os_obj_isa = cls;
return obj;
}
Now a quick look at _os_object_alloc:
_os_object_t
_os_object_alloc(const void *cls, size_t size)
{
if (!cls) cls = &_os_object_vtable;
return _os_object_alloc_realized(cls, size);
}
The return type is the struct _os_object_s:
typedef struct _os_object_s {
_OS_OBJECT_HEADER(
const _os_object_vtable_s *os_obj_isa,
os_obj_ref_cnt,
os_obj_xref_cnt);
} _os_object_s;
// The header macro shared by all OS objects
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt)
isa; // the isa pointer
int volatile ref_cnt; // internal reference count of the GCD object
int volatile xref_cnt // external reference count (the object is freed only when both counts drop to 0)
Next, _dispatch_queue_init, which just performs some initialization:
static inline void _dispatch_queue_init(dispatch_queue_t dq, dispatch_queue_flags_t dqf,
uint16_t width, uint64_t initial_state_bits)
{
uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
DISPATCH_QUEUE_INACTIVE)) == 0);
if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
dq_state |= DISPATCH_QUEUE_INACTIVE + DISPATCH_QUEUE_NEEDS_ACTIVATION;
dq_state |= DLOCK_OWNER_MASK;
dq->do_ref_cnt += 2;
}
dq_state |= (initial_state_bits & DISPATCH_QUEUE_ROLE_MASK);
// Pointing do_next at DISPATCH_OBJECT_LISTLESS marks the object as not linked into any list yet (and helps the compiler generate better code)
dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
dqf |= DQF_WIDTH(width);
// dqf is stored into dq->dq_atomic_flags
os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
dq->dq_state = dq_state;
dq->dq_serialnum =
os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
}
Finally there is _dispatch_introspection_queue_create, an introspection hook:
dispatch_queue_t _dispatch_introspection_queue_create(dispatch_queue_t dq)
{
TAILQ_INIT(&dq->diq_order_top_head);
TAILQ_INIT(&dq->diq_order_bottom_head);
_dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
TAILQ_INSERT_TAIL(&_dispatch_introspection.queues, dq, diq_list);
_dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);
DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
_dispatch_introspection_queue_create_hook(dq);
}
return dq;
}
At this point we have a rough picture of how a queue is created. It boils down to a few points:
- the queue's priority is set;
- by default a serial queue is created;
- the queue is attached to a root queue, and the root queue differs with the priority;
- a vtable instance is created that gives each queue type its own push, wakeup, and other functions.
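A minimal behavioral sketch of the serial/concurrent difference that the width parameter produces (labels are placeholders):
#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    dispatch_queue_t serial = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);         // width 1
    dispatch_queue_t conc   = dispatch_queue_create("com.example.concurrent", DISPATCH_QUEUE_CONCURRENT); // width DISPATCH_QUEUE_WIDTH_MAX

    for (int i = 0; i < 3; i++) {
        dispatch_async(serial, ^{ printf("serial %d\n", i); });     // always prints 0, 1, 2 in order
        dispatch_async(conc,   ^{ printf("concurrent %d\n", i); }); // order not guaranteed
    }
    dispatch_barrier_sync(conc, ^{}); // wait for the concurrent work
    dispatch_sync(serial, ^{});       // wait for the serial work
    return 0;
}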
3. Asynchronous tasks: dispatch_async
Internally it comes down to two functions:
- _dispatch_continuation_init: wraps the task;
- _dispatch_continuation_async: handles the asynchronous dispatch.
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME; // set the flag bits
dispatch_qos_t qos;
// the task wrapper: receives and stores the block and its invoke function
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
_dispatch_continuation_init is just initialization: it mainly saves the block's context and decides which function will invoke the block.
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// the copied block becomes the continuation's context (dc_ctxt)
void *ctxt = _dispatch_Block_copy(work); // copy the task (block)
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt; // store the context
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
dispatch_function_t func = _dispatch_Block_invoke(work); // wrap work: the block's invoke function
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release; // use the wrapper that invokes the block and then releases it (the continuation consumes the block)
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
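To make the (ctxt, func) pairing concrete, here is a sketch using dispatch_async_f, the function-pointer variant of dispatch_async: a block submitted with dispatch_async is ultimately reduced to exactly this kind of context/function pair inside the continuation (the queue label, message, and work_func below are made up for illustration).
#include <dispatch/dispatch.h>
#include <stdio.h>

static void work_func(void *ctxt) {           // plays the role of dc_func
    printf("running with ctxt = %s\n", (const char *)ctxt);
}

static void noop(void *ctxt) { (void)ctxt; }

int main(void) {
    static char msg[] = "hello";              // plays the role of dc_ctxt
    dispatch_queue_t q = dispatch_queue_create("com.example.async_f", DISPATCH_QUEUE_CONCURRENT);
    // dispatch_async_f takes the (context, function) pair directly;
    // dispatch_async builds the same pair from a block via _dispatch_Block_invoke.
    dispatch_async_f(q, msg, work_func);
    dispatch_barrier_sync_f(q, NULL, noop);   // crude wait so main() doesn't return first
    return 0;
}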
_dispatch_call_block_and_release simply executes the block, so whenever dc->dc_func is invoked the block runs directly:
void
_dispatch_call_block_and_release(void *block)
{
void (^b)(void) = block;
b();
Block_release(b);
}
That covers the initialization; now let's look at _dispatch_continuation_async:
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
// If the task was enqueued with a barrier, or the queue is serial, the task is pushed straight onto the queue
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
// "trace" here means tracking/recording the item
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos); // dx_push is a macro
}
The macro: #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z). We only care about dq_push; a global search turns up:
DISPATCH_VTABLE_INSTANCE(queue,
// This is the base class for queues, no objects of this type are made
.do_type = _DISPATCH_QUEUE_CLUSTER,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
);
DISPATCH_VTABLE_INSTANCE(workloop,
.do_type = DISPATCH_WORKLOOP_TYPE,
.do_dispose = _dispatch_workloop_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_workloop_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_workloop_wakeup,
.dq_push = _dispatch_workloop_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
Depending on the queue type, dq_push resolves to a different function. For example:
concurrent queue: .dq_push = _dispatch_lane_concurrent_push,
serial queue: .dq_push = _dispatch_lane_push,
and so on. A minimal sketch of this vtable-style dispatch follows.
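Conceptually, dx_push is plain C dynamic dispatch through a table of function pointers; a miniature of the pattern (every name here is invented, none is libdispatch's):
#include <stdio.h>

typedef struct queue queue_t;
typedef struct {
    const char *name;
    void (*dq_push)(queue_t *q, const char *item);  // per-class push function
} vtable_t;

struct queue { const vtable_t *do_vtable; };

static void serial_push(queue_t *q, const char *item)     { (void)q; printf("serial push: %s\n", item); }
static void concurrent_push(queue_t *q, const char *item) { (void)q; printf("concurrent push: %s\n", item); }

static const vtable_t serial_vtable     = { "queue_serial",     serial_push };
static const vtable_t concurrent_vtable = { "queue_concurrent", concurrent_push };

// The same shape as libdispatch's dx_push: call through the object's vtable.
#define dx_push(q, item) ((q)->do_vtable->dq_push((q), (item)))

int main(void) {
    queue_t sq = { &serial_vtable }, cq = { &concurrent_vtable };
    dx_push(&sq, "task A");  // resolves to serial_push, like _dispatch_lane_push
    dx_push(&cq, "task B");  // resolves to concurrent_push, like _dispatch_lane_concurrent_push
    return 0;
}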
In a project we can verify this by setting a symbolic breakpoint on _dispatch_lane_concurrent_push. So let's look at _dispatch_lane_concurrent_push first:
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
// After the checks above pass, take this path.
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
Symbolic-breakpoint debugging shows that execution continues into _dispatch_continuation_redirect_push:
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
// Recursive call:
// GCD queues are objects too, with an inheritance-like chain (class, superclass, root class), so the push is forwarded on to the target queue.
dx_push(dq, dou, qos);
}
Notice that we are back at dx_push, i.e. recursion. Combined with what we saw during queue creation, a queue is itself an object with a parent (target) queue and ultimately a root queue, so the push recurses until it reaches the root class's implementation.
Next, a symbolic breakpoint on the root class's _dispatch_root_queue_push confirms this guess; the run shows it is exactly right.
Following the source through _dispatch_root_queue_push -> _dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow (again confirmed with symbolic breakpoints), the last function does two main things:
- register a callback via _dispatch_root_queues_init;
- create threads with pthread_create inside a do-while loop.
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
_dispatch_root_queues_init(); // the key step
...
// create threads in a do-while loop
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
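For intuition, here is a self-contained sketch of the same pattern that do-while loop uses: bring up detached worker threads with pthread_create and retry on EAGAIN (the worker body and pool pointer are invented; libdispatch's real worker is _dispatch_worker_thread).
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

static void *worker_main(void *arg) {   // stand-in for _dispatch_worker_thread
    printf("worker started for pool %p\n", arg);
    return NULL;
}

static void spawn_workers(void *pool, int n) {  // n must be >= 1, like remaining above
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    do {
        pthread_t t;
        int r;
        while ((r = pthread_create(&t, &attr, worker_main, pool)) != 0) {
            if (r != EAGAIN) { fprintf(stderr, "pthread_create: %d\n", r); break; }
            usleep(1000);   // stand-in for _dispatch_temporary_resource_shortage()
        }
    } while (--n);
    pthread_attr_destroy(&attr);
}

int main(void) {
    static int pool;
    spawn_workers(&pool, 2);
    sleep(1);  // give the detached workers a moment before exiting
    return 0;
}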
Let's first analyze _dispatch_root_queues_init:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}
Inside _dispatch_root_queues_init we find a dispatch_once_f call, i.e. a once-only initialization (not covered here; it will be analyzed in detail later), whose func argument is _dispatch_root_queues_init_once.
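dispatch_once_f is simply the function-pointer form of dispatch_once: the function runs exactly once per predicate, which is why the root queues and the workqueue are only set up on the very first dispatch. A minimal usage sketch (names are invented):
#include <dispatch/dispatch.h>
#include <stdio.h>

static dispatch_once_t init_pred;          // analogous to _dispatch_root_queues_pred

static void init_once(void *ctxt) {        // analogous to _dispatch_root_queues_init_once
    printf("one-time init, ctxt = %p\n", ctxt);
}

int main(void) {
    for (int i = 0; i < 3; i++) {
        dispatch_once_f(&init_pred, NULL, init_once);  // prints only on the first iteration
    }
    return 0;
}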
_dispatch_root_queues_init_once is where we hand off to the lower-level OS machinery. Setting a breakpoint and printing the stack with bt gives:
(lldb) bt
* thread #7, queue = 'cooci', stop reason = breakpoint 1.1
* frame #0: 0x0000000100449144 001---函数与队列`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x000000010044c0e0) at ViewController.m:48:9
frame #1: 0x00000001004efb68 libdispatch.dylib`_dispatch_call_block_and_release + 32
frame #2: 0x00000001004f15f0 libdispatch.dylib`_dispatch_client_callout + 20
frame #3: 0x00000001004f44e4 libdispatch.dylib`_dispatch_continuation_pop + 584
frame #4: 0x00000001004f3888 libdispatch.dylib`_dispatch_async_redirect_invoke + 692
frame #5: 0x00000001005042ec libdispatch.dylib`_dispatch_root_queue_drain + 364
frame #6: 0x0000000100504c94 libdispatch.dylib`_dispatch_worker_thread2 + 140
frame #7: 0x00000001d52b48cc libsystem_pthread.dylib`_pthread_wqthread + 216
(lldb)
To sum up, the async path works as follows:
【Preparation】: the async task is copied and wrapped into a continuation, and its callback function func is set.
【Block callback】: dx_push recurses and redirects down to the root queue, a thread is created with pthread_create, and the block callback is finally executed via dx_invoke (note that dx_push and dx_invoke come in pairs).
4. Synchronous tasks: dispatch_sync
dispatch_sync calls straight into dispatch_sync_f:
void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
// This branch is rarely taken; read it as if (_dispatch_block_has_private_data(work))
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
Looking at the _dispatch_sync_f_inline source, width == 1 means a serial queue. There are two key points here:
- Barrier: _dispatch_barrier_sync_f (explained by the later analysis of barrier functions); the sync function is in fact implemented on top of the synchronous barrier function.
- Deadlock: _dispatch_sync_f_slow; if two parties end up waiting on each other, a deadlock results.
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Serial queues (width == 1) take this branch
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
// Global concurrent queues, and queues bound to non-dispatch threads, take this branch
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
The first case, a serial queue: _dispatch_barrier_sync_f
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
// Queue types that don't support dispatch_sync end up here
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
// The common case ends up here
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
// First, invoke the task
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
// Internally this just wakes up the queue
return _dispatch_lane_barrier_complete(dq, 0, 0);
}
const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
uint64_t old_state, new_state;
// similar to _dispatch_queue_drain_try_unlock
// Atomic compare-and-swap loop: if dq->dq_state equals old_state, store new_state into dq->dq_state; otherwise reload dq_state into old_state and retry.
// For a serial queue the compare succeeds, so new_state (computed in the body below) is written into dq->dq_state.
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
if (unlikely(old_state & fail_unlock_mask)) {
os_atomic_rmw_loop_give_up({
return _dispatch_lane_barrier_complete(dq, 0, 0);
});
}
});
if (_dq_state_is_base_wlh(old_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
}
A custom concurrent queue goes through _dispatch_sync_invoke_and_complete instead:
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
// Hand the custom queue back to its root queue
// dispatch_async goes through this as well: the queue was bound to a root queue at creation time, and from here the root queue manages it
_dispatch_lane_non_barrier_complete(dq, 0);
}
static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
// execute the task
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
_dispatch_sync_f_slow and deadlock
Step into _dispatch_sync_f_slow; at this point the current main queue is suspended and blocked.
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
Adding a task to a queue pushes it onto that queue (here the main queue); step into _dispatch_trace_item_push:
static inline void
_dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
{
if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
_dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
}
_dispatch_trace_item_push_inline(dqu._dq, _tail._do);
_dispatch_introspection_queue_push(dqu, _tail);
}
Step into __DISPATCH_WAIT_FOR_QUEUE__: it checks whether dq is the queue currently being waited on, derives a state, and then matches the queue's state against the queue the current task depends on:
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
..... omitted ....
}
Follow _dq_state_drain_locked_by into _dispatch_lock_is_locked_by:
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
// XOR: identical bits give 0, so if the lock owner encoded in lock_value equals tid the masked result is 0
// In other words: are the task about to wait and the task currently executing on the same queue, i.e. does the waiter already own the queue?
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
If the queue being waited on and the queue currently executing are the same, i.e. the thread IDs are equal, libdispatch detects the deadlock and crashes.
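The classic way to trip that check is calling dispatch_sync targeting the serial queue the code is already running on; a minimal reproduction (obviously not something to ship):
#include <dispatch/dispatch.h>

int main(void) {
    dispatch_queue_t serial = dispatch_queue_create("com.example.deadlock", DISPATCH_QUEUE_SERIAL);
    dispatch_sync(serial, ^{
        // The outer dispatch_sync owns the queue's drain lock with the current
        // thread's tid; this inner dispatch_sync waits on the same queue, so
        // __DISPATCH_WAIT_FOR_QUEUE__ sees owner == waiter and crashes with
        // "dispatch_sync called on queue already owned by current thread".
        dispatch_sync(serial, ^{ });
    });
    return 0;
}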
Why a sync function on a concurrent queue still executes in order
In _dispatch_sync_invoke_and_complete -> _dispatch_sync_function_invoke_inline there are three main steps:
- push a thread frame for the task: _dispatch_thread_frame_push
- execute the task's block callback: _dispatch_client_callout
- pop the frame: _dispatch_thread_frame_pop
As the implementation shows, the frame is pushed, the block callback runs, and only then is the frame popped, so each dispatch_sync completes before the next one begins and the tasks execute in order (see the sketch below).
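A quick sketch of that behavior: even on a custom concurrent queue, each dispatch_sync blocks the caller until its block has run, so the output below always comes out in submission order.
#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    dispatch_queue_t conc = dispatch_queue_create("com.example.sync_order", DISPATCH_QUEUE_CONCURRENT);
    for (int i = 0; i < 5; i++) {
        // push frame -> run block -> pop frame, all on the calling thread
        dispatch_sync(conc, ^{ printf("%d\n", i); });
    }
    return 0;
}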
To sum up, the sync path works as follows:
The underlying implementation of the synchronous function is in fact the synchronous barrier function.
If the queue currently executing and the queue being waited on are the same, they end up waiting on each other and a deadlock results.
III. Summary
After working through this stage of the source, I think it is enough to grasp the overall flow; everything ultimately funnels into low-level wrapper functions. In the next part we will analyze the rest of GCD's internals: dispatch_once (singletons), barrier functions, semaphores, and so on.