
Phxrpc Coroutine Library Implementation

Author: fooboo | Published 2018-05-06 12:37

    An analysis of the coroutine implementation in Phxrpc:
    Phxrpc is not a large code base, so a day or two is enough to work through it, and it combines epoll, timers, and coroutines quite skillfully. The framework is a single-process, multi-threaded model. The main thread is pinned to a CPU and runs the LoopAccept logic, accepting new connections only when hsha_server_->hsha_server_qos_.CanAccept() allows; each accepted fd is then handed off round-robin to an HshaServerUnit via hsha_server_->server_unit_list_[idx_++]->AddAcceptedFd(accepted_fd), after which the loop goes back to accepting. A diagram from the referenced article:

    Framework diagram (from the referenced article)
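
    The accept loop itself is not quoted in this article; conceptually it does something like the sketch below. This is a paraphrase, not the literal phxrpc source: the QoS gate and the per-unit dispatch are passed in as std::function stand-ins for hsha_server_qos_.CanAccept() and AddAcceptedFd, and a real version would add error handling and shutdown logic.

    // Paraphrased sketch of the accept loop described above, not the literal
    // phxrpc source: can_accept() is the QoS gate, and accepted fds are handed
    // out round-robin across the server units.
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <functional>
    #include <vector>

    void LoopAccept(int listen_fd,
                    const std::function<bool()> &can_accept,                    // stands in for CanAccept()
                    std::vector<std::function<void(int)>> &add_accepted_fd) {   // stands in for AddAcceptedFd
        size_t idx = 0;
        while (true) {
            sockaddr_in addr {};
            socklen_t len = sizeof addr;
            int accepted_fd = accept(listen_fd, (sockaddr *)&addr, &len);
            if (accepted_fd < 0) {
                continue;                        // real code also handles fatal errors
            }
            if (!can_accept()) {                 // overload protection: shed the connection
                close(accepted_fd);
                continue;
            }
            add_accepted_fd[idx++ % add_accepted_fd.size()](accepted_fd);  // round-robin dispatch
        }
    }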

    An HshaServerUnit consists of a worker thread pool plus a scheduling thread; the scheduling thread also handles the IO logic, and a single worker thread may internally run coroutines. When an HshaServerUnit is constructed it creates the thread pool, and each worker thread picks thread mode or coroutine mode depending on uthread_count_:

    void Worker::Func() {
        if (uthread_count_ == 0) {
            ThreadMode();
        } else {
            UThreadMode();
        }
    }

    void Worker::UThreadMode() {
        worker_scheduler_ = new UThreadEpollScheduler(utherad_stack_size_, uthread_count_, true);
        assert(worker_scheduler_ != nullptr);
        worker_scheduler_->SetHandlerNewRequestFunc(bind(&Worker::HandlerNewRequestFunc, this));
        worker_scheduler_->RunForever();
    }
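
    The new-request callback registered above has roughly the following shape. This is a paraphrased sketch built from the APIs quoted elsewhere in this article, not verbatim source; the stat and bookkeeping code is omitted and details may differ.

    // Paraphrased sketch of Worker::HandlerNewRequestFunc (details may differ):
    void Worker::HandlerNewRequestFunc() {
        if (worker_scheduler_->IsTaskFull()) {
            return;                              // all coroutine slots busy; try again next tick
        }
        void *args = nullptr;
        BaseRequest *request = nullptr;
        int queue_wait_time_ms = 0;
        // a non-blocking pluck: in coroutine mode the worker must never block its scheduler
        if (!pool_->data_flow_->PluckRequest(args, request, queue_wait_time_ms)) {
            return;
        }
        // each request becomes a coroutine task that ends up in WorkerLogic
        worker_scheduler_->AddTask(
                bind(&Worker::UThreadFunc, this, args, request, queue_wait_time_ms), nullptr);
    }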
    

    The scheduling thread's main logic lives in Run; part of the source is shown below:

    void HshaServerIO::RunForever() {
        scheduler_->SetHandlerAcceptedFdFunc(bind(&HshaServerIO::HandlerAcceptedFd, this));
        scheduler_->SetActiveSocketFunc(bind(&HshaServerIO::ActiveSocketFunc, this));
        scheduler_->RunForever();
    }
    
    bool UThreadEpollScheduler::Run() {
        ConsumeTodoList();

        struct epoll_event * events = (struct epoll_event *)calloc(max_task_, sizeof(struct epoll_event));

        int next_timeout = timer_.GetNextTimeout();

        for (; (run_forever_) || (!runtime_.IsAllDone());) {
            int nfds = epoll_wait(epoll_fd_, events, max_task_, 4);
            if (nfds != -1) {
                for (int i = 0; i < nfds; i++) {
                    UThreadSocket_t * socket = (UThreadSocket_t *)events[i].data.ptr;
                    socket->waited_events = events[i].events;

                    runtime_.Resume(socket->uthread_id);
                }

                //for server mode
                if (active_socket_func_ != nullptr) {
                    UThreadSocket_t * socket = nullptr;
                    while ((socket = active_socket_func_()) != nullptr) {
                        runtime_.Resume(socket->uthread_id);
                    }
                }

                //for server uthread worker
                if (handler_new_request_func_ != nullptr) {
                    handler_new_request_func_();
                }

                if (handler_accepted_fd_func_ != nullptr) {
                    handler_accepted_fd_func_();
                }

                if (closed_) {
                    ResumeAll(UThreadEpollREvent_Close);
                    break;
                }

                ConsumeTodoList();
                DealwithTimeout(next_timeout);
            } else if (errno != EINTR) {
                ResumeAll(UThreadEpollREvent_Error);
                break;
            }

            StatEpollwaitEvents(nfds);
        }

        free(events);

        return true;
    }
    

    The loop above works as follows: epoll_wait blocks for at most 4 ms; for every ready fd the owning coroutine is resumed. In server mode, pending responses are plucked from data_flow, attached to their coroutine, and that coroutine is resumed (active_socket_func_); in coroutine-worker mode, new requests are pulled from data_flow and handed to worker coroutines (handler_new_request_func_); newly accepted fds are turned into coroutines bound to the IOFunc handler (handler_accepted_fd_func_); finally the todo list is consumed and timeouts are processed:

    void HshaServerIO::HandlerAcceptedFd() {
        lock_guard<mutex> lock(queue_mutex_);
        while (!accepted_fd_list_.empty()) {
            int accepted_fd = accepted_fd_list_.front();
            accepted_fd_list_.pop();
            scheduler_->AddTask(bind(&HshaServerIO::IOFunc, this, accepted_fd), nullptr);
        }
    }
    

    The rough flow: create a UThreadSocket_t for the accepted_fd (the stream wraps the socket buffering), read a request, push it into data_flow and notify a worker, then yield the coroutine in UThreadWait; once the worker has pushed the response into data_flow, this coroutine is resumed and sends the response that was attached to the socket args. Some checks are elided (marked // ...); partial implementation:

    void HshaServerIO::IOFunc(int accepted_fd) {
        UThreadSocket_t *socket{scheduler_->CreateSocket(accepted_fd)};
        UThreadTcpStream stream;
        stream.Attach(socket);
        UThreadSetSocketTimeout(*socket, config_->GetSocketTimeoutMS());

        while (true) {
            // ...
            unique_ptr<BaseProtocolFactory> factory(
                    BaseProtocolFactory::CreateFactory(stream));
            unique_ptr<BaseProtocol> protocol(factory->GenProtocol());
            BaseRequest *req{nullptr};
            ReturnCode ret{protocol->ServerRecv(stream, req)};
            if (ReturnCode::OK != ret) {
                delete req;
                // ...
                break;
            }
            // ...
            if (!data_flow_->CanPushRequest(config_->GetMaxQueueLength())) {
                delete req;
                // ...
                break;
            }

            if (!hsha_server_qos_->CanEnqueue()) {
                // ...
                delete req;
                // ...
                break;
            }
            // ...
            data_flow_->PushRequest((void *)socket, req);
            // ...
            worker_pool_->Notify();
            UThreadSetArgs(*socket, nullptr);
            // yield here until a worker attaches the response via UThreadSetArgs,
            // or the wait times out
            UThreadWait(*socket, config_->GetSocketTimeoutMS());
            if (UThreadGetArgs(*socket) == nullptr) {
                // the worker took too long: detach the socket for lazy destruction
                socket = stream.DetachSocket();
                UThreadLazyDestory(*socket);
                // ...
                break;
            }
            // ...
            {
                BaseResponse *resp((BaseResponse *)UThreadGetArgs(*socket));
                if (!resp->fake()) {
                    // is_keep_alive and version are set by elided request-parsing code
                    ret = resp->ModifyResp(is_keep_alive, version);
                    ret = resp->Send(stream);
                }
                delete resp;
            }
            // ...
            if (ReturnCode::OK != ret) {
                // ...
            }
            if (!is_keep_alive || (ReturnCode::OK != ret)) {
                break;
            }
        }
        // ...
    }
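
    The counterpart on the response side is ActiveSocketFunc, which RunForever registered earlier but which is not quoted above. A paraphrased sketch (details may differ from the real source): the scheduler calls it after each epoll tick, it plucks finished responses out of data_flow_, attaches each one to its IO coroutine with UThreadSetArgs, and returns the socket so Run() can resume that coroutine.

    // Paraphrased sketch of HshaServerIO::ActiveSocketFunc, not verbatim source:
    UThreadSocket_t *HshaServerIO::ActiveSocketFunc() {
        while (data_flow_->CanPluckResponse()) {
            void *args = nullptr;
            BaseResponse *resp = data_flow_->PluckResponse(args);
            if (nullptr == resp) {
                return nullptr;                          // nothing ready yet
            }
            UThreadSocket_t *socket = (UThreadSocket_t *)args;
            UThreadSetArgs(*socket, (void *)resp);       // what IOFunc reads after UThreadWait
            return socket;                               // Run() resumes this coroutine
        }
        return nullptr;
    }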
    

    Next comes timeout handling. The timer is implemented as a min-heap. GetNextTimeout returns the time until the next expiry; the loop breaks as soon as that value is non-zero (nothing has expired yet, or the heap is empty). While it is zero, PopTimeout pops the expired node and its coroutine is resumed with a timeout event:

    void UThreadEpollScheduler::DealwithTimeout(int & next_timeout) {
        while (true) {
            next_timeout = timer_.GetNextTimeout();
            if (next_timeout != 0) {
                break;
            }

            UThreadSocket_t * socket = timer_.PopTimeout();
            socket->waited_events = UThreadEpollREvent_Timeout;
            runtime_.Resume(socket->uthread_id);
        }
    }
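
    To make the min-heap idea concrete, here is a minimal illustration of the same mechanism. This is my own sketch, not the phxrpc Timer, which stores UThreadSocket_t* entries and also supports removal by timer_id:

    // Minimal min-heap timer sketch (illustration only, not the phxrpc Timer):
    #include <chrono>
    #include <functional>
    #include <queue>
    #include <vector>

    struct TimerItem {
        std::chrono::steady_clock::time_point expire;
        int uthread_id;
        bool operator>(const TimerItem &o) const { return expire > o.expire; }
    };

    class MiniTimer {
    public:
        void Add(int uthread_id, int timeout_ms) {
            heap_.push({std::chrono::steady_clock::now() +
                        std::chrono::milliseconds(timeout_ms), uthread_id});
        }
        // Milliseconds until the earliest deadline: 0 means "already expired",
        // -1 means "no timers pending" -- matching how DealwithTimeout loops on 0.
        int GetNextTimeout() const {
            if (heap_.empty()) return -1;
            auto diff = heap_.top().expire - std::chrono::steady_clock::now();
            auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(diff).count();
            return ms > 0 ? (int)ms : 0;
        }
        // Pop the expired entry; the caller resumes that coroutine with a timeout event.
        int PopTimeout() {
            int id = heap_.top().uthread_id;
            heap_.pop();
            return id;
        }
    private:
        std::priority_queue<TimerItem, std::vector<TimerItem>,
                            std::greater<TimerItem>> heap_;
    };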
    

    That is the whole of Run, which executes on the scheduling thread; a few details are left out. Next, the notification mechanism that wakes a scheduler:

    void EpollNotifier :: Run() {
        assert(pipe(pipe_fds_) == 0);
        fcntl(pipe_fds_[1], F_SETFL, O_NONBLOCK);
        scheduler_->AddTask(std::bind(&EpollNotifier::Func, this), nullptr);
    }

    void EpollNotifier :: Func() {
        UThreadSocket_t * socket = scheduler_->CreateSocket(pipe_fds_[0], -1, -1, false);
        char tmp[2] = {0};
        while (true) {
            if (UThreadRead(*socket, tmp, 1, 0) < 0) {
                break;
            }
        }
        free(socket);
    }

    void EpollNotifier :: Notify() {
        ssize_t write_len = write(pipe_fds_[1], (void *)"a", 1);
        if (write_len < 0) {
            // ... (log elided)
        }
    }
    
    void WorkerPool::Notify() {
        lock_guard<mutex> lock(mutex_);
        if (last_notify_idx_ == worker_list_.size()) {
            last_notify_idx_ = 0;
        }

        worker_list_[last_notify_idx_++]->Notify();
    }
    
    void Worker::Notify() {
        if (uthread_count_ == 0) {
            return;
        }

        worker_scheduler_->NotifyEpoll();
    }
    

    The code above implements waiting and wakeup across schedulers. In coroutine mode each worker thread also runs its own scheduler (thread mode does not need any of this). Whenever a request or response is queued, a single byte is written to the pipe; the resulting readable event wakes the target scheduler out of epoll_wait so it can continue processing. The mechanism is quite simple.

    Below are the accept/read/send implementations. The fds are set non-blocking; UThreadPoll registers the events of interest plus a timeout, then switches out of the coroutine. When an event fires or the timer expires, the resumed coroutine deregisters both, and if the event it was waiting for actually occurred, it performs the real accept/read/send:

    int UThreadPoll(UThreadSocket_t & socket, int events, int * revents, int timeout_ms) {
        int ret = -1;
        socket.uthread_id = socket.scheduler->GetCurrUThread();
        socket.event.events = events;

        socket.scheduler->AddTimer(&socket, timeout_ms);
        epoll_ctl(socket.epoll_fd, EPOLL_CTL_ADD, socket.socket, &socket.event);

        socket.scheduler->YieldTask();

        epoll_ctl(socket.epoll_fd, EPOLL_CTL_DEL, socket.socket, &socket.event);
        socket.scheduler->RemoveTimer(socket.timer_id);

        *revents = socket.waited_events;

        if ((*revents) > 0) {
            if ((*revents) & events) {
                ret = 1;
            } else {
                errno = EINVAL;
                ret = 0;
            }
        } else if ((*revents) == UThreadEpollREvent_Timeout) {
            //timeout
            errno = ETIMEDOUT;
            ret = 0;
        } else if ((*revents) == UThreadEpollREvent_Error) {
            //error
            errno = ECONNREFUSED;
            ret = -1;
        } else {
            //active close
            errno = 0;
            ret = -1;
        }

        return ret;
    }
    
    int UThreadAccept(UThreadSocket_t & socket, struct sockaddr *addr, socklen_t *addrlen) {
        int ret = accept(socket.socket, addr, addrlen);
        if (ret < 0) {
            if (EAGAIN != errno && EWOULDBLOCK != errno) {
                return -1;
            }

            int revents = 0;
            if (UThreadPoll(socket, EPOLLIN, &revents, -1) > 0) {
                ret = accept(socket.socket, addr, addrlen);
            } else {
                ret = -1;
            }
        }

        return ret;
    }

    ssize_t UThreadRead(UThreadSocket_t & socket, void * buf, size_t len, int flags) {
        //int ret = read(socket.socket, buf, len);
        int ret = -1;

        //if (ret < 0 && EAGAIN == errno) {
            int revents = 0;
            if (UThreadPoll(socket, EPOLLIN, &revents, socket.socket_timeout_ms) > 0) {
                ret = read(socket.socket, buf, len);
            } else {
                ret = -1;
            }
        //}

        return ret;
    }

    ssize_t UThreadSend(UThreadSocket_t & socket, const void *buf, size_t len, int flags) {
        int ret = send(socket.socket, buf, len, flags);

        if (ret < 0 && EAGAIN == errno) {
            int revents = 0;
            if (UThreadPoll(socket, EPOLLOUT, &revents, socket.socket_timeout_ms) > 0) {
                ret = send(socket.socket, buf, len, flags);
            } else {
                ret = -1;
            }
        }

        return ret;
    }
    

    WorkerLogic below is what a worker (coroutine or thread) actually runs for each request: the message has been taken from data_flow, dispatch_ processes it, and the response is pushed back into data_flow, followed by NotifyEpoll:

    void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {
        // ...
        BaseResponse *resp{req->GenResponse()};
        if (queue_wait_time_ms < MAX_QUEUE_WAIT_TIME_COST) {
            HshaServerStat::TimeCost time_cost;
            // ...
            DispatcherArgs_t dispatcher_args(pool_->hsha_server_stat_->hsha_server_monitor_,
                    worker_scheduler_, pool_->args_);
            pool_->dispatch_(req, resp, &dispatcher_args);
            // ...
        } else {
            pool_->hsha_server_stat_->worker_drop_requests_++;
        }
        pool_->data_flow_->PushResponse(args, resp);
        // ...
        pool_->scheduler_->NotifyEpoll();
        // ...
        delete req;
    }
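
    For contrast, thread mode needs no scheduler at all; each worker thread simply blocks on the queue. A paraphrased sketch of Worker::ThreadMode (stat code omitted, details may differ):

    // Paraphrased sketch of Worker::ThreadMode (details may differ):
    void Worker::ThreadMode() {
        while (!shut_down_) {
            void *args = nullptr;
            BaseRequest *request = nullptr;
            int queue_wait_time_ms = 0;
            // a blocking pluck is fine here: this thread has nothing else to do
            if (pool_->data_flow_->PluckRequest(args, request, queue_wait_time_ms)) {
                WorkerLogic(args, request, queue_wait_time_ms);
            }
        }
    }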
    

    What follows is the coroutine implementation itself. Scheduling is driven by UThreadEpollScheduler, analyzed above, while the coroutines are managed by UThreadRuntime.
    First, each coroutine's private stack, allocated with mmap and protected with guard pages:

    class UThreadStackMemory {
    public:
        UThreadStackMemory(const size_t stack_size, const bool need_protect = true);
        ~UThreadStackMemory();

        void * top();
        size_t size();

    private:
        void * raw_stack_;
        void * stack_;
        size_t stack_size_;
        int need_protect_;
    };

    UThreadStackMemory :: UThreadStackMemory(const size_t stack_size, const bool need_protect) :
        raw_stack_(nullptr), stack_(nullptr), need_protect_(need_protect) {
        int page_size = getpagesize();
        if ((stack_size % page_size) != 0) {
            stack_size_ = (stack_size / page_size + 1) * page_size;   // round up to whole pages
        } else {
            stack_size_ = stack_size;
        }

        if (need_protect) {
            // one PROT_NONE guard page below and one above the usable stack,
            // so a stack overflow faults immediately instead of corrupting memory
            raw_stack_ = mmap(NULL, stack_size_ + page_size * 2,
                    PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
            assert(raw_stack_ != nullptr);
            assert(mprotect(raw_stack_, page_size, PROT_NONE) == 0);
            assert(mprotect((void *)((char *)raw_stack_ + stack_size_ + page_size), page_size, PROT_NONE) == 0);
            stack_ = (void *)((char *)raw_stack_ + page_size);
        } else {
            raw_stack_ = mmap(NULL, stack_size_, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
            assert(raw_stack_ != nullptr);
            stack_ = raw_stack_;
        }
    }

    UThreadStackMemory :: ~UThreadStackMemory() {
        int page_size = getpagesize();
        if (need_protect_) {
            assert(mprotect(raw_stack_, page_size, PROT_READ | PROT_WRITE) == 0);
            assert(mprotect((void *)((char *)raw_stack_ + stack_size_ + page_size), page_size, PROT_READ | PROT_WRITE) == 0);
            assert(munmap(raw_stack_, stack_size_ + page_size * 2) == 0);
        } else {
            assert(munmap(raw_stack_, stack_size_) == 0);
        }
    }
    

    Next, the coroutine context base class and its concrete derived class:

    class UThreadContext {
    public:
        UThreadContext() { }
        virtual ~UThreadContext() { }

        static UThreadContext * Create(size_t stack_size,
                UThreadFunc_t func, void * args,
                UThreadDoneCallback_t callback, const bool need_stack_protect);
        static void SetContextCreateFunc(ContextCreateFunc_t context_create_func);
        static ContextCreateFunc_t GetContextCreateFunc();

        virtual void Make(UThreadFunc_t func, void * args) = 0;
        virtual bool Resume() = 0;
        virtual bool Yield() = 0;

    private:
        static ContextCreateFunc_t context_create_func_;
    };
    
    UThreadContext * UThreadContext :: Create(size_t stack_size,
            UThreadFunc_t func, void * args,
            UThreadDoneCallback_t callback, const bool need_stack_protect) {
        if (context_create_func_ != nullptr) {
            return context_create_func_(stack_size, func, args, callback, need_stack_protect);
        }
        return nullptr;
    }
    
    class UThreadContextSystem : public UThreadContext {
    public:
        UThreadContextSystem(size_t stack_size, UThreadFunc_t func, void * args,
                UThreadDoneCallback_t callback, const bool need_stack_protect);
        ~UThreadContextSystem();

        static UThreadContext * DoCreate(size_t stack_size,
                UThreadFunc_t func, void * args, UThreadDoneCallback_t callback,
                const bool need_stack_protect);

        void Make(UThreadFunc_t func, void * args) override;
        bool Resume() override;
        bool Yield() override;

        ucontext_t * GetMainContext();

    private:
        static void UThreadFuncWrapper(uint32_t low32, uint32_t high32);

        ucontext_t context_;
        UThreadFunc_t func_;
        void * args_;
        UThreadStackMemory stack_;
        UThreadDoneCallback_t callback_;
    };

    UThreadContextSystem :: UThreadContextSystem(size_t stack_size, UThreadFunc_t func, void * args,
            UThreadDoneCallback_t callback, const bool need_stack_protect)
        : func_(func), args_(args), stack_(stack_size, need_stack_protect), callback_(callback) {
        Make(func, args);
    }
    
    UThreadContext * UThreadContextSystem :: DoCreate(size_t stack_size,
            UThreadFunc_t func, void * args, UThreadDoneCallback_t callback,
            const bool need_stack_protect) {
        return new UThreadContextSystem(stack_size, func, args, callback, need_stack_protect);
    }

    ucontext_t * UThreadContextSystem :: GetMainContext() {
        static __thread ucontext_t main_context;   // one main context per scheduler thread
        return &main_context;
    }
    

    The implementation above uses ucontext_t; a few words on the functions involved:
    int getcontext(ucontext_t *ucp) saves the current execution context into a ucontext_t structure;
    void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...) initializes a context: func is the entry function, and its extra arguments are all int-sized. Before calling it you normally call getcontext and then set up the stack pointer, stack size, and related fields;
    int swapcontext(ucontext_t *oucp, ucontext_t *ucp) saves the old context into oucp and switches to ucp.
    See the references for the underlying details.
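
    A minimal, self-contained demo of these three calls (independent of phxrpc) shows the exact switching pattern that Resume/Yield are built on:

    // Standalone ucontext demo (not phxrpc code): main and one coroutine
    // ping-pong via swapcontext.
    #include <ucontext.h>
    #include <cstdio>

    static ucontext_t main_ctx, co_ctx;
    static char co_stack[64 * 1024];

    static void CoFunc() {
        printf("coroutine: first slice\n");
        swapcontext(&co_ctx, &main_ctx);          // "Yield" back to main
        printf("coroutine: second slice\n");
    }                                             // returning enters uc_link (main_ctx)

    int main() {
        getcontext(&co_ctx);                      // initialize, then redirect
        co_ctx.uc_stack.ss_sp = co_stack;
        co_ctx.uc_stack.ss_size = sizeof co_stack;
        co_ctx.uc_link = &main_ctx;               // where to go when CoFunc returns
        makecontext(&co_ctx, CoFunc, 0);

        swapcontext(&main_ctx, &co_ctx);          // "Resume": run until the first yield
        printf("main: coroutine yielded\n");
        swapcontext(&main_ctx, &co_ctx);          // resume again: run to completion
        printf("main: coroutine done\n");
        return 0;
    }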

    class UThreadRuntime {
    public:
        UThreadRuntime(size_t stack_size, const bool need_stack_protect);
        ~UThreadRuntime();

        int Create(UThreadFunc_t func, void * args);
        int GetCurrUThread();
        bool Yield();
        bool Resume(size_t index);
        bool IsAllDone();
        int GetUnfinishedItemCount() const;

        void UThreadDoneCallback();

    private:
        struct ContextSlot {
            ContextSlot() {
                context = nullptr;
                next_done_item = -1;
            }
            UThreadContext * context;
            int next_done_item;
            int status;
        };

        size_t stack_size_;
        std::vector<ContextSlot> context_list_;
        int first_done_item_;
        int current_uthread_;
        int unfinished_item_count_;
        bool need_stack_protect_;
    };
    
    UThreadRuntime :: UThreadRuntime(size_t stack_size, const bool need_stack_protect)
        :stack_size_(stack_size), first_done_item_(-1),
        current_uthread_(-1), unfinished_item_count_(0),
        need_stack_protect_(need_stack_protect) {
        if (UThreadContext::GetContextCreateFunc() == nullptr) {
            UThreadContext::SetContextCreateFunc(UThreadContextSystem::DoCreate);
        }
    }
    

    The above is essentially a pool that manages coroutines. How is it used? For example:

    void UThreadEpollScheduler::ConsumeTodoList() {
        while (!todo_list_.empty()) {
            auto & it = todo_list_.front();
            int id = runtime_.Create(it.first, it.second);
            runtime_.Resume(id);

            todo_list_.pop();
        }
    }
    

    For each task a coroutine is created. If first_done_item_ is non-negative there is a finished coroutine slot available: it is reused, first_done_item_ is advanced to that slot's next_done_item (the index of the next free slot), and Make reinitializes the coroutine's context. Make also sets uc_link to the main context (main_context), the context to return to when the coroutine's function finishes. UThreadFuncWrapper is the coroutine entry point: the this pointer is split into two 32-bit halves, reassembled inside UThreadFuncWrapper, and the user function runs; if a done-callback was registered, UThreadDoneCallback fires, which effectively returns the coroutine slot to the vector's free list and updates the bookkeeping:

    void UThreadContextSystem :: UThreadFuncWrapper(uint32_t low32, uint32_t high32) {
        uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)high32 << 32);
        UThreadContextSystem * uc = (UThreadContextSystem *)ptr;
        uc->func_(uc->args_);
        if (uc->callback_ != nullptr) {
            uc->callback_();
        }
    }
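
    The split exists because makecontext only passes int-sized arguments, while this is a 64-bit pointer. The round trip through two uint32_t halves is lossless, as this little standalone check (my own illustration) shows:

    // Standalone illustration of the pointer split used by Make()/UThreadFuncWrapper.
    #include <cassert>
    #include <cstdint>

    int main() {
        static_assert(sizeof(void *) == 8, "this demo assumes 64-bit pointers");
        int dummy = 0;
        void *p = &dummy;
        uintptr_t ptr = (uintptr_t)p;
        uint32_t low32 = (uint32_t)ptr;             // what Make() passes as the first argument
        uint32_t high32 = (uint32_t)(ptr >> 32);    // ...and as the second
        uintptr_t back = (uintptr_t)low32 | ((uintptr_t)high32 << 32);
        assert((void *)back == p);                  // lossless round trip
        return 0;
    }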
    
    void UThreadRuntime :: UThreadDoneCallback() {
        if (current_uthread_ != -1) {
            ContextSlot & context_slot = context_list_[current_uthread_];
            context_slot.next_done_item = first_done_item_;
            context_slot.status = UTHREAD_DONE;
            first_done_item_ = current_uthread_;
            unfinished_item_count_--;
            current_uthread_ = -1;
        }
    }
    

    If first_done_item_ is -1 there is no reusable coroutine, so a new context object is created, initialized, and appended to the vector:

    int UThreadRuntime :: Create(UThreadFunc_t func, void * args) {
        if (func == nullptr) {
            return -2;
        }
        int index = -1;
        if (first_done_item_ >= 0) {
            index = first_done_item_;
            first_done_item_ = context_list_[index].next_done_item;
            context_list_[index].context->Make(func, args);
        } else {
            index = context_list_.size();
            auto new_context = UThreadContext::Create(stack_size_, func, args,
                    std::bind(&UThreadRuntime::UThreadDoneCallback, this),
                    need_stack_protect_);
            assert(new_context != nullptr);
            ContextSlot context_slot;
            context_slot.context = new_context;
            context_list_.push_back(context_slot);
        }

        context_list_[index].next_done_item = -1;
        context_list_[index].status = UTHREAD_SUSPEND;
        unfinished_item_count_++;
        return index;
    }
    
    void UThreadContextSystem :: Make(UThreadFunc_t func, void * args) {
        func_ = func;
        args_ = args;
        getcontext(&context_);
        context_.uc_stack.ss_sp = stack_.top();
        context_.uc_stack.ss_size = stack_.size();
        context_.uc_stack.ss_flags = 0;
        context_.uc_link = GetMainContext();   // where to return when func_ finishes
        uintptr_t ptr = (uintptr_t)this;
        makecontext(&context_, (void (*)(void))UThreadContextSystem::UThreadFuncWrapper,
                2, (uint32_t)ptr, (uint32_t)(ptr >> 32));
    }
    

    After creation, Resume switches into the coroutine: the slot's status is set to running, the main context is swapped out, and the target coroutine is swapped in:

    bool UThreadRuntime :: Resume(size_t index) {
        if (index >= context_list_.size()) {
            return false;
        }

        auto & context_slot = context_list_[index];   // must be a reference, or the status update is lost
        if (context_slot.status == UTHREAD_SUSPEND) {
            current_uthread_ = index;
            context_slot.status = UTHREAD_RUNNING;
            context_slot.context->Resume();
            return true;
        }
        return false;
    }
    
    bool UThreadContextSystem :: Resume() {
        swapcontext(GetMainContext(), &context_);
        return true;
    }
    

    When a coroutine must wait for some resource, it gives up the CPU to another coroutine with Yield: the current index is looked up, the slot's status is set to suspended, and control is swapped back to the main context:

    bool UThreadRuntime :: Yield() {
        if (current_uthread_ != -1) {
            auto & context_slot = context_list_[current_uthread_];   // reference, as in Resume
            current_uthread_ = -1;
            context_slot.status = UTHREAD_SUSPEND;
            context_slot.context->Yield();
        }
        return true;
    }
    
    bool UThreadContextSystem :: Yield() {
        swapcontext(&context_, GetMainContext());
        return true;
    }
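
    Putting the pieces together, a usage sketch of the UThreadRuntime API declared above. One assumption here: UThreadFunc_t is std::function<void(void *)>, which is what the std::bind calls seen earlier suggest.

    // Usage sketch, not from the phxrpc sources.
    UThreadRuntime runtime(64 * 1024, true);    // 64 KB stacks, guard pages enabled

    int id = runtime.Create([](void *args) {
        auto *rt = (UThreadRuntime *)args;
        // ... do some work, then park until the scheduler resumes us
        rt->Yield();
        // ... resumed: finish; UThreadDoneCallback() recycles this slot
    }, &runtime);

    runtime.Resume(id);    // runs until the Yield above
    runtime.Resume(id);    // runs to completion; the slot goes back on the free list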
    

    That is more or less the entire coroutine pool. Later I will also look at the implementation in libco, which differs from this one, and at the coroutines in pebble.

    That concludes the rough analysis. Other pieces such as data_flow/buffer/timer may be covered later if time allows; the links below already analyze them. In general, the best way to understand a framework is to start from its process model (multi-process vs. single-process multi-threaded) and follow a message from arrival, through processing, to the response, then gradually add the rest: statistics, load handling, how messages are processed, how requests are queued, and so on.

    References:
    https://blog.csdn.net/shanshanpt/article/details/55213287
    https://blog.csdn.net/shanshanpt/article/details/55253379
    https://blog.csdn.net/lmfqyj/article/details/79437157
    https://blog.csdn.net/lmfqyj/article/details/79406952
    http://man7.org/linux/man-pages/man2/mmap.2.html
    https://segmentfault.com/p/1210000009166339/read
    http://man7.org/linux/man-pages/man2/getcontext.2.html
    https://segmentfault.com/a/1190000013177055
