目录
探究 iOS 协程 - 协程介绍与使用(一)
探究 iOS 协程 - coobjc 源码分析(二)
上一篇讲完了协程的概念与使用方式,这一篇我们来分析一下阿里开源协程框架 coobjc 源码。首先我们先写一个最简单的示例程序:
- (void)testCORoutineAsyncFunc {
co_launch(^{
NSLog(@"co start");
// await 后面需要跟 COChan 或者 COPromise
NSNumber *num = await([self promiseWithNumber:@(1)]);
NSLog(@"co finish");
});
NSLog(@"main");
}
// COPromise 模拟了一个异步任务
- (COPromise *)promiseWithNumber:(NSNumber *)number {
COPromise *promise = [COPromise promise:^(COPromiseFulfill _Nonnull fullfill, COPromiseReject _Nonnull reject) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
fullfill(number);
// reject(error); // 如果有错误,回调到上层
});
} onQueue:dispatch_get_global_queue(0, 0)];
return promise;
}
以上的代码会输出:
main
co start
co finish
co_launch
这里就在主线程开启了一个协程,现在大家应该特别好奇 await
为什么可以等待异步任务完成?别着急,我们慢慢往下看。
创建协程
首先我们来看一下 co_launch
做了什么事:
/**
Create a coroutine, then resume it asynchronous on current queue.
@param block the code execute in the coroutine
@return the coroutine instance
*/
NS_INLINE COCoroutine * _Nonnull co_launch(void(^ _Nonnull block)(void)) {
// 创建协程
COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
// 开启协程
return [co resume];
}
co_launch
主要做了两件事:
- 创建协程,把协程需要执行的 block 作为参数传进去。
co_launch
默认会在当前线程创建协程。 - 启动协程。
我们具体来看看如何创建协程:
- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize {
self = [super init];
if (self) {
// 协程需要执行的 block 赋予属性 execBlock
_execBlock = [block copy];
_dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch];
// 真正创建协程的方法。真正的协程是 coroutine_t 结构体类型,COCoroutine 只是在 OC 层面的一层封装
coroutine_t *co = coroutine_create((void (*)(void *))co_exec);
// 指定栈空间
if (stackSize > 0 && stackSize < 1024*1024) { // Max 1M
co->stack_size = (uint32_t)((stackSize % 16384 > 0) ? ((stackSize/16384 + 1) * 16384) : stackSize); // Align with 16kb
}
_co = co;
// 让 coroutine_t 引用 COCoroutine,并设置销毁函数
coroutine_setuserdata(co, (__bridge_retained void *)self, co_obj_dispose);
}
return self;
}
上面贴出了创建协程的关键方法,相关的步骤已经给出了注释,我们具体来看coroutine_create
:
coroutine_t *coroutine_create(coroutine_func func) {
coroutine_t *co = calloc(1, sizeof(coroutine_t));
co->entry = func;
co->stack_size = STACK_SIZE;
co->status = COROUTINE_READY;
// check debugger is attached, fix queue debugging.
co_rebind_backtrace();
return co;
}
co_rebind_backtrace
这里先忽略。这个方法很简单,就是创建一个 coroutine_t
结构体,把之前调用者传入的 co_exec
赋值给 entry
属性。这里的 co_exec
是一个函数,下面我们来看看这个函数的具体实现:
static void co_exec(coroutine_t *co) {
/* 通过 co_get_obj 拿到 COCoroutine 对象
(之前在创建协程的时候通过 coroutine_setuserdata 把 COCoroutine 对象设置到了 coroutine_t 结构体中)。
这里需要拿到 COCoroutine 的原因是因为协程真正执行的 block 是保存在 COCoroutine 对象中的
*/
COCoroutine *coObj = co_get_obj(co);
if (coObj) {
// 执行之前保存的 execBlock
[coObj execute];
coObj.isFinished = YES;
if (coObj.finishedBlock) {
coObj.finishedBlock();
coObj.finishedBlock = nil;
}
if (coObj.joinBlock) {
coObj.joinBlock();
coObj.joinBlock = nil;
}
//维护父子协程关系
[coObj.parent removeChild:coObj];
}
}
co_exec
主要做的事就是执行保存在 coroutine
上的 block。目前我们的协程就算创建完毕了。
启动协程
通过上面的分析可以看到,co_exec
是真正执行协程 block 的地方,那么 co_exec
是在什么时候开始执行的呢?回到最开始 co_launch
的地方。co_launch
之后,会立刻调用 [co resume]
,这里 resume
就是真正启动协程的地方,下面我们来看看 resume
具体实现:
- (COCoroutine *)resume {
// 拿到当前真正运行的协程
COCoroutine *currentCo = [COCoroutine currentCoroutine];
// 判断是否是当前运行协程的子协程
BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
[self.dispatch dispatch_async_block:^{
if (self.isResume) {
return;
}
// 如果是子协程,设置一下父子关系
if (isSubroutine) {
self.parent = currentCo;
[currentCo addChild:self];
}
self.isResume = YES;
// 启动协程
coroutine_resume(self.co);
}];
return self;
}
要注意,协程是异步追加到队列中的。如果没有特别指定队列,默认会追加到当前线程队列中。
具体启动协程在 coroutine_resume
,我们接着往里看:
void coroutine_resume(coroutine_t *co) {
if (!co->is_scheduler) {
// 拿到当前线程的协程调度器
coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
co->scheduler = scheduler;
// 把协程丢到 scheduler 维护的协程集合里(这里的集合是用双向链表实现)
scheduler_queue_push(scheduler, co);
// 如果当前线程有真正运行的协程,把该协程 yield 掉
if (scheduler->running_coroutine) {
// resume a sub coroutine.
scheduler_queue_push(scheduler, scheduler->running_coroutine);
coroutine_yield(scheduler->running_coroutine);
} else {
// scheduler is idle
coroutine_resume_im(co->scheduler->main_coroutine);
}
}
}
在这里需要特别说明一下调度器这个概念。其实在上一篇文章有提到,实现协程的 resume 和 yield 需要一个调度器来控制。调度器每个线程独有一个,用来调度该线程下的所有协程。同一时间段每个线程下只有一个协程在 running
状态。
下面的图很好的诠释了线程、调度器和协程的关系:
这里的调度器就类似于操作系统在线程调度时候发挥的作用。为什么说协程是一种用户态的线程
,看到这里想必对这个概念也有了更深刻的理解。
下面我们通过代码来具体看看调度器是如何创建的。大家还记得上面在 coroutine_resume
方法内部调用了 coroutine_scheduler_self_create_if_not_exists
吗,我们来看看这个方法具体实现:
coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {
if (!coroutine_scheduler_key) {
pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);
}
void *schedule = pthread_getspecific(coroutine_scheduler_key);
if (!schedule) {
schedule = coroutine_scheduler_new();
pthread_setspecific(coroutine_scheduler_key, schedule);
}
return schedule;
}
可以看到调度器是被存在了 TSD
里,每个线程有且仅有一个,这也就更好的诠释了上面那张图片。
说完了调度器,下面我们再回到协程启动上来。我们当前线程只创建了一个协程,所以不存在 running_coroutine
,那么协程启动最终会调用到 coroutine_resume_im
来,这个函数有点长,我只截取了启动相关的部分:
void coroutine_resume_im(coroutine_t *co) {
switch (co->status) {
case COROUTINE_READY:
{
// 分配虚拟内存到 stack_memory
co->stack_memory = coroutine_memory_malloc(co->stack_size);
// 根据虚拟内存地址计算栈顶指针地址
co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
// get the pre context
// 在堆上开辟一块内存,随后调用 coroutine_getcontext 把当前函数调用栈存入 pre_context。
co->pre_context = malloc(sizeof(coroutine_ucontext_t));
BOOL skip = false;
// coroutine_getcontext 保存了当前函数调用栈,但最主要得是保存 lr 寄存器的地址(下一条指令地址)。
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
free(co->context);
co->context = calloc(1, sizeof(coroutine_ucontext_t));
// 通过 coroutine_makecontext 生成一个协程上下文,跟 coroutine_getcontext 类似,只不过这里是直接用结构体模拟的。
coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
// setcontext
// 真正开启协程的函数,这里一执行,就会调用到 coroutine_main 这个函数里。
coroutine_begin(co->context);
break;
}
.........
}
coroutine_resume_im
主要做了三件事:
- 把当前的函数栈保存在
co->pre_context
中(其实就是保存 lr)。 - 生成一个新的 context 保存在
co->context
中。 - 开始执行
co->context
中保存的函数(coroutine_main)。
coroutine_getcontext
、coroutine_makecontext
、coroutine_begin
等被称为协程族函数,具体实现细节会在后一篇文章讨论,这里只需要知道它们的作用就可以。
现在我们知道,协程本身会保存 pre_context
和新建一个 context
,这里也引申出来一个问题:为什么要保存 pre_context
?原因是当我们的协程执行完之后,还需要回到我们想回去的地方。我在哪里设置了 pre_context
,那当我协程执行完之后就可以通过 coroutine_setcontext
回到我当初设置 pre_context
的地方。
到这里大家也不难想象协程是怎么实现异步的同步化表达。在传统的 block 异步编程中,其实是把异步操作执行完需要回调的函数地址保存在 block 对象内部,然后通过 block 对象调用这个函数:
那么对于协程来说,它通过
coroutine
对象内部保存了当前函数调用栈,当异步执行完之后,取出保存的函数调用栈开始执行原来的函数。image.png
刚才说到在调用
coroutine_begin
之后会真正开始执行 coroutine_main
,我们一起来看看这个函数的实现:
static void coroutine_main(coroutine_t *co) {
co->status = COROUTINE_RUNNING;
// 执行协程中保存的 block
co->entry(co);
co->status = COROUTINE_DEAD;
// 执行完毕,回到保存函数栈的地方
coroutine_setcontext(co->pre_context);
}
重点看一下 co->entry(co)
,还记得一开始我们在创建协程的时候赋值给 co->entry
的函数吗?不清楚的可以回到文章一开始的地方看一下。那么在 coroutine_main
函数调用的时候就真正执行了保存在 co->entry
里的 co_exec
函数,这个函数里会调用保存在 COCoroutine
对象上的 execBlock
,也就是我们文章一开始例子中 co_launch
的 block 参数。
中断协程
现在,我们的协程已经顺利启动起来了。然后碰到了 await
函数,当前协程会暂停等待 await
之后的异步操作来唤醒,那么我们一起来看看这个函数做了什么:
/**
await
@param _promiseOrChan the COPromise object, you can also pass a COChan object.
But we suggest use Promise first.
@return return the value, nullable. after, you can use co_getError() method to get the error.
*/
NS_INLINE id _Nullable await(id _Nonnull _promiseOrChan) {
id val = co_await(_promiseOrChan);
return val;
}
await
函数很简单,就是调用了 co_await
,并把返回值返回了出去。我们真正需要看的是 co_await
这个核心函数:
id co_await(id awaitable) {
coroutine_t *t = coroutine_self();
if (t == nil) {
@throw [NSException exceptionWithName:COInvalidException reason:@"Cannot call co_await out of a coroutine" userInfo:nil];
}
if (t->is_cancelled) {
return nil;
}
if ([awaitable isKindOfClass:[COChan class]]) {
COCoroutine *co = co_get_obj(t);
co.lastError = nil;
// 内部会调用 yield 中断当前协程
id val = [(COChan *)awaitable receive];
return val;
} else if ([awaitable isKindOfClass:[COPromise class]]) {
// 创建 cochan
COChan *chan = [COChan chanWithBuffCount:1];
COCoroutine *co = co_get_obj(t);
co.lastError = nil;
COPromise *promise = awaitable;
[[promise
then:^id _Nullable(id _Nullable value) {
// 当有回调过来,调用 resume 恢复协程中断
[chan send_nonblock:value];
return value;
}]
catch:^(NSError * _Nonnull error) {
co.lastError = error;
[chan send_nonblock:nil];
}];
// 内部会调用 yield 中断当前协程
id val = [chan receiveWithOnCancel:^(COChan * _Nonnull chan) {
[promise cancel];
}];
return val;
} else {
@throw [NSException exceptionWithName:COInvalidException
reason:[NSString stringWithFormat:@"Cannot await object: %@.", awaitable]
userInfo:nil];
}
}
COChan内部实现
在上一篇文章中我们有提到 COChan 这个概念和它的一些用法,如果不清楚的话可以再回过去看一下,这里就不再赘述。在 co_await
源码里可以看到,不管传进来的 awaitable
对象是 COChan
还是 COPromise
,最终都会调用 COChan
的 receive
方法中断当前协程,我们先一起来看看 COChan
是如何创建的:
- (instancetype)initWithBuffCount:(int32_t)buffCount {
self = [super init];
if (self) {
_chan = chancreate(sizeof(int8_t), buffCount, co_chan_custom_resume);
_buffList = [[NSMutableArray alloc] init];
COOBJC_LOCK_INIT(_buffLock);
}
return self;
}
COChan
内部会创建一个 co_channel
结构体和一个 _buffList
数组。这里我们也可以看到,COChan
其实也是内部属性 co_channel
结构体的一层封装,真正核心逻辑还是 co_channel
在处理,下面我们一起来看看 chancreate
方法:
co_channel *chancreate(int elemsize, int bufsize, void (*custom_resume)(coroutine_t *co)) {
// bufsize == 外面传进来的 buffCount
co_channel *c;
if (bufsize < 0) {
// 没有 bufferCount 不需要额外存储空间
c = calloc(1, sizeof(co_channel));
} else {
c = calloc(1, (sizeof(co_channel) + bufsize*elemsize));
}
// init buffer
if (bufsize < 0) {
queueinit(&c->buffer, elemsize, 16, 16, NULL);
} else {
// bufferCount >= 0 -> expandsize == 0
queueinit(&c->buffer, elemsize, bufsize, 0, (void *)(c+1));
}
// init lock
c->lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
c->custom_resume = custom_resume;
return c;
}
bufsize
是我们外面传进来的 buffCount,在 co_await
函数中,buffCount 的值是 1
。当 bufsize > 0
的时候,会为 co_channel
结构体分配多余的内存空间。bufsize
这里代表缓冲区最大容量。
为 co_channel
分配完内存空间之后,会初始化 co_channel
中的 buffer
属性,该属性是一个 chan_queue
类型结构体:
static void queueinit(chan_queue *q, int elemsize, int bufsize, int expandsize, void *buf) {
// bufsize >= 0, expandsize == 0; bufsize < 0, expandsize == 16
q->elemsize = elemsize;
q->size = bufsize;
q->expandsize = expandsize;
if (expandsize) {
if (bufsize > 0) {
// 为容器分配内存空间
q->arr = malloc(bufsize * elemsize);
}
} else {
if (buf) {
// 这里的 buf 是 co_channel 里的 asend 结构体。
q->arr = buf;
}
}
}
创建 co_channel
主要就是初始化了内部的 buffer
属性,也就是缓冲区。其余的都比较简单。要注意在这里当外部传进来的 BuffCount >= 0 时,expandsize == 0,c->buffer->arr == c->asend。具体为什么要这样设计,我会在后面给出答案。
讲完了 COChan
的初始化,紧接着就会调用 [COChan receive]
,我们一起来看看 receive
内部做了什么。receive
最终都会调到 receiveWithOnCancel:
- (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock {
...
IMP cancel_exec = NULL;
if (cancelBlock) {
cancel_exec = imp_implementationWithBlock(^{
cancelBlock(self);
});
}
uint8_t val = 0;
int ret = chanrecv_custom_exec(_chan, &val, cancel_exec);
if (cancel_exec) {
imp_removeBlock(cancel_exec);
}
co.currentChan = nil;
if (ret == CHANNEL_ALT_SUCCESS) {
// success
do {
COOBJC_SCOPELOCK(_buffLock);
NSMutableArray *buffList = self.buffList;
if (buffList.count > 0) {
id obj = buffList.firstObject;
[buffList removeObjectAtIndex:0];
if (obj == kCOChanNilObj) {
obj = nil;
}
return obj;
} else {
return nil;
}
} while(0);
} else {
// ret not 1, means nothing received or cancelled.
return nil;
}
}
省略了与主流程无关的代码,重点来关注 chanrecv_custom_exec
:
int chanrecv_custom_exec(co_channel *c, void *v, IMP cancelExec) {
return _chanop2(c, CHANNEL_RECEIVE, v, 1, NULL, cancelExec);
}
最终调用了 _chanop2
,主要关注 CHANNEL_RECEIVE
这个枚举:
typedef enum {
CHANNEL_SEND = 1,
CHANNEL_RECEIVE,
} channel_op;
CHANNEL_SEND
代表往 chan 里面发送消息,也就是调用 send
或者 send_nonblock
;
CHANNEL_RECEIVE
代表调用了 chan 的 receive
或者 receive_nonblock
。
接下来看一下 _chanop2
:
static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exec, IMP cancel_exec) {
chan_alt *a = malloc(sizeof(chan_alt));
a->channel = c;
a->op = op;
a->value = p;
// 应该是重复赋值了一次
a->op = op;
// 是否需要 yield 当前协程(如果是调用 nonblock 后缀的方法,canblock == 0)
a->can_block = canblock;
a->prev = NULL;
a->next = NULL;
a->is_cancelled = false;
// send 的时候会传入 custom_exec
a->custom_exec = custom_exec;
a->cancel_exec = cancel_exec;
int ret = chanalt(a);
free(a);
return ret;
}
这里主要就是创建 chan_alt
结构体,真正的核心逻辑在 chan_alt
:
int chanalt(chan_alt *a) {
int canblock = a->can_block;
co_channel *c;
coroutine_t *t = coroutine_self();
// task = coroutine_t
a->task = t;
c = a->channel;
// 对 co_channel 加锁
chanlock(c);
// 判断是否需要执行 alt
if(altcanexec(a)) {
return altexec(a);
}
if(!canblock) {
chanunlock(c);
return a->op == CHANNEL_SEND ? CHANNEL_ALT_ERROR_BUFFER_FULL : CHANNEL_ALT_ERROR_NO_VALUE;
}
// add to queue
altqueue(a);
// set coroutine's chan_alt
t->chan_alt = a;
chanunlock(c);
// blocking.
coroutine_yield(t);
// resume
t->chan_alt = nil;
// alt is cancelled
if (a->is_cancelled) {
return CHANNEL_ALT_ERROR_CANCELLED;
}
return CHANNEL_ALT_SUCCESS;
}
在 chan_alt
内部会首先判断该 chan_alt
是否能够执行,其次会判断是否是 block 类型的函数,在这里会出现这么几种执行路径:
- 如果不能执行(缓冲区满了),并且调用的是
receive_nonblock
或send_nonblock
,那么会直接return
。 - 如果不能执行(缓冲区满了),并且调用的是
receive
或send
,那么会被coroutine_yield
把当前协程中断。 - 如果可以执行,那么会调用
altexec
并返回结果。
我们先来看一下 altcanexec
函数:
static int altcanexec(chan_alt *a) {
alt_queue *altqueue;
co_channel *c;
c = a->channel;
// buffer.size 是初始化 COChan 时传进去的 BuffCount,代表缓冲区的容量
// buffer.count 是 buffer 里实际任务的数量
if(c->buffer.size == 0){
/**
1.未设置 buffer.size 或者 buffer.size == 0 说明需要立即执行 chan 里的任务
2.otherop 对 a->op 取反操作,然后会拿到与 op 相反操作的队列
比如当前的 op 为 CHANNEL_RECEIVE,那么这里的 altqueue 就是拿到一个
SEND的操作队列。如果 SEND 队列里面有任务,证明当前的 RECEIVE 操作是可以执行的;
反之如果当前 op 为 CHANNEL_SEND,如果 RECEIVE 队列中有任务,那么 CHANNEL_SEND
也是可以执行的。
*/
altqueue = chanarray(c, otherop(a->op));
return altqueue && altqueue->count;
} else if (c->buffer.expandsize) {
// c->buffer.expandsize > 0,代表 buffer.size < 0 的情况。
// 如果设置了 buffer.expandsize,意味着 SEND 可以永远成功 (await 不会走这里)
// expandable buffer
switch(a->op){
default:
return 0;
case CHANNEL_SEND:
// send always success.
return 1;
case CHANNEL_RECEIVE:
return c->buffer.count > 0;
}
} else{
// buffer.size > 0 的情况
//这里的 c.buffer == c.asend
switch(a->op){
default:
return 0;
case CHANNEL_SEND:
// SEND时,buffer 里任务的数量 < 缓冲区最大容量,可以执行 SEND
return c->buffer.count < c->buffer.size;
case CHANNEL_RECEIVE:
// RECEIVE时,buffer 里有任务就可以执行
return c->buffer.count > 0;
}
}
}
这里忽略 c->buffer.expandsize
中的逻辑,重点来看 c->buffer.size == 0
和 else
两个分支。关于 buffer.size
和 buffer.count
不太理解的可以看上面 co_channel
创建过程的分析,理解了它们俩的概念,再来看这段逻辑应该不难:
-
buffer.size == 0
(无缓冲区),RECEIVE
会直接取c->asend
,SEND
会直接取c->arecv
。如果队列里面有任务,那么可以成功。 -
buffer.size > 0
(有缓冲区),如果缓冲区内未达最大容量,SEND
可以成功;如果缓冲区内有任务,RECEIVE
可以成功。
如下图:
在
buffer.size > 0
这个分支里也可以找到为什么要把 c->buffer
设置为 c->asend
的答案:对于存在缓冲区的情况,SEND
和 RECEIVE
都只需要判断 SEND
任务队里中的任务数量,而不需要关心 RECEIVE
任务队列中的任务数量。看完了上面的分析,大家对于中断的流程应该比较清楚了:当
await
内部调用 receive
的时候,c->asend
里面是不存在任务的,所以 altcanexec
返回 false
,当前协程会被 coroutine_yield
中断。
恢复协程
上面说到 receive
会中断当前的协程,那么当异步任务完成之后,会调用 [COChan send_nonblock:val]
把获取的到数据 val
传给 COChan
,在这个过程中就触发了协程恢复。当调用 send
的时候,a->arecv
内部有任务,altcanexec
返回 true
,会立即执行 altexec
函数:
static int altexec(chan_alt *a) {
alt_queue *altqueue;
chan_alt *other = NULL;
co_channel *c;
c = a->channel;
// 拿到 a->op 取反操作队列
altqueue = chanarray(c, otherop(a->op));
// 取出双向链表尾部的任务
if(altqueuepop(altqueue, &other)){
int copyRet = altcopy(a, other);
assert(copyRet == 1);
// 拿到 other 上的协程(如果是 SEND 这里就是 RECEIVE 的协程)
coroutine_t *co = other->task;
// co_chan_custom_resume
void (*custom_resume)(coroutine_t *co) = c->custom_resume;
chanunlock(c);
// call back sender
chan_alt *sender = a->op == CHANNEL_SEND ? a : other;
// 如果是 SEND 直接执行 a->custom_exec, 如果是 RECEIVE 执行 other->custom_exec
if (sender->custom_exec) {
// [self.buffList addObject:val ?: kCOChanNilObj];
sender->custom_exec();
}
// 把协程加到当前调度器中,如果该调度器上没有协程在运行,会立刻 resume 这个协程
if (custom_resume) {
custom_resume(co);
} else {
coroutine_add(co);
}
return CHANNEL_ALT_SUCCESS;
} else {
// altqueue 里没有任务
int copyRet = altcopy(a, nil);
chanunlock(c);
if (copyRet && a->op == CHANNEL_SEND) {
if (a->custom_exec) {
a->custom_exec();
}
}
return copyRet ? CHANNEL_ALT_SUCCESS : CHANNEL_ALT_ERROR_COPYFAIL;
}
}
这个函数代码比较多,总结起来就是:
- 根据
c->op
取出反操作队列尾部的任务。 - 拿到该任务保存的协程对象。
- 如果是
SEND
操作,执行绑定在chan_alt
上的custom_exec
,这个函数主要是这句代码[self.buffList addObject:val ?: kCOChanNilObj]
,就是把send
后面的参数添加到COChan
的buffList
属性里。 -
resume
第二步保存的协程对象。
到这里我们就可以知道,当满足 altcanexec
的条件之后:
- 如果调用
send_nonblock
函数,那么会取出RECEIVE
队列中的任务,把 send 过来的val
放到buffList
中,然后通过custom_resume
恢复RECEIVE
任务中的协程,恢复之后会从buffList
里面取出刚才 send 传过来的val
,然后return
出去。 - 如果调用
receive_nonblock
函数,会取出SEND
队列中的任务,把 send 过来的val
放到buffList
中,恢复RECEIVE
任务中的协程。执行完SEND
协程的代码后继续执行return CHANNEL_ALT_SUCCESS
,返回到上层后receive_nonblock
会返回 send 存在buffList
中的值。
到此,整个 await 的流程已经比较清晰了,如下图:
一次 await 时序.png
最后
笔者的这篇文章主要从一个简单的协程例子开始,按着代码执行步骤一步一步带大家分析整个协程执行的流程,大家可以边看文章边跟着源码过一遍加深记忆。整个协程实现异步的同步化表达的过程核心在COChan,也就是一个阻塞的消息队列。当然还有其它的一些类(比如COActor)没有在这里展开讲,其实原理都差不多,它们的核心都是基于协程的几个族函数。
在下一篇文章我会继续带大家分析这几个族函数在 ARM64 下的实现。
网友评论