美文网首页程序员协程
基于ucontext.h的轻量级协程库

基于ucontext.h的轻量级协程库

作者: neilzwshen | 来源:发表于2018-08-01 13:19 被阅读66次

    本文主要是对自己学习协程并实现轻量级协程过程的一个记录, 语言略显啰嗦, 各位见谅. 水平有限, 如有疏漏, 欢迎各位指正.

    一 了解协程

    • 协程可以理解为一种用户态的轻量级线程, 切换由用户定义
    • 协程上下文切换很快, 因为不会陷入内核态
    • 协程拥有自己的寄存器上下文和栈, 协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈

    优点

    • 协程具有极高的执行效率 因为子程序切换不是线程切换,是由程序自身控制,因此协程没有线程切换的开销, 多线程的线程数量越多,协程的性能优势就越明显
    • 访问共享资源不需要多线程的锁机制, 因为只有一个线程, 也不存在同时写变量冲突, 所以在协程中控制共享资源无需加锁, 只需要判断状态就好了,执行效率比多线程高很多, 而且代码编写难度也可以相应降低
    • 以同步代码的方式写异步逻辑

    缺点

    • 无法利用多核资源, 除非和多进程配合


    二 了解ucontext

    • ucontext组件是什么
      • 头文件<ucontext.h>定义了两个数据结构, mcontext_t(暂时用不到)和ucontext_t和四个函数, 可以被用来实现一个进程中的用户级线程(协程)切换

    数据结构

    • ucontext_t 结构体至少拥有如下几个域
      typedef struct ucontext {
          struct ucontext *uc_link;
          sigset_t uc_sigmask;
          stack_t uc_stack;
          mcontext_t uc_mcontext;
          ...
      } ucontext_t;
      
      • uc_link指向后继上下文, 当前上下文运行终止时系统会恢复指向的上下文
      • uc_sigmask为该上下文中的阻塞信号集合
      • uc_stack为该上下文中使用的栈
      • uc_mcontex保存上下文的特定机器, 包括调用线程的特定寄存器等等
      • 简而言之这个数据结构是用来保存上下文的

    函数

    1. int getcontext(ucontext_t * ucp);

      • 获取当前上下文, 初始化ucp结构体, 将当前上下文保存到ucp中
      • 成功返回0; 失败返回-1, 并设置errno
    2. void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);

      • 创建一个目标上下文 创建方式: (1) getcontext, (2) 指定分配给上下文的栈uc_stack.ss_sp, (3) 指定这块栈的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后继上下文uc_link
      • 协程运行时使用主协程划分的栈空间,而协程切回主线程时需要将该部分栈空间的内容copy到每个协程各自的一个空间缓存起来,因为主协程中划分的栈空间并不是只用于一个协程,而是会用于多个协程
      • makecontext可以修改通过getcontext初始化得到的上下文, (必须先调用getcontext), 然后为ucp指定一个栈空间ucp->stack, 设置后继的上下文ucp->uc_link
      • 当上下文通过setcontext或者swapcontext激活后, 执行func函数(argc为后续的参数个数, 可变参数). 当func执行返回后, 继承的上下文被激活(ucp->uc_link), 如果为NULL, 则线程退出
      ucontext_t tar_ctx;
      ucontext_t next_ctx;
      char stack[100];
      getcontext(&tar_ctx);
      tar_ctx.uc_stack.ss_sp = stack;
      tar_ctx.uc_stack.ss_sp = sizeof(stack);
      tar_ctx.uc_stack.ss_flags = 0;
      tar_ctx.uc_link = &next_ctx;
      
    3. int setcontext(const ucontext_t *ucp)

      • 设置当前的上下文为ucp(激活ucp)
      • ucp来自getcontext, 那么上下文恢复至ucp
      • ucp来自makecontext, 那么将会调用makecontext函数的第二个参数指向的函数func, 如果func返回, 则恢复至ucp->uc_link指定的后继上下文, 如果该ucp中的uc_link为NULL, 那么线程退出
      • 成功不返回, 失败返回-1, 设置errno
    4. int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

      • 切换上下文
      • 保存当前上下文至oucp, 激活ucp上下文(先执行makecontext指定的ucp入口函数, 而后会执行ucp->uc_link指向的后继上下文)
      • 成功不返回, 失败返回-1, 设置errno


    三 轻量级协程实现

    名词说明

    • 协程调度器 代码中的SingleSchedule
    • 用户协程 代码中的Coroutine
    • 栈空间/栈区 对应SingleSchedule中的成员stack
    • 栈缓存/缓存区 对应的是Coroutine中的成员Buffer
    • 主协程上下文 SingleSchedule中的main_ctx, 对应main函数中的上下文
    • 用户协程上下文 Coroutine中的ctx, 对应每个用户协程自身的上下文

    思路

    • 本文基于ucontext.h实现协程库
    • 基本思路:
      • 构造一个协程调度类, 该类用于调度所有的用户协程, 提供一个协程池ctxPool, 使用单例模式实现.
      • 构造一个用户协程类, 该类对象对应每个用户协程, 提供一个用户协程逻辑虚函数CoProcess, 提供一个用户协程上下文ctx
      • 用户协程首次激活, 将会为其分配协程调度器提供的栈区stack
      • 用户协程被挂起, 那么会将该协程的栈信息栈区stack保存到其自身的缓存区buffer;
      • 用户协程被唤醒, 那么会将该用户协程的栈信息缓存区buffer中Reload至栈区
    • 协程库框架
      • 激活 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池
      • 挂起 将栈空间stack对应的数据缓存至当前用户协程的栈缓存buffer中, 更改协程状态SUSPEND
      • 恢复 将用户协程栈缓存buffer中的数据reload进栈空间stack
        示意图

    1 用户协程类 Coroutine

    数据成员

    • 协程状态CoState state(FREE, RUNNING, SUSPEND)
    • 协程号int id(对应协程调度类中的协程池的id)
    • 栈缓存char * Buffer, 一个动态数组, 当协程被切出时, 缓存栈空间
    • 所需缓存区尺寸int stack_size
    • 用户协程栈容量尺寸int cap cap如果小于stack_size, 那么需要重新分配缓存区, 否则可以直接缓存
    • 用户协程上下文ucontext_t ctx

    主要成员函数

    • 挂起协程函数void Yield();
      • 挂起当前协程, 并SaveStack栈空间, 切换状态至SUSPEND
    • 恢复协程函数void Resume()
      • 恢复该协程, 并ReloadStack栈空间
    • 缓存堆栈函数void SaveStack();
      • 调用时机是协程被切出, 会将协程调度对象中的堆栈缓存入用户协程自身的缓存区
    • 载入堆栈函数void ReloadStack();
      • 调用时机是协程被恢复时, 会将该用户协程的堆栈信息从缓存区回复到协程调度对象的堆栈中
    • 用户协程逻辑虚函数virtual void CoProcess();
      • 用于派生成员中定义业务逻辑

    2 协程调度类

    • 单例

    数据成员

    • 协程池std::map<int, Coroutine*> crtPool;
    • 主协程上下文ucontext_t main_ctx
    • 协程堆栈char stack[DEFAULT_STACK_SIZE], 所有的协程都利用这块区域

    成员函数

    • 协程启动函数 void CoroutineNew(Coroutine * crt);
      • 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池
    • 用户协程入口函数static void CoroutineEntry(void * crt);
      • 指向用户协程的入口函数
    • 协程恢复函数void Resume(int id);
      • 根据id恢复对应协程
    • 检查并清理协程池int HasCoroutine();
      • 清理FREE的协程, 并返回剩余的协程数量
    • 协程删除函数void Remove(int id);
      • 删除对应协程

    注意点

    1. 所有的用户协程都使用调度器的栈空间, 每个用户协程自身的buffer只不过用来作缓存
    2. SaveStack和ReloadStack函数的实现需要注意, 如何缓存协程栈
      • 协程栈是由用户分配的, 如代码中stack数组, 由于该数组的目的是用作栈空间, 而进程中栈是预分配的, 即首先确定栈的高地址, 从高地址开始往低使用, 根据这一点我们可以确定需要被缓存的栈空间大小.
      char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    // 获取到栈底, 即高地址
      char dumy = 0;                                                       // 最后创建的变量, 必然分配在栈顶
      assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);                     // 被栈缓存不能大于栈空间
      if (cap<stackBottom-&dumy){                                          // cap 代表当前栈缓存大小, 如果不够需要重分配
          if(buffer){                                                      // 释放当前栈缓存
              delete [] buffer;
          }
          cap = stackBottom-&dumy;
          buffer = new char[cap];
      }
      stack_size = stackBottom-&dumy;                                      // 栈空间大小
      memcpy(buffer, &dumy, stack_size);                                   // 缓存
      

    代码实现

    https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine

    1 用户协程

    /**
     * @file    : Coroutine.h
     * @author  : neilzwshen
     * @time    : 2018-7-31
     * @version : 3.0
     * @remark  : 用户协程类
     */
    
    #ifndef COROUTINE_H_
    #define COROUTINE_H_
    #define DEFAULT_STACK_SIZE (1024*1024)
    #include <stdio.h>
    #include <string.h>
    #include <ucontext.h>
    
    
    enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};
    
    class Coroutine
    {
    public:
        Coroutine();
        virtual ~Coroutine();
    
        /**
         * 用户协程入口函数
         */
        virtual void CoProcess();
        
        /**
         * 用户协程恢复函数
         */
        void Resume();
    
        /**
         * 获取协程id
         * @return [返回id]
         */
        int GetId()const {
            return id;
        }
    
        /**
         * 设置协程id
         */
        void SetId(int _id) {
            id = _id;
        }
    
        /**
         * 获取协程状态
         * @return [返回协程状态]
         */
        int GetState()const {
            return state;
        }
    
        /**
         * 设置协程状态
         */
        void SetState(CoState _state) {
            state = _state;
        }
    
    protected:
        /**
         * 用户协程挂起函数
         */
        void Yield();
    
        /**
         * 堆栈缓存
         */
        void SaveStack();
    
        /**
         * 堆栈恢复
         */
        void ReloadStack();
        
    public:
        char *buffer;       // 缓存协程堆栈
        ucontext_t ctx;
    
    private:
        int stack_size;
        int cap;
        int id;
        CoState state;
    };
    
    #endif
    
    #include <assert.h>
    #include "Coroutine.h"
    #include "Schedule.h"
    
    Coroutine::Coroutine()
            :id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
    {
    
    }
    
    Coroutine::~Coroutine()
    {
        delete [] buffer;
    }
    
    void Coroutine::CoProcess()
    {
    
    }
    
    void Coroutine::Resume()
    {
        if(state==SUSPEND){
            ReloadStack();
            state = RUNNING;
            swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
        }
    }
    
    void Coroutine::Yield()
    {
        if (state == RUNNING){
            SaveStack();
            state = SUSPEND;
            swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
        }
    }
    
    void Coroutine::SaveStack()
    {
        char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();
        char dumy = 0;
    
        assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);
        if (cap<stackBottom-&dumy){
            if(buffer){
                delete [] buffer;
            }
            cap = stackBottom-&dumy;
            buffer = new char[cap];
        }
        stack_size = stackBottom-&dumy;
        memcpy(buffer, &dumy, stack_size);
    }
    
    void Coroutine::ReloadStack()
    {
        memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
    }
    

    2 单例模板

    /**
     * @file    : Singleton.h
     * @author  : neilzwshen
     * @time    : 2018-7-30
     * @version : 1.0
     * @remark  : 单例模板, 只要将对象作为T, 就可以获取到一个单例对象, 构造函数不能传参
     */
    
    #ifndef SINGLETON_H_
    #define SINGLETON_H_
    
    template<class T>
    class Singleton {
    public:
        /**
         * 单例获取
         * @return [返回T的单例对象]
         */
        static T* GetInst(){
            if (!flag_instance){
                flag_instance = new Singleton();
            }
            return &flag_instance->_instance;
        }
    
    protected:
        /**
         * 单例构造
         */
        Singleton(){}
    
    private:
        /**
         * T对象实例
         */
        T _instance;
        /**
         * 单例模板实例,
         */
        static Singleton<T> * flag_instance;
    };
    
    template<class T>
    Singleton<T> * Singleton<T>::flag_instance = 0;
    
    #endif
    

    3 协程调度器

    /**
     * @file    : Schedule.h
     * @author  : neilzwshen
     * @time    : 2018-7-31
     * @version : 3.0
     * @remark  : 协程调度类
     */
    
    #ifndef SCHEDULE_H_
    #define SCHEDULE_H_
    #include <stdio.h>
    #include <map>
    #include <ucontext.h>
    #include "Coroutine.h"
    #include "Singleton.h"
    
    
    typedef std::map<int, Coroutine*> CrtMap;
    
    class Schedule
    {
    public:
        Schedule();
        virtual ~Schedule();
    
        /**
         * 用户协程入口函数
         */
        static void CoroutineEntry(void * crt);
    
        /**
         * 将协程crt加入协程池, 并开启
         * @param  crt [协程指针]
         */
        void CoroutineNew(Coroutine * crt);
    
        /**
         * 恢复用户协程
         * @param id [description]
         */
        void Resume(int id);
    
        /**
         * 判断协程池中是否还有未完成的协程, 并将已经终止的协程删除
         * @return [返回协程数]
         */
        int HasCoroutine();
    
        /**
         * 根据协程id删除协程
         * @param id [协程id]
         */
        void Remove(int id);
    
        /**
         * 获取到栈底
         * @return [返回栈底地址]
         */
        char* GetStackBottom(){
            return stack + DEFAULT_STACK_SIZE;
        }
    
    public:
        ucontext_t mainCtx;
        char stack[DEFAULT_STACK_SIZE];     // 运行协程堆栈
    
    private:
        CrtMap crtPool;
    };
    
    typedef Singleton<Schedule> SingleSchedule;
    
    #endif
    
    
    #include <assert.h>
    #include "Schedule.h"
    
    Schedule::Schedule()
    {
    
    }
    
    Schedule::~Schedule()
    {
    
    }
    
    void Schedule::CoroutineEntry(void * crt) {
        ((Coroutine *)crt)->SetState(RUNNING);
        ((Coroutine *)crt)->CoProcess();
        ((Coroutine *)crt)->SetState(FREE);
    }
    
    void Schedule::CoroutineNew(Coroutine * crt) {
        
        int id = crt->GetId();
        CoState state = CoState(crt->GetState());
        assert(id != 0);
        assert(state == FREE);
        //printf("--%d,%d--\n",id, state);
    
        if (crtPool[id] != nullptr) {
            CrtMap::iterator it = crtPool.find(id);
            crtPool.erase(it);
        }
        
        // 构建用户协程上下文
        getcontext(&(crt->ctx));
        //memset(stack, 0, DEFAULT_STACK_SIZE);
        crt->ctx.uc_stack.ss_sp = stack;
        crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
        crt->ctx.uc_stack.ss_flags = 0;
        crt->ctx.uc_link = &mainCtx;
        crtPool[id] = crt;
        
        makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
        swapcontext(&mainCtx, &crt->ctx);
    }
    
    void Schedule::Resume(int id){
        if (crtPool[id] != nullptr) {
            crtPool[id]->Resume();
        }
    }
    
    int Schedule::HasCoroutine() {
        int count = 0;
        CrtMap::iterator it;
        for (it = crtPool.begin(); it != crtPool.end(); it++) {
            if (it->second->GetState() != FREE) {
                count++;
            }else{
                it=crtPool.erase(it);
                it--;
            }
        }
        return count;
    }
    
    void Schedule::Remove(int id) {
        if (crtPool[id] != nullptr) {
            crtPool.erase(crtPool.find(id));
        }
    }
    

    4 示例

    #include <stdio.h>
    #include <memory>
    #include "Coroutine.h"
    #include "Schedule.h"
    
    
    class Logic1 : public Coroutine{
        void CoProcess(){
            puts("1");
            Yield();
            puts("4");
            Yield();
            puts("7");
        }
    };
    
    class Logic2 : public Coroutine{
        void CoProcess(){
            puts("2");
            Yield();
            puts("5");
            Yield();
            puts("8");
        }
    };
    
    class Logic3 : public Coroutine{
        void CoProcess(){
            puts("3");
            Yield();
            puts("6");
            Yield();
            puts("9");
        }
    };
    
    int main() {
    
        std::shared_ptr<Coroutine> ct1(new Logic1());
        std::shared_ptr<Coroutine> ct2(new Logic2());
        std::shared_ptr<Coroutine> ct3(new Logic3());
    
        ct1->SetId(1);
        ct2->SetId(2);
        ct3->SetId(3);
    
        SingleSchedule::GetInst()->CoroutineNew(ct1.get());
        SingleSchedule::GetInst()->CoroutineNew(ct2.get());
        SingleSchedule::GetInst()->CoroutineNew(ct3.get());
    
        SingleSchedule::GetInst()->Resume(1);
        SingleSchedule::GetInst()->Resume(2);
        SingleSchedule::GetInst()->Resume(3);
        SingleSchedule::GetInst()->Resume(1);
        SingleSchedule::GetInst()->Resume(2);
        SingleSchedule::GetInst()->Resume(3);
    
    
        //SingleSchedule::GetInst()->Remove(1);
        //SingleSchedule::GetInst()->Remove(2);
        //SingleSchedule::GetInst()->Remove(3);
    
        int count = SingleSchedule::GetInst()->HasCoroutine();
        printf("%d\n", count);
    
        return 0;
    }
    

    5 执行结果

    szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    0
    

    相关文章

      网友评论

        本文标题:基于ucontext.h的轻量级协程库

        本文链接:https://www.haomeiwen.com/subject/fclkvftx.html