协程

作者: wayyyy | 来源:发表于2020-08-31 23:52 被阅读0次

以下全文基本是以下几篇文章搬运整合,只做自己记录学习使用,未及时获得作者授权,侵删。

进程
线程

为什么需要线程?或者说线程有什么优点?

协程的优点
  • 相比线程更加轻量
  • 将异步流程同步化处理
x86-64 函数模型
ucontext簇函数
  • ucontext

    #include <ucontext.h>
    typedef struct ucontext {
        struct ucontext *uc_link;
        sigset_t         uc_sigmask;
        stack_t          uc_stack;
        mcontext_t       uc_mcontext;
        ...
    } ucontext_t;
    
    typedef struct sigaltstack
    {
      void *ss_sp;
      int ss_flags;
      size_t ss_size;
    } stack_t;
    
    • uc_link指向当前上下文结束时要恢复的上下文。
    • uc_stack是这个上下文使用的栈。
    • uc_mcontext是机器特定的保存上下文的表示,包括调用协程的机器寄存器。
  • 示例:

    #include <ucontext.h>
    
    static ucontext_t ctx[3];
    
    static void f1()
    {
        puts("start f1");
        swapcontext(&ctx[1], &ctx[2]);
        puts("finish f1");
    }
    
    static void f2()
    {
        puts("start f2");
        swapcontext(&ctx[2], &ctx[1]);
        puts("finish f2");
    }
    
    int main()
    {
        char st1[8192];
        char st2[8192];
    
        getcontext(&ctx[1]);
        ctx[1].uc_stack.ss_sp = st1;
        ctx[1].uc_stack.ss_size = sizeof st1;
        ctx[1].uc_link = &ctx[0];
        makecontext(&ctx[1], f1, 0);
    
        getcontext(&ctx[2]);
        ctx[2].uc_stack.ss_sp = st2;
        ctx[2].uc_stack.ss_size = sizeof st2;
        ctx[2].uc_link = &ctx[1];
        makecontext(&ctx[2], f2, 0);
    
        swapcontext(&ctx[0], &ctx[2]);   
    }
    
    输出:  
    start f2
    start f1
    finish f2
    finish f1
    
  • getcontext

    int getcontext(ucontext_t *ucp);
    

    getcontext()函数初始化ucp所指向的结构体,填充当前有效的上下文。

  • makecontext

    void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...);
    

    makecontext()函数使得程序的执行会切换到func的调用,通过 makecontext()调用的argc传递func的参数。

  • swapcontext

    int swapcontext(ucontext_t *restrict oucp, const ucontext_t *restrict ucp);
    

    swapcontext()函数保存当前的上下文到oucp所指向的数据结构,并且切换到ucp所指向的上下文。

云风协程库示例
协程调度器
struct schedule {
    char stack[STACK_SIZE]; // 运行时栈,此栈即是共享栈
    ucontext_t main; // 主协程的上下文
    int nco;        // 当前存活的协程个数
    int cap;        // 协程管理器的当前最大容量,即可以同时支持多少个协程。如果不够了,则进行2倍扩容
    int running;    // 正在运行的协程ID
    struct coroutine **co; // 一个一维数组,用于存放所有协程。其长度等于cap
};
  • coroutine_open协程调度器的创建
    #define DEFAULT_COROUTINE 16
    struct schedule* coroutine_open(void) {
        // 这里做的主要就是分配内存,同时赋初值
        struct schedule *S = malloc(sizeof(*S));
        S->nco = 0;
        S->cap = DEFAULT_COROUTINE;
        S->running = -1;
        S->co = malloc(sizeof(struct coroutine *) * S->cap);
        memset(S->co, 0, sizeof(struct coroutine *) * S->cap);
        return S;
    }
    
协程的创建
struct coroutine {
    coroutine_func func; // 协程所用的函数
    void *ud;  // 协程参数
    ucontext_t ctx; // 协程上下文
    struct schedule * sch; // 该协程所属的调度器
    ptrdiff_t cap;   // 已经分配的内存大小
    ptrdiff_t size; // 当前协程运行时栈,保存起来后的大小
    int status; // 协程当前的状态
    char *stack; // 当前协程的保存起来的运行时栈
};
  • coroutine_new 创建协程

    #define COROUTINE_DEAD 0
    #define COROUTINE_READY 1
    #define COROUTINE_RUNNING 2
    #define COROUTINE_SUSPEND 3
    
    struct coroutine * _co_new(struct schedule *S , coroutine_func func, void *ud) 
    {
        struct coroutine * co = malloc(sizeof(*co));
        co->func = func;
        co->ud = ud;
        co->sch = S;
        co->cap = 0;
        co->size = 0;
        co->status = COROUTINE_READY; // 默认的最初状态都是COROUTINE_READY
        co->stack = NULL;
        return co;
    }
    
    int coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
        struct coroutine *co = _co_new(S, func , ud);
        if (S->nco >= S->cap) {
            // 如果目前协程的数量已经大于调度器的容量,那么进行扩容
            int id = S->cap;  // 新的协程的id直接为当前容量的大小
            // 扩容的方式为,扩大为当前容量的2倍
            S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *));
            // 初始化内存
            memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap);
             // 将协程放入调度器
            S->co[S->cap] = co;
            // 将容量扩大为两倍
            S->cap *= 2;
            // 尚未结束运行的协程的个数 
            ++S->nco; 
            return id;
        } else {
            // 如果目前协程的数量小于调度器的容量,则取一个为NULL的位置,放入新的协程
            for (int i = 0; i < S->cap; i++) {
                int id = (i+S->nco) % S->cap;
                if (S->co[id] == NULL) {
                    S->co[id] = co;
                    ++S->nco;
                    return id;
                }
             }
        }
        assert(0);
        return -1;
    }
    

    这样,一个协程对象就被创建好,此时该协程的状态是READY,但尚未正式执 行。

    coroutine_resume函数会切入到指定协程中执行。当前正在执行的协程的上下文会被保存起来,同时上下文替换成新的协程,该协程的状态将被置为RUNNING。

    进入coroutine_resume函数的前置状态有两个READYSUSPEND,这两个状态下coroutine_resume的处理方法也是有很大不同。我们先看下协程在READY状态下进行coroutine_resume的流程。

coroutine_resume(READY -> RUNNING)
void coroutine_resume(struct schedule * S, int id) {
    assert(S->running == -1);
    assert(id >=0 && id < S->cap);

    // 取出协程
    struct coroutine *C = S->co[id];
    if (C == NULL)
        return;

    int status = C->status;
    switch(status) {
        case COROUTINE_READY:
            // 初始化ucontext_t结构体,将当前的上下文放到C->ctx里面
            getcontext(&C->ctx);
            // 将当前协程的运行时栈的栈顶设置为S->stack,每个协程都这么设置,这就是所谓的共享栈。(注意,这里是栈顶)
            C->ctx.uc_stack.ss_sp = S->stack; 
            C->ctx.uc_stack.ss_size = STACK_SIZE;
            C->ctx.uc_link = &S->main; // 如果协程执行完,将切换到主协程中执行
            S->running = id;
            C->status = COROUTINE_RUNNING;

            // 设置执行C->ctx函数, 并将S作为参数传进去
            uintptr_t ptr = (uintptr_t)S;
            makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));

            // 将当前的上下文放入S->main中,并将C->ctx的上下文替换到当前上下文
            swapcontext(&S->main, &C->ctx);
            break;
        case COROUTINE_SUSPEND:
            // 将协程所保存的栈的内容,拷贝到当前运行时栈中
            // 其中C->size在yield时有保存
            memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
            S->running = id;
            C->status = COROUTINE_RUNNING;
            swapcontext(&S->main, &C->ctx);
            break;
        default:
            assert(0);
    }
}
状态转换机
image.png
共享栈

共享栈这个词在libco中提到的多,其实coroutine也是用的共享栈模型。 共享栈这个东西说起来很玄乎,实际原理不复杂,本质就是所有的协程在运行的时候都使用同一个栈空间。

共享栈对标的是非共享栈,也就是每个协程的栈空间都是独立的,固定大小。好处是协程切换的时候,内存不用拷贝来拷贝去。坏处则是内存空间浪费.

因为栈空间在运行时不能随时扩容,为了防止栈内存不够,所以要预先每个协程都要预先开一个足够的栈空间使用。当然很多协程用不了这么大的空间,就必然造成内存的浪费。

共享栈则是提前开了一个足够大的栈空间(coroutine默认是1M)。所有的栈运行的时候,都使用这个栈空间。 conroutine是这么设置每个协程的运行时栈:

C->ctx.uc_stack.ss_sp = S->stack;
C->ctx.uc_stack.ss_size = STACK_SIZE;
对协程调用yield的时候,该协程栈内容暂时保存起来,保存的时候需要用到多少内存就开多少,这样就减少了内存的浪费。(即_save_stack函数的内容)。 当resume该协程的时候,协程之前保存的栈内容,会被重新拷贝到运行时栈中。

这就是所谓的共享栈的原理。

相关文章

网友评论

      本文标题:协程

      本文链接:https://www.haomeiwen.com/subject/zwtqictx.html