协程

作者: wayyyy | 来源:发表于2020-08-31 23:52 被阅读0次

    以下全文基本是以下几篇文章搬运整合,只做自己记录学习使用,未及时获得作者授权,侵删。

    进程
    线程

    为什么需要线程?或者说线程有什么优点?

    协程的优点
    • 相比线程更加轻量
    • 将异步流程同步化处理
    x86-64 函数模型
    ucontext簇函数
    • ucontext

      #include <ucontext.h>
      typedef struct ucontext {
          struct ucontext *uc_link;
          sigset_t         uc_sigmask;
          stack_t          uc_stack;
          mcontext_t       uc_mcontext;
          ...
      } ucontext_t;
      
      typedef struct sigaltstack
      {
        void *ss_sp;
        int ss_flags;
        size_t ss_size;
      } stack_t;
      
      • uc_link指向当前上下文结束时要恢复的上下文。
      • uc_stack是这个上下文使用的栈。
      • uc_mcontext是机器特定的保存上下文的表示,包括调用协程的机器寄存器。
    • 示例:

      #include <ucontext.h>
      
      static ucontext_t ctx[3];
      
      static void f1()
      {
          puts("start f1");
          swapcontext(&ctx[1], &ctx[2]);
          puts("finish f1");
      }
      
      static void f2()
      {
          puts("start f2");
          swapcontext(&ctx[2], &ctx[1]);
          puts("finish f2");
      }
      
      int main()
      {
          char st1[8192];
          char st2[8192];
      
          getcontext(&ctx[1]);
          ctx[1].uc_stack.ss_sp = st1;
          ctx[1].uc_stack.ss_size = sizeof st1;
          ctx[1].uc_link = &ctx[0];
          makecontext(&ctx[1], f1, 0);
      
          getcontext(&ctx[2]);
          ctx[2].uc_stack.ss_sp = st2;
          ctx[2].uc_stack.ss_size = sizeof st2;
          ctx[2].uc_link = &ctx[1];
          makecontext(&ctx[2], f2, 0);
      
          swapcontext(&ctx[0], &ctx[2]);   
      }
      
      输出:  
      start f2
      start f1
      finish f2
      finish f1
      
    • getcontext

      int getcontext(ucontext_t *ucp);
      

      getcontext()函数初始化ucp所指向的结构体,填充当前有效的上下文。

    • makecontext

      void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...);
      

      makecontext()函数使得程序的执行会切换到func的调用,通过 makecontext()调用的argc传递func的参数。

    • swapcontext

      int swapcontext(ucontext_t *restrict oucp, const ucontext_t *restrict ucp);
      

      swapcontext()函数保存当前的上下文到oucp所指向的数据结构,并且切换到ucp所指向的上下文。

    云风协程库示例
    协程调度器
    struct schedule {
        char stack[STACK_SIZE]; // 运行时栈,此栈即是共享栈
        ucontext_t main; // 主协程的上下文
        int nco;        // 当前存活的协程个数
        int cap;        // 协程管理器的当前最大容量,即可以同时支持多少个协程。如果不够了,则进行2倍扩容
        int running;    // 正在运行的协程ID
        struct coroutine **co; // 一个一维数组,用于存放所有协程。其长度等于cap
    };
    
    • coroutine_open协程调度器的创建
      #define DEFAULT_COROUTINE 16
      struct schedule* coroutine_open(void) {
          // 这里做的主要就是分配内存,同时赋初值
          struct schedule *S = malloc(sizeof(*S));
          S->nco = 0;
          S->cap = DEFAULT_COROUTINE;
          S->running = -1;
          S->co = malloc(sizeof(struct coroutine *) * S->cap);
          memset(S->co, 0, sizeof(struct coroutine *) * S->cap);
          return S;
      }
      
    协程的创建
    struct coroutine {
        coroutine_func func; // 协程所用的函数
        void *ud;  // 协程参数
        ucontext_t ctx; // 协程上下文
        struct schedule * sch; // 该协程所属的调度器
        ptrdiff_t cap;   // 已经分配的内存大小
        ptrdiff_t size; // 当前协程运行时栈,保存起来后的大小
        int status; // 协程当前的状态
        char *stack; // 当前协程的保存起来的运行时栈
    };
    
    • coroutine_new 创建协程

      #define COROUTINE_DEAD 0
      #define COROUTINE_READY 1
      #define COROUTINE_RUNNING 2
      #define COROUTINE_SUSPEND 3
      
      struct coroutine * _co_new(struct schedule *S , coroutine_func func, void *ud) 
      {
          struct coroutine * co = malloc(sizeof(*co));
          co->func = func;
          co->ud = ud;
          co->sch = S;
          co->cap = 0;
          co->size = 0;
          co->status = COROUTINE_READY; // 默认的最初状态都是COROUTINE_READY
          co->stack = NULL;
          return co;
      }
      
      int coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
          struct coroutine *co = _co_new(S, func , ud);
          if (S->nco >= S->cap) {
              // 如果目前协程的数量已经大于调度器的容量,那么进行扩容
              int id = S->cap;  // 新的协程的id直接为当前容量的大小
              // 扩容的方式为,扩大为当前容量的2倍
              S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *));
              // 初始化内存
              memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap);
               // 将协程放入调度器
              S->co[S->cap] = co;
              // 将容量扩大为两倍
              S->cap *= 2;
              // 尚未结束运行的协程的个数 
              ++S->nco; 
              return id;
          } else {
              // 如果目前协程的数量小于调度器的容量,则取一个为NULL的位置,放入新的协程
              for (int i = 0; i < S->cap; i++) {
                  int id = (i+S->nco) % S->cap;
                  if (S->co[id] == NULL) {
                      S->co[id] = co;
                      ++S->nco;
                      return id;
                  }
               }
          }
          assert(0);
          return -1;
      }
      

      这样,一个协程对象就被创建好,此时该协程的状态是READY,但尚未正式执 行。

      coroutine_resume函数会切入到指定协程中执行。当前正在执行的协程的上下文会被保存起来,同时上下文替换成新的协程,该协程的状态将被置为RUNNING。

      进入coroutine_resume函数的前置状态有两个READYSUSPEND,这两个状态下coroutine_resume的处理方法也是有很大不同。我们先看下协程在READY状态下进行coroutine_resume的流程。

    coroutine_resume(READY -> RUNNING)
    void coroutine_resume(struct schedule * S, int id) {
        assert(S->running == -1);
        assert(id >=0 && id < S->cap);
    
        // 取出协程
        struct coroutine *C = S->co[id];
        if (C == NULL)
            return;
    
        int status = C->status;
        switch(status) {
            case COROUTINE_READY:
                // 初始化ucontext_t结构体,将当前的上下文放到C->ctx里面
                getcontext(&C->ctx);
                // 将当前协程的运行时栈的栈顶设置为S->stack,每个协程都这么设置,这就是所谓的共享栈。(注意,这里是栈顶)
                C->ctx.uc_stack.ss_sp = S->stack; 
                C->ctx.uc_stack.ss_size = STACK_SIZE;
                C->ctx.uc_link = &S->main; // 如果协程执行完,将切换到主协程中执行
                S->running = id;
                C->status = COROUTINE_RUNNING;
    
                // 设置执行C->ctx函数, 并将S作为参数传进去
                uintptr_t ptr = (uintptr_t)S;
                makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
    
                // 将当前的上下文放入S->main中,并将C->ctx的上下文替换到当前上下文
                swapcontext(&S->main, &C->ctx);
                break;
            case COROUTINE_SUSPEND:
                // 将协程所保存的栈的内容,拷贝到当前运行时栈中
                // 其中C->size在yield时有保存
                memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
                S->running = id;
                C->status = COROUTINE_RUNNING;
                swapcontext(&S->main, &C->ctx);
                break;
            default:
                assert(0);
        }
    }
    
    状态转换机
    image.png
    共享栈

    共享栈这个词在libco中提到的多,其实coroutine也是用的共享栈模型。 共享栈这个东西说起来很玄乎,实际原理不复杂,本质就是所有的协程在运行的时候都使用同一个栈空间。

    共享栈对标的是非共享栈,也就是每个协程的栈空间都是独立的,固定大小。好处是协程切换的时候,内存不用拷贝来拷贝去。坏处则是内存空间浪费.

    因为栈空间在运行时不能随时扩容,为了防止栈内存不够,所以要预先每个协程都要预先开一个足够的栈空间使用。当然很多协程用不了这么大的空间,就必然造成内存的浪费。

    共享栈则是提前开了一个足够大的栈空间(coroutine默认是1M)。所有的栈运行的时候,都使用这个栈空间。 conroutine是这么设置每个协程的运行时栈:

    C->ctx.uc_stack.ss_sp = S->stack;
    C->ctx.uc_stack.ss_size = STACK_SIZE;
    对协程调用yield的时候,该协程栈内容暂时保存起来,保存的时候需要用到多少内存就开多少,这样就减少了内存的浪费。(即_save_stack函数的内容)。 当resume该协程的时候,协程之前保存的栈内容,会被重新拷贝到运行时栈中。

    这就是所谓的共享栈的原理。

    相关文章

      网友评论

          本文标题:协程

          本文链接:https://www.haomeiwen.com/subject/zwtqictx.html