代码只是一个 demo,主要是为了熟悉协程的基本用法。
main.cc
#include <iostream>
#include <pthread.h>
#include <stdio.h>
#include "sche.h"
#include "corou.h"
#include "time.h"
using namespace std;
void test_cor_run1(void *arg){
int a=1;
std::cout<<"a "<<a<<std::endl;
Schedule *sche=get_scheduler();
Coroutine *c=sche->CurrentTask();
c->CorYield();
a++;
std::cout<<"a "<<a<<std::endl;
}
// Destroy hook for demo coroutine 1: prints a marker line.
// `arg` is not used.
void test_cor_dtor1(void *arg){
    (void)arg;
    static const char kLabel[] = "coroutine 1";
    std::cout << kLabel << std::endl;
}
// Forward declarations: test_cor_run2 passes these two functions to
// CreateTask before their definitions appear further down in this file.
void test_cor_run3(void *arg);
void test_cor_dtor3(void *arg);
void test_cor_run2(void *arg){
int b=1;
std::cout<<"b "<<b<<std::endl;
Schedule *sche=get_scheduler();
Coroutine *c=sche->CurrentTask();
printf("in 2 %p\n",c);
c->CorYield();
b++;
std::cout<<"b "<<b<<std::endl;
sche->CreateTask(test_cor_run3,test_cor_dtor3,nullptr);
}
// Destroy hook for demo coroutine 2: prints a marker line.
// `arg` is not used.
void test_cor_dtor2(void *arg){
    (void)arg;
    static const char kLabel[] = "coroutine 2";
    std::cout << kLabel << std::endl;
}
// Demo coroutine body #3 (created from inside test_cor_run2).
// Prints a local counter, yields once to the scheduler, and prints the
// incremented counter after being resumed. `arg` is not used.
void test_cor_run3(void *arg){
    int z=1;
    std::cout<<"z "<<z<<std::endl;
    Schedule *sche=get_scheduler();
    Coroutine *c=sche->CurrentTask();
    // The label used to read "in 2" (copied from test_cor_run2), which made
    // the output of coroutine 2 and coroutine 3 indistinguishable.
    // %p formally requires a void*, hence the explicit cast.
    printf("in 3 %p\n",static_cast<void*>(c));
    c->CorYield();
    // Reached only after the scheduler switches back into this coroutine.
    z++;
    std::cout<<"z "<<z<<std::endl;
}
// Destroy hook for demo coroutine 3: prints a marker line.
// `arg` is not used.
void test_cor_dtor3(void *arg){
    (void)arg;
    static const char kLabel[] = "coroutine 3";
    std::cout << kLabel << std::endl;
}
// Demo driver: creates the thread's scheduler, queues two coroutines and
// keeps pumping the scheduler until a bit more than 500 ms have elapsed,
// then prints the measured run time in milliseconds.
int main(){
    Schedule *sche = Schedule::Create();
    sche->CreateTask(test_cor_run1, test_cor_dtor1, nullptr);
    sche->CreateTask(test_cor_run2, test_cor_dtor2, nullptr);

    const int64_t run_for_ms = 500;
    const int64_t started = TimeMillis();
    const int64_t deadline = started + run_for_ms;

    // The timestamp is sampled before each scheduling step, and the step is
    // always executed at least once.
    int64_t now = 0;
    do {
        now = TimeMillis();
        sche->ScheduleProcess();
    } while (!(now > deadline));

    const int32_t elapse = static_cast<int32_t>(now - started);
    printf("time %d\n", elapse);
    return 0;
}
sche.h
#pragma once
#include <pthread.h>
#include <map>
#include <deque>
#include <list>
#include "corou.h"
// Cooperative scheduler for fiber-backed coroutines.
//
// Holds a FIFO of runnable coroutines and a list of finished coroutines that
// are deleted on a later scheduling step. Instances are normally obtained
// through Schedule::Create().
class Schedule{
public:
    Schedule();
    ~Schedule();

    // The scheduler owns the raw Coroutine pointers stored in its queues (its
    // destructor deletes them). A copy would delete them a second time, so
    // copying is disabled.
    Schedule(const Schedule&) = delete;
    Schedule& operator=(const Schedule&) = delete;

    // Returns the calling thread's scheduler, creating it on first use.
    static Schedule *Create();

    // Allocates a coroutine that will call run(arg) and appends it to the
    // ready queue. `destroy` is handed to the coroutine as its destroy hook.
    void CreateTask(task_run run,task_destory destroy,void *arg);

    // Performs one scheduling step: deletes finished coroutines, then
    // switches into the coroutine at the front of the ready queue, if any.
    void ScheduleProcess();

    // Returns the next coroutine id (post-incrementing counter starting at 0).
    uint32_t AllocCoroutineId(){
        return coroutine_id_++;
    }

    // Coroutine being executed by ScheduleProcess(), or nullptr when none is.
    Coroutine* CurrentTask(){
        return current_task_;
    }

    // Clears the running flag.
    // NOTE(review): nothing in the visible code reads running_ (the loop that
    // would use it is commented out in ScheduleProcess) -- confirm intent.
    void Join(){
        running_=false;
    }

    // Appends `c` to the back of the ready queue.
    void CorAddToReady(Coroutine* c);

    // Queues `c` for deletion on a later ScheduleProcess() call.
    void RemoveCor(Coroutine* c);

    // Fiber handle obtained in the constructor when the thread was converted
    // to a fiber; coroutines switch to it to return to the scheduler.
    void *get_main_ctx(){return main_ctx_;}
private:
    Coroutine *current_task_{nullptr};
    pthread_t pid_{};                       // value-initialized; unused in the visible code
    bool running_{true};
    void *main_ctx_{nullptr};
    uint32_t coroutine_id_{0};
    std::deque<Coroutine*> ready_tasks_;    // runnable coroutines, FIFO order
    std::map<uint32_t,Coroutine*> sleep_tasks_;  // not populated by the visible code
    std::list<Coroutine*> destroy_tasks_;   // finished coroutines awaiting delete
};
// Returns the scheduler stored in the calling thread's thread-specific slot
// (set by Schedule::Create); defined in sche.cc.
Schedule *get_scheduler();
sche.cc
#include "sche.h"
#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <vector>
#include <functional>
#include <assert.h>
// Returns the process-wide list of callbacks to run at exit.
// The list is heap-allocated on first use and is never deleted, so the
// returned pointer is the same on every call.
std::vector<std::function<void()>>* ExitList()
{
    using CallbackList = std::vector<std::function<void()>>;
    static CallbackList *const callbacks = new CallbackList();
    return callbacks;
}
// atexit handler: runs every callback registered in ExitList() in
// registration order, then empties the list so that a repeated invocation
// (the handler may be registered more than once) does nothing.
static void MyOnExit(){
    auto vec = ExitList();
    // Iterate by const reference: copying each std::function per iteration
    // is unnecessary work and may allocate.
    for (const auto &fn : *vec) {
        fn();
    }
    vec->clear();
}
// Registers MyOnExit with atexit(). Always returns 0; the result of atexit()
// itself is not inspected.
static int InitOnExit() {
    (void)atexit(MyOnExit);
    return 0;
}
// Thread-specific-data key under which each thread stores its Schedule*
// (written in Schedule::Create, read in get_scheduler).
pthread_key_t thread_sched_key;
// Guard passed to pthread_once() so that thread_key_create runs only once.
static pthread_once_t key_once = PTHREAD_ONCE_INIT;
// Destructor callback registered for thread_sched_key.
//
// Intentionally does nothing. The value stored under the key is a Schedule
// allocated with `new` in Schedule::Create, and the same pointer is deleted
// by the callback Schedule::Create pushes onto ExitList(). The previous
// implementation called free() on it, which is the wrong deallocation
// function for a new-allocated object, skips its destructor, and leads to a
// double free when the exit callback later deletes the same pointer.
static void thread_key_destructor(void *data)
{
    (void)data;
}
// pthread_once callback: creates the thread-specific-data key used to store
// each thread's scheduler.
//
// The pthread calls are made outside assert() on purpose: with NDEBUG
// defined an assert() expression is not evaluated at all, so the key would
// never be created in release builds. Failure aborts the process in every
// build configuration.
static void thread_key_create(void)
{
    const int create_rc = pthread_key_create(&thread_sched_key,
                                             thread_key_destructor);
    if (create_rc != 0) {
        fprintf(stderr, "pthread_key_create failed: %d\n", create_rc);
        abort();
    }
    // Start with no scheduler associated with the calling thread.
    const int set_rc = pthread_setspecific(thread_sched_key, NULL);
    if (set_rc != 0) {
        fprintf(stderr, "pthread_setspecific failed: %d\n", set_rc);
        abort();
    }
}
// Returns the Schedule* stored under thread_sched_key for the calling thread
// (whatever pthread_getspecific yields, reinterpreted as a Schedule pointer).
Schedule *get_scheduler(){
    void *slot = pthread_getspecific(thread_sched_key);
    return static_cast<Schedule*>(slot);
}
// Converts the calling thread into a fiber and remembers the resulting fiber
// handle as the scheduler's "main" context that coroutines switch back to.
Schedule::Schedule(){
    void *thread_fiber = ConvertThreadToFiber(nullptr);
    main_ctx_ = thread_fiber;
}
// Deletes every coroutine still held by the scheduler (finished ones waiting
// for cleanup as well as ones still in the ready queue), converts the fiber
// back to a thread and prints "dtor".
Schedule::~Schedule(){
    while(!destroy_tasks_.empty()){
        Coroutine *c=destroy_tasks_.front();
        destroy_tasks_.pop_front();
        delete c;
    }
    while(!ready_tasks_.empty()){
        Coroutine *c=ready_tasks_.front();
        // Pop from the queue that is being drained. The previous code popped
        // from destroy_tasks_ here, which is already empty at this point
        // (undefined behaviour) and left ready_tasks_ untouched, so the loop
        // never terminated and kept deleting the same coroutine.
        ready_tasks_.pop_front();
        delete c;
    }
    ConvertFiberToThread();
    main_ctx_=nullptr;
    std::cout<<"dtor"<<std::endl;
}
// Returns the calling thread's scheduler, creating and registering it on the
// first call from that thread.
//
// A newly created scheduler is stored in the thread-specific slot and a
// callback that deletes it is appended to ExitList(), so it is destroyed
// when the atexit handler runs.
Schedule *Schedule::Create(){
    // pthread_once must run in every build configuration; placing it inside
    // assert() would drop the call entirely when NDEBUG is defined.
    const int once_rc = pthread_once(&key_once, thread_key_create);
    if (once_rc != 0) {
        fprintf(stderr, "pthread_once failed: %d\n", once_rc);
        abort();
    }
    Schedule *sche=get_scheduler();
    if(!sche){
        InitOnExit();
        sche=new Schedule();
        auto vec = ExitList();
        vec->push_back([sche]{
            delete sche;
        });
        pthread_setspecific(thread_sched_key,static_cast<void*>(sche));
    }
    return sche;
}
// Allocates a coroutine bound to this scheduler that will invoke run(arg),
// with `destroy` as its destroy hook, and appends it to the ready queue.
void Schedule::CreateTask(task_run run,task_destory destroy,void *arg){
    Coroutine *task = new Coroutine(this, run, destroy, arg);
    ready_tasks_.push_back(task);
}
// Appends `c` to the back of the ready queue so a later ScheduleProcess()
// call will switch into it.
void Schedule::CorAddToReady(Coroutine* c){
    Coroutine *const runnable = c;
    ready_tasks_.push_back(runnable);
}
// Queues `c` for deletion; the actual delete happens at the start of a later
// ScheduleProcess() call (or in the scheduler's destructor).
void Schedule::RemoveCor(Coroutine* c){
    Coroutine *const finished = c;
    destroy_tasks_.push_back(finished);
}
void Schedule::ScheduleProcess(){
/*while(running_)*/{
while(!destroy_tasks_.empty()){
Coroutine *c=destroy_tasks_.front();
destroy_tasks_.pop_front();
delete c;
}
if(!ready_tasks_.empty()){
Coroutine *c=ready_tasks_.front();
ready_tasks_.pop_front();
current_task_=c;
c->context.SwapIn();
current_task_=nullptr;
}
}
}
corou.h
#pragma once
#include <WinSock2.h>
#include <windows.h>
#include <stdint.h>
class Schedule;
// Disabled alternative calling-convention macro, kept for reference:
//# define FCONTEXT_CALL __stdcall
// Signature of a fiber entry point as passed to CreateFiber by Context.
typedef void (WINAPI *fn_t)(void *);
// Owns one Windows fiber and provides the two switch operations the
// scheduler needs.
//
// The fiber is created in the constructor and deleted in the destructor.
// Because the handle is owned exclusively, the class is non-copyable: a copy
// would call DeleteFiber twice on the same handle.
class Context{
public:
    // Creates a fiber that starts executing fn(vp) the first time it is
    // switched to. A stack size of 0 requests the default size.
    Context(fn_t fn, void* vp)
        : ctx_(CreateFiber(0,fn,vp)){
    }
    ~Context(){
        // CreateFiber returns NULL on failure; do not hand that to DeleteFiber.
        if(ctx_ != nullptr){
            DeleteFiber(ctx_);
        }
    }
    Context(const Context&) = delete;
    Context& operator=(const Context&) = delete;

    // Switches execution into this fiber.
    void SwapIn(){
        SwitchToFiber(ctx_);
    }
    // Switches execution to the fiber `main` (the scheduler's context).
    void SwapOut(void* main){
        SwitchToFiber(main);
    }
private:
    void* ctx_;   // fiber handle returned by CreateFiber
};
// Coroutine body: called by fiberProc with the coroutine's user argument.
typedef void (*task_run)(void *arg);
// Destroy hook: called with the user argument from Coroutine's destructor.
typedef void (*task_destory)(void *arg);
// A single coroutine: the user callbacks, their argument, the owning
// scheduler and the fiber context the body runs on.
//
// Non-copyable: the destructor invokes the user's destroy hook and the
// contained Context deletes its fiber, so a copy would perform both actions
// twice for the same task.
class Coroutine{
public:
    // Binds the coroutine to `sche`, stores the callbacks and user data, and
    // creates the fiber whose entry point is fiberProc(this).
    Coroutine(Schedule *sche,task_run process,task_destory dtor,void *user_data);
    // Calls destroy(arg) when a destroy hook was supplied.
    ~Coroutine();
    Coroutine(const Coroutine&) = delete;
    Coroutine& operator=(const Coroutine&) = delete;

    // Re-queues this coroutine as ready and switches back to the scheduler.
    void CorYield();
public:
    uint32_t id{0};        // assigned from Schedule::AllocCoroutineId in the constructor
    Schedule *scheduler;   // owning scheduler
    task_run run;          // coroutine body
    task_destory destroy;  // optional destroy hook (may be null)
    void *arg;             // user data passed to run and destroy
    Context context;       // fiber backing this coroutine
};
// Fiber entry point shared by all coroutines; `fiber` is the Coroutine* that
// was passed to CreateFiber. Defined in corou.cc.
void WINAPI fiberProc(void *fiber);
corou.cc
#include "corou.h"
#include <assert.h>
#include "sche.h"
// Stores the scheduler, callbacks and user data, creates the backing fiber
// (entry point fiberProc, parameter `this`), and finally obtains a fresh id
// from the scheduler.
Coroutine::Coroutine(Schedule *sche,task_run process,task_destory dtor,void *user_data)
    : scheduler(sche),
      run(process),
      destroy(dtor),
      arg(user_data),
      context(fiberProc, this)
{
    this->id = this->scheduler->AllocCoroutineId();
}
// Invokes the user-supplied destroy hook with the user argument; does
// nothing when no hook was provided.
Coroutine::~Coroutine(){
    if(!destroy){
        return;
    }
    destroy(arg);
}
void Coroutine::CorYield(){
scheduler->CorAddToReady(this);
context.SwapOut(scheduler->get_main_ctx());
}
// Fiber entry point. Runs the coroutine's body to completion, hands the
// coroutine to its scheduler for deferred deletion, and switches back to the
// scheduler's main context instead of returning from the fiber function.
void WINAPI fiberProc(void *fiber){
    Coroutine *self = static_cast<Coroutine*>(fiber);
    assert(self);

    // Execute the user-supplied body.
    task_run body = self->run;
    body(self->arg);

    // The body has finished: mark for cleanup and leave this fiber.
    Schedule *owner = self->scheduler;
    owner->RemoveCor(self);
    SwitchToFiber(owner->get_main_ctx());
}
time.h
#pragma once
#include <stdint.h>
// Current time in milliseconds (see time.cc for the clock source).
int64_t TimeMillis();
// Current time expressed in microseconds (see time.cc for the clock source).
int64_t TimeMicro();
time.cc
#include "time.h"
#define WIN_32
#if defined (WIN_32)
#include <windows.h>
#include <time.h>
#else
#include <unistd.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/types.h>
#endif
// Writes the current time as seconds plus microseconds into *sec / *usec.
// Either pointer may be null, in which case that component is skipped.
//
// Windows branch: the performance counter supplies the ticks; on the first
// call an offset against time(NULL) is computed so that the reported seconds
// line up with the wall clock.
// Other platforms: plain gettimeofday().
static inline void itimeofday(long *sec, long *usec){
#if defined (WIN_32)
    static long calibrated = 0, epoch_offset = 0;
    static int64_t ticks_per_sec = 1;
    int64_t ticks;
    if (calibrated == 0) {
        // One-time calibration of counter frequency and wall-clock offset.
        QueryPerformanceFrequency((LARGE_INTEGER*)&ticks_per_sec);
        if (ticks_per_sec == 0) {
            ticks_per_sec = 1;   // guard against division by zero below
        }
        QueryPerformanceCounter((LARGE_INTEGER*)&ticks);
        epoch_offset = (long)time(NULL);
        epoch_offset = epoch_offset - (long)((ticks / ticks_per_sec) & 0x7fffffff);
        calibrated = 1;
    }
    QueryPerformanceCounter((LARGE_INTEGER*)&ticks);
    if (sec) {
        *sec = (long)(ticks / ticks_per_sec) + epoch_offset;
    }
    if (usec) {
        *usec = (long)((ticks % ticks_per_sec) * 1000000 / ticks_per_sec);
    }
#else
    struct timeval tv;
    gettimeofday(&tv, NULL);
    if (sec) {
        *sec = tv.tv_sec;
    }
    if (usec) {
        *usec = tv.tv_usec;
    }
#endif
}
// Current time in milliseconds as a 64-bit value: seconds * 1000 plus the
// microsecond part truncated to whole milliseconds.
static int64_t base_clock64(void){
    long seconds, micros;
    itimeofday(&seconds, &micros);
    const int64_t whole_ms = static_cast<int64_t>(seconds) * 1000;
    return whole_ms + (micros / 1000);
}
// Millisecond clock reduced to its low 32 bits.
static int32_t base_clock32(void){
    const auto low_bits = base_clock64() & 0xfffffffful;
    return static_cast<int32_t>(low_bits);
}
// Public millisecond clock; thin wrapper around base_clock64().
int64_t TimeMillis(){
    const int64_t now_ms = base_clock64();
    return now_ms;
}
int64_t TimeMicro(){
return base_clock64()*1000;
}
reference
[1] lthread https://github.com/halayli/lthread
[2] libgo https://github.com/yyzybb537/libgo
[3] kcp https://github.com/skywind3000/kcp
[4] Linux 中模拟 pthread 接口的用户态线程库 GNU Pth
网友评论