美文网首页
Pebble协程库实现

Pebble协程库实现

作者: fooboo | 来源:发表于2018-05-19 11:18 被阅读113次

    这部分准备分析下Pebble里的协程实现,它和上部分的Phxrpc协程有一部分相似点,即都使用了ucontext_t,也有协程管理器,调度器,定时器等设计思想,和Phxrpc不同的是定时器实现并非小根堆,是用了STL中的unordered_map组件;另外和Libco中的协程不同的是,后者没使用ucontext_t,直接使用汇编实现协程上下文切换的逻辑和数据结构,后期在分析Libco协程的时候会结合ucontext_t相关的实现重点分析下切换时的工作原理。

    Pebble中与协程相关的主要有以下几个类声明:
    1)协程实例

     68 struct coroutine {
     69     coroutine_func func;
     71     void *ud;
     72     ucontext_t ctx;
     73     struct schedule * sch;
     74     int status;
     76     char* stack;
     77     int32_t result; 
     78 
     79     coroutine() {
                //more code ...
     87         memset(&ctx, 0, sizeof(ucontext_t));
     88     }
     89 };
    

    每个成员变量的作用从声明可以得知;
    2)协程管理器

     92 struct schedule {
     93     ucontext_t main;
     94     int64_t nco; 
     95     int64_t running;
     96     cxx::unordered_map<int64_t, coroutine*> co_hash_map;
     97     std::list<coroutine*> co_free_list;
     98     int32_t co_free_num;
     99     uint32_t stack_size;
    100 };
    

    其中main为主协程,running表示正在运行的协程idco_hash_map为协程管理器,co_free_list为空闲的协程实例,stack_size大小文中为256k
    3)协程任务类相关

    173 class CoroutineTask {
    174     friend class CoroutineSchedule;
    175 public:
    176     CoroutineTask();
    177     virtual ~CoroutineTask();
    178 
    179     int64_t Start(bool is_immediately = true);
    180     virtual void Run() = 0;
    181    
    186     int32_t Yield(int32_t timeout_ms = -1);
    188     CoroutineSchedule* schedule_obj();
    189 private:
    190     int64_t id_;
    191     CoroutineSchedule* schedule_obj_;
    192 };  
    193 
    194 class CommonCoroutineTask : public CoroutineTask {
    195 public:
    196     CommonCoroutineTask() {}  
    198     virtual ~CommonCoroutineTask() {}
    199     
    200     void Init(const cxx::function<void(void)>& run) { m_run = run; }
    201     
    202     virtual void Run() {
    203         m_run();
    204     }
    206 private:
    207     cxx::function<void(void)> m_run;
    208 };
    

    4)协程调试管理器

    233 class CoroutineSchedule {
    234     friend class CoroutineTask;
    235 public:
    236     int Init(Timer* timer = NULL, uint32_t stack_size = 256 * 1024);
    237     
    238     int Close();
    239     CoroutineTask* CurrentTask() const;
    240     
    241     int64_t CurrentTaskId() const;
    242     
    243     int32_t Yield(int32_t timeout_ms = -1);
    244     int32_t Resume(int64_t id, int32_t result = 0);
    245     
    246     template<typename TASK>
    247     TASK* NewTask() { 
    248         if (CurrentTaskId() != INVALID_CO_ID) {
    249             return NULL;
    250         }   
    251         TASK* task = new TASK();
    252         if (AddTaskToSchedule(task)) {
    253             delete task;
    254             task = NULL;
    255         }   
    256         return task;
    257     }     
    259 private:
    260     int AddTaskToSchedule(CoroutineTask* task);
    262     int32_t OnTimeout(int64_t id);
    263 
    264     struct schedule* schedule_;
    265     Timer* timer_;
    266     cxx::unordered_map<int64_t, CoroutineTask*> task_map_;
    267     std::set<CoroutineTask*> pre_start_task_;
    268 };
    

    下面从使用的角度去分析每个接口的实现,从pebble_server.cpp中:

     741 int32_t PebbleServer::InitCoSchedule() {
     742     if (m_coroutine_schedule) {
     743         return 0;
     744     }
     745 
     746     m_coroutine_schedule = new CoroutineSchedule();
     747     int32_t ret = m_coroutine_schedule->Init(GetTimer(), m_options._co_stack_size_bytes);
     748     if (ret != 0) {
     749         delete m_coroutine_schedule;
     750         m_coroutine_schedule = NULL;
     752         return -1;
     753     }
     755     return 0;
     756 }
    

    创建一个协程调度器,在Init实现中:

     579 int CoroutineSchedule::Init(Timer* timer, uint32_t stack_size) {
     580     timer_ = timer;
     581     schedule_ = coroutine_open(stack_size);
     582     if (schedule_ == NULL)
     583         return -1;
     584     return 0;
     585 }
    

    创建一个协程管理器,coroutine_open主要做的事情如下:

     262 struct schedule *
     263 coroutine_open(uint32_t stack_size) {
     264     if (0 == stack_size) {
     265         stack_size = 256 * 1024;
     266     }
     267     pid_t pid = GetPid();
     268     stCoRoutineEnv_t *env = GetCoEnv(pid);
     269     if (env) {
     270         return env->co_schedule;
     271     }
     272 
     273     void* p = calloc(1, sizeof(stCoRoutineEnv_t));
     274     env = reinterpret_cast<stCoRoutineEnv_t*>(p);
     275     SetCoEnv(pid, env);
     278     struct schedule *S = new schedule;
     279     S->nco = 0;
     280     S->running = -1;
     281     S->co_free_num = 0;
     282     S->stack_size = stack_size;
     283 
     284     env->co_schedule = S;
     285 
     286     stCoEpoll_t *ev = AllocEpoll();
     287     SetEpoll(env, ev);
     290     return S;
     291 }
    

    coroutine_open是获取协程调度器,没有的话则为每个线程创建一个stCoRoutineEnv_t,根据自己的线程id索引到g_CoEnvArrayForThread,不存在的话会初始化:设置栈大小,创建协程管理器,分配stCoEpoll_t

     129 stCoEpoll_t *AllocEpoll() {
     130     stCoEpoll_t *ctx = reinterpret_cast<stCoEpoll_t*>(calloc(1, sizeof(stCoEpoll_t)));
     131 
     132     ctx->iEpollFd = epoll_create(stCoEpoll_t::_EPOLL_SIZE);
     133     ctx->pTimeout = AllocTimeout(60 * 1000);
     134 
     135     ctx->pstActiveList = reinterpret_cast<stTimeoutItemLink_t*>
     136         (calloc(1, sizeof(stTimeoutItemLink_t)));
     137     ctx->pstTimeoutList = reinterpret_cast<stTimeoutItemLink_t*>
     138         (calloc(1, sizeof(stTimeoutItemLink_t)));
     139 
     140     return ctx;
     141 }
    

    这个类的声明会在分析调度器时会列出来,这里先跳过。

    下面举例有一个client connect过来后,整个框架是如何工作的呢?先介绍下大概工作流程吧:
    Serve()--->Update()--->ProcessMessage()--->Message::Poll()--->RawMessageDriver::Poll()--->NetMessage::Poll()--->Accept()
    acceptnew_socket时,会设置非阻塞并初始化相关的数据,并m_epoll->AddFd(new_socket, EPOLLIN | EPOLLERR, net_addr)进行对new_socket监听可读和错误事件。

    如果不考虑网络层的具体实现细节,在上层收到完整的rpc请求数据后,会进行ProcessRequest,会创建一个task,绑定处理函数ProcessRequestInCoroutine,并Start

    181 int32_t RpcUtil::ProcessRequest(int64_t handle, const RpcHead& rpc_head,
    182     const uint8_t* buff, uint32_t buff_len) {
    183     if (!m_coroutine_schedule) {
    184         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
    185     }   
    186 
    187     if (m_coroutine_schedule->CurrentTaskId() != INVALID_CO_ID) {
    188         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
    189     }
    190      
    191     CommonCoroutineTask* task = m_coroutine_schedule->NewTask<CommonCoroutineTask>();
    192     cxx::function<void(void)> run = cxx::bind(&RpcUtil::ProcessRequestInCoroutine, this,
    193         handle, rpc_head, buff, buff_len);
    194     task->Init(run);
    195     task->Start();
    196     
    197     return kRPC_SUCCESS;
    198 }
    200 int32_t RpcUtil::ProcessRequestInCoroutine(int64_t handle, const RpcHead& rpc_head,
    201     const uint8_t* buff, uint32_t buff_len) {
    202     return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
    203 }
    

    协程相关的创建工作:

    287     template<typename TASK>
    288     TASK* NewTask() {
    289         if (CurrentTaskId() != INVALID_CO_ID) {
    290             return NULL; 
    291         } 
    292         TASK* task = new TASK();
    293         if (AddTaskToSchedule(task)) {
    294             delete task;
    295             task = NULL;
    296         }
    297         return task;
    298     }
    
     639 int CoroutineSchedule::AddTaskToSchedule(CoroutineTask* task) {
     640     task->schedule_obj_ = this;
     641     pre_start_task_.insert(task);
     642     return 0;
     643 }
    
     546 int64_t CoroutineTask::Start(bool is_immediately) {
     547     if (is_immediately && schedule_obj_->CurrentTaskId() != INVALID_CO_ID) {
     548         delete this;
     549         return -1;
     550     }
     551     id_ = coroutine_new(schedule_obj_->schedule_, DoTask, this);
     552     if (id_ < 0)
     553         id_ = -1;
     554     int64_t id = id_;
     555     schedule_obj_->task_map_[id_] = this;
     556     schedule_obj_->pre_start_task_.erase(this);
     557     if (is_immediately) {
     558         int32_t ret = coroutine_resume(schedule_obj_->schedule_, id_);
     559         if (ret != 0) {
     560             id = -1;
     561         }
     562     }
     563     return id;
     564 }
    

    以上实现是开始一个协程任务,结束后会delete taskcoroutine_new主要是分配一个协程对象,并设置入口函数DoTask,由于默认形参为true,所以会立即coroutine_resume

     342 int64_t coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
     343     if (NULL == S || NULL == func) {
     344         return -1;
     345     }
     346     struct coroutine *co = _co_new(S, func, ud);
     347     int64_t id = S->nco;
     348     S->co_hash_map[id] = co;
     349     S->nco++;
     352     return id;
     353 }
    
     231 struct coroutine *
     232 _co_new(struct schedule *S, coroutine_func func, void *ud) {
     233     if (NULL == S) {
     234         assert(0);
     235         return NULL;
     236     }
     237 
     238     struct coroutine * co = NULL;
     239 
     240     if (S->co_free_list.empty()) {
     241         co = new coroutine;
     242         co->stack = new char[S->stack_size];
     243     } else {
     244         co = S->co_free_list.front();
     245         S->co_free_list.pop_front();
     246 
     247         S->co_free_num--;
     248     }
     249     co->func = func;
     250     co->ud = ud;
     251     co->sch = S;
     252     co->status = COROUTINE_READY;
     253 
     254     return co;
     255 }
    
     518 void DoTask(struct schedule*, void *ud) {
     519     CoroutineTask* task = static_cast<CoroutineTask*>(ud);
     520     assert(task != NULL);
     521     task->Run();
     522     delete task;
     523 }
    

    coroutine_new里主要工作是从协程池中看一下有没有空闲的协程对象,有的话复用,否则创建,然后设置协程的处理函数和对象,以及statusCOROUTINE_READY状态,其中task->Run()最终运行的是ProcessRequestInCoroutine,创建完一个协程对象后进行coroutine_resume

     381 int32_t coroutine_resume(struct schedule * S, int64_t id, int32_t result) {
     382     if (NULL == S) {
     383         return kCO_INVALID_PARAM;
     384     }
     385     if (S->running != -1) {
     386         return kCO_CANNOT_RESUME_IN_COROUTINE;
     387     }
     388     cxx::unordered_map<int64_t, coroutine*>::iterator pos = S->co_hash_map.find(id);
     391     if (pos == S->co_hash_map.end()) {
     393         return kCO_COROUTINE_UNEXIST;
     394     }
     395 
     396     struct coroutine *C = pos->second;
     397     if (NULL == C) {
     399         return kCO_COROUTINE_UNEXIST;
     400     }
     402     C->result = result;
     403     int status = C->status;
     404     switch (status) {
     405         case COROUTINE_READY: {
     408             getcontext(&C->ctx);
     409             C->ctx.uc_stack.ss_sp = C->stack;
     410             C->ctx.uc_stack.ss_size = S->stack_size;
     411             C->ctx.uc_stack.ss_flags = 0;
     412             C->ctx.uc_link = &S->main;
     413             S->running = id;
     414             C->status = COROUTINE_RUNNING;
     415             uintptr_t ptr = (uintptr_t) S;
     416             makecontext(&C->ctx, (void (*)(void)) mainfunc, 2,
     417             (uint32_t)ptr,  
     418             (uint32_t)(ptr>>32));
     419 
     420             swapcontext(&S->main, &C->ctx);
     422             break;
     423         }
     424         case COROUTINE_SUSPEND: {
     428             S->running = id;
     429             C->status = COROUTINE_RUNNING;
     430             swapcontext(&S->main, &C->ctx);
     431 
     432             break;
     433         }
     435         default:
     437             return kCO_COROUTINE_STATUS_ERROR;
     438     }
     439 
     440     return 0;
     441 }
    
     355 static void mainfunc(uint32_t low32, uint32_t hi32) {
     356     uintptr_t ptr = (uintptr_t) low32 | ((uintptr_t) hi32 << 32);
     357     struct schedule *S = (struct schedule *) ptr;
     358     int64_t id = S->running;
     359     struct coroutine *C = S->co_hash_map[id];
     360     if (C->func != NULL) {
     361         C->func(S, C->ud);
     362     } else {
     363         C->std_func();
     364     }
     365     S->co_free_list.push_back(C);
     366     S->co_free_num++;
     367 
     368     if (S->co_free_num > MAX_FREE_CO_NUM) {
     369         coroutine* co = S->co_free_list.front();
     370         _co_delete(co);
     371 
     372         S->co_free_list.pop_front();
     373         S->co_free_num--;
     374     }
     375 
     376     S->co_hash_map.erase(id);
     377     S->running = -1;
     379 }
    

    以上根据stauts分别处理不同的逻辑,如果是COROUTINE_READY表示协程还未运行,这里的逻辑同Phxrpc一样:获取上下文,设置栈大小和地址,然后设置协程执行完后回到的主协程main,并更新status,并设置协程入口函数mainfunc和参数等工作,之后切换到该协程运行;
    mainfunc执行具体的回调函数DoTask,处理完后释放协程对象资源;
    如果是COROUTINE_SUSPEND表示切入上一次被切出的协程,此时更新status并由swapcontext切换上下文,继续运行;

    下面是切出cpu协程的实现:

     443 int32_t coroutine_yield(struct schedule * S) {
     444     if (NULL == S) {
     445         return kCO_INVALID_PARAM;
     446     }
     447 
     448     int64_t id = S->running;
     449     if (id < 0) {
     451         return kCO_NOT_IN_COROUTINE;
     452     }
     453 
     454     assert(id >= 0);
     455     struct coroutine * C = S->co_hash_map[id];
     456 
     457     if (C->status != COROUTINE_RUNNING) {
     459         return kCO_NOT_RUNNING;
     460     }
     462     C->status = COROUTINE_SUSPEND;
     463     S->running = -1;
    
     466     swapcontext(&C->ctx, &S->main);
     468     return C->result;
     469 }
    

    以上整个分析是一个协程切入和切出cpu的工作原理。

    协程之间的调度,超时事件的处理,可能有可读写事件的发生等,都要去处理,并执行可能的回调函数;

    还是以介绍协程基础的时候举的两个hook socket api为例:

    228 ssize_t read(int fd, void *buf, size_t nbyte)
    229 {   
    230     HOOK_SYS_FUNC(read);
    231     
    232     if (!co_is_enable_sys_hook()) {
    233         return g_sys_read_func(fd, buf, nbyte);
    234     }
    235     rpchook_t *lp = get_by_fd(fd);
    236     
    237     if (!lp || (O_NONBLOCK & lp->user_flag)) { 
    238         ssize_t ret = g_sys_read_func(fd, buf, nbyte);
    239         return ret;
    240     }
    241     int timeout = (lp->read_timeout.tv_sec * 1000)
    242                 + (lp->read_timeout.tv_usec / 1000);
    243     
    244     struct pollfd pf = { 0 };
    245     pf.fd = fd; 
    246     pf.events = (POLLIN | POLLERR | POLLHUP);
    247     
    248     int pollret = poll(&pf, 1, timeout);  
    250     ssize_t readret = g_sys_read_func(fd, reinterpret_cast<char*>(buf), nbyte);
    251     
    252     if (readret < 0) {
    255     }
    257     return readret;
    258 }
    
    259 ssize_t write(int fd, const void *buf, size_t nbyte)
    260 {   
    261     HOOK_SYS_FUNC(write);
    262 
    263     if (!co_is_enable_sys_hook()) {
    264         return g_sys_write_func(fd, buf, nbyte);
    265     }
    266     rpchook_t *lp = get_by_fd(fd);
    267 
    268     if (!lp || (O_NONBLOCK & lp->user_flag)) {
    269         ssize_t ret = g_sys_write_func(fd, buf, nbyte);
    270         return ret;
    271     }
    272 
    273     size_t wrotelen = 0;
    274     int timeout = (lp->write_timeout.tv_sec * 1000)
    275                 + (lp->write_timeout.tv_usec / 1000);
    276 
    277     ssize_t writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
    278 
    279     if (writeret > 0) {
    280         wrotelen += writeret;
    281     }
    282     while (wrotelen < nbyte) {
    284         struct pollfd pf = {0};
    285         pf.fd = fd;
    286         pf.events = (POLLOUT | POLLERR | POLLHUP);
    287         poll(&pf, 1, timeout);
    288 
    289         writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
    290 
    291         if (writeret <= 0) {
    292             break;
    293         }
    294         wrotelen += writeret;
    295     }
    296     return wrotelen;
    297 }
    
    427 int poll(struct pollfd fds[], nfds_t nfds, int timeout)
    428 {
    429     HOOK_SYS_FUNC(poll);
    430 
    431     if (!co_is_enable_sys_hook()) {
    432         return g_sys_poll_func(fds, nfds, timeout);
    433     }
    434 
    435     return co_poll(co_get_epoll_ct(), fds, nfds, timeout);
    436 }
    
    1027 int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout)
    1028 {
    1029     if (timeout > stTimeoutItem_t::eMaxTimeout) {
    1030         timeout = stTimeoutItem_t::eMaxTimeout;
    1031     }
    1032     int epfd = ctx->iEpollFd;
    1033    
    1034     // 1.struct change
    1035     stPoll_t arg;
    1036     memset(&arg, 0, sizeof(arg));
    1037 
    1038     arg.iEpollFd = epfd;
    1039     arg.fds = fds;
    1040     arg.nfds = nfds;
    1041 
    1042     stPollItem_t arr[2];
    1043     if (nfds < sizeof(arr) / sizeof(arr[0])) {
    1044         arg.pPollItems = arr;
    1045     } else {
    1046         arg.pPollItems = reinterpret_cast<stPollItem_t*>(malloc(nfds * sizeof(stPollItem_t)));
    1047     }
    1048     memset(arg.pPollItems, 0, nfds * sizeof(stPollItem_t));
    1049 
    1050     arg.pfnProcess = OnPollProcessEvent;
    1051     arg.co_id = get_curr_co_id();
    
    1053     // 2.add timeout
    1054     unsigned long long now = GetTickMS();
    1055     arg.ullExpireTime = now + timeout;
    1056     int ret = AddTimeout(ctx->pTimeout, &arg, now);
    1057     if (ret != 0) {
    1060         errno = EINVAL;
    1061         return -__LINE__;
    1062     }
    1063 
    1064     for (nfds_t i = 0; i < nfds; i++) {
    1065         arg.pPollItems[i].pSelf = fds + i;
    1066         arg.pPollItems[i].pPoll = &arg;
    1067 
    1068         arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
    1069         struct epoll_event &ev = arg.pPollItems[i].stEvent;
    1070 
    1071         if (fds[i].fd > -1) {
    1072             ev.data.ptr = arg.pPollItems + i;
    1073             ev.events = PollEvent2Epoll(fds[i].events);
    1074 
    1075             epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i].fd, &ev);
    1076         }
    1077     }
    
    1079     coroutine_yield(co_get_curr_thread_env()->co_schedule);
    1080 
    1081     RemoveFromLink<stTimeoutItem_t, stTimeoutItemLink_t>(&arg);
    1082     for (nfds_t i = 0; i < nfds; i++) {
    1083         int fd = fds[i].fd;
    1084         if (fd > -1) {
    1085             epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &arg.pPollItems[i].stEvent);
    1086         }
    1087     }
    1088 
    1089     if (arg.pPollItems != arr) {
    1090         free(arg.pPollItems);
    1091         arg.pPollItems = NULL;
    1092     }
    1093     return arg.iRaiseCnt;
    1094 }
    

    对于accept fd,设置了非阻塞,当读或写的时候,需要监听相应的事件,为此,每个线程都会有一个struct stCoRoutineEnv_t对象:

      66 struct stCoEpoll_t {
      67     int iEpollFd;
      68     static const int _EPOLL_SIZE = 1024 * 10;
      69 
      70     struct stTimeout_t *pTimeout;
      72     struct stTimeoutItemLink_t *pstTimeoutList;
      74     struct stTimeoutItemLink_t *pstActiveList;
      76     co_epoll_res *result;
      77 };
    

    coroutine_open的时候初始化:

      37 struct stCoRoutineEnv_t {
      38     schedule* co_schedule;
      39     stCoEpoll_t *pEpoll;
      40 };
    
     942 void OnPollProcessEvent(stTimeoutItem_t * ap)
     943 {
     944     coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
     945 }
    
    1050     arg.pfnProcess = OnPollProcessEvent;
    1051     arg.co_id = get_curr_co_id();
    

    以上两行当事件发生时的回调函数,里面会resume对应id的协程。

    1055     arg.ullExpireTime = now + timeout;
    1056     int ret = AddTimeout(ctx->pTimeout, &arg, now);
    

    以上是添加超时处理,代码行1064〜1077是依次监听fd,OnPollPreparePfn当事件发生时会设置event并从超时链表中移走,并添加到活跃链表中,重点关注下这种使用方法:ev.data.ptr = arg.pPollItems + i,当epoll有事件或超时返回,需要从ev.data.ptr获得相关的信息:stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr)
    然后进行coroutine_yield(co_get_curr_thread_env()->co_schedule)切出该协程;
    如果当事件发生时,再切回来,代码行1079〜1088从超时链表中称走,并从epoll中删除相关的fd,释放资源等;

    最后是一个eventloop类似的wait功能:

     965 void co_update()
     966 {
     967     stCoEpoll_t* ctx = co_get_epoll_ct();
                 //more code....
     974     co_epoll_res *result = ctx->result;
     975     int ret = epoll_wait(ctx->iEpollFd, result->events, stCoEpoll_t::_EPOLL_SIZE, 1);
     976 
     977     stTimeoutItemLink_t *active = (ctx->pstActiveList);
     978     stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
     979 
     980     memset(active, 0, sizeof(stTimeoutItemLink_t));
     981     memset(timeout, 0, sizeof(stTimeoutItemLink_t));
     982 
     983     for (int i = 0; i < ret; i++)
     984     {   
     985         stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr);
     986         if (item->pfnPrepare) {
     987             item->pfnPrepare(item, result->events[i], active);
     988         } else {
     989             AddTail(active, item);
     990         }
     991     }
     993     unsigned long long now = GetTickMS();
     994     TakeAllTimeout(ctx->pTimeout, now, timeout);
     995 
     996     stTimeoutItem_t *lp = timeout->head;
     997     while (lp) {
     998         lp->bTimeout = true;
     999         lp = lp->pNext;
    1000     }
    1001 
    1002     Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);
    1003 
    1004     lp = active->head;
    1005     while (lp) {
    1006 
    1007         PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
    1008         if (lp->pfnProcess) {
    1009             lp->pfnProcess(lp);
    1010         }
    1011 
    1012         lp = active->head;
    1013     }
    1014 }
    

    上面的实现就是使用epoll阻塞一毫秒,有事件发生时依次执行pfnProcess,即OnPollProcessEvent,resume这个协程:

     942 void OnPollProcessEvent(stTimeoutItem_t * ap)
     943 {   
     944     coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
     945 }
    

    没有就是超时;然后处理超时情况,和上面的逻辑一样;其他有些更细节的没作过多分析,整体的实现原理和Phxrpc差不多,包括和后面分析的libco协程。

    类似的协程实现,网上有很多方案,再比如libgo,这个有时间会去研究下,协程的栈不能设置过大,看过的几个实现都是128kb;一般会包括调度器,定时器,协程池,eventloop,重点关注下协程的切换和状态,以及入口函数;其他更底层的关于性能方面的,比如少用malloc/free和new/delete等,绑定cpu等,减少线程在cpu间切换导致cache miss等,这些可以参考下brpc设计,如果服务器架构支持NUMA的,可以参考下DPDK/SPDK 的典型应用,后期也会找些时间去研究它俩的一些关键技术实现。

    这几个协程都是stackfull的,且栈大小固定,难免会有点浪费,在结束libco分析后,会列几个优化它的地方。

    https://www.zhihu.com/question/65647171

    相关文章

      网友评论

          本文标题:Pebble协程库实现

          本文链接:https://www.haomeiwen.com/subject/hrkurftx.html