美文网首页我爱编程skynet
浅析skynet底层框架上篇

浅析skynet底层框架上篇

作者: fooboo | 来源:发表于2018-06-09 09:29 被阅读410次

    写在前面

    这篇文章是分析skynet框架,自己“用”skynet已经有一年,项目中是以它为底层框架,上层使用lua,以消息方式驱动逻辑,做到隔离保护;

    我先说说自己这段时间使用的感受,不会涉及到项目里功能的具体实现。当时我刚入职,给自己列下三周的计划,前一周熟悉下lua语言,比较重要和常用的部分,能做到怎么使用好lua以及规范这样;第二周熟悉skynet框架,从main开始,分析它有几个部分,每个部分做了什么,然后不懂的地方看下官方的wiki和别人写的分析,再然后gdb看下一些变量值什么的;第三周是看项目的整体框架及各个模块组成及作用,以及消息走向,和登陆流程等,当然也是结合lua具体怎么使用;第四周开始接需求和从其他同事手中接手原来的所有玩法功能,一方面是维护,另一方面是增加或修改需求吧,我是比较赞成老人做新事,新人做老事,这样有个熟悉的过程,也减少些不必要的风险和其他成本。

    当然我一直做的是新需求,而且比较重要,且系统有点复杂,附加任务是维护老的功能。我实现功能主要是从复杂度和扩展性上面考虑,一般先把需求文档弄清楚,约定通信协议格式,把临时数据和需要持久化的数据定好,然后将整个关键点列出来然后逐一分解,而且策划需求总是变更,把可能会变的功能,跟基础功能分开,改动的比较少引入新bug可能性会小些。

    后来也断断续续利用业余时间研究skynet源码,所以准备记录下来。我这里以“浅析”是因为对skynet框架还没熟练运用并理解里面的实现原理,这里能看懂源码,但对云风作者为什么要这么设计等当时的思考不知,虽然会参考他的博客,但是有些一步步走过来的坑还不知道。

    我会带着几个问题去探索这个skynet设计以及运行原理,研究源码,并不是把代码给分析一遍,而是需要思考为什么这么设计,换一种方式是否可行?要带着why,去思考how和why,这样才有可能学到,并且可能进行二次开发等。

    以下是几个问题,可能并不能全面理解所有实现,因为一篇文章是无法分析完的,当然可能有后续的分析,下面正式开始。

    问题:
    1)整个skynet框架的组成,以及各模块的作用,以及启动流程;
    2)如何加载一个服务,以及从服务a发消息至服务b的流程,处理并返回;
    3)当并发时,如何保证正确时序,以及如何使用协程处理消息(同步/异步/超时);

    以上三个问题涉及到的东西比较多,有不明白的可以参考网上资料和源码。

    这里不会详细介绍lua的东西,我会在后续博客中重点分析下lua的协程实现源码以及闭包实现,这是为了填以往博客的坑。也不会分析游戏中的框架,这里会用身边的场景来代入问题。这里的分析只考虑单点,不会考虑分布式节点。

    首先说明的是,skynet是多线程框架,然后里面对应了一些服务(service),每个服务对应一个lua 虚拟机,而一个虚拟机中可以跑很多个协程,但同一时刻就一个协程,每条消息处理由协程来做,且运行在保护模式下,lua层实现了协程池和时序相关的队列,这里可以类比前面c++协程相关实现,关于虚拟机的原理以及如何和c/c++交互可以查阅资料。

    skynet框架那些事儿

    整个框架相关的源码在skynet-src目录下,下面分析时不会涉及到哪个目录哪个文件,会从启动流程开始,有些初始化会跳过不作详细分析,但重要的相关的会说明。

    main启动时,会有个lua格式的config配置文件,里面配置了工作线程个数,要加载的lua服务,以及环境相关的参数信息等,部分代码如下,其中skynet_start是关键。

    118 int
    119 main(int argc, char *argv[]) {
    135     struct skynet_config config;
    140     interr =  luaL_loadbufferx(L, load_config, strlen(load_config),"=[skynet config]", "t");
    152     config.thread =  optint("thread",8);
    153     config.module_path = optstring("cpath","./cservice/?.so");
    155     config.bootstrap = optstring("bootstrap","snlua bootstrap");
    156     config.daemon = optstring("daemon", NULL);
    157     config.logger = optstring("logger", NULL);
    158     config.logservice = optstring("logservice", "logger");
    163     skynet_start(&config);
    167     return 0;
    168 }
    
    246 void   
    247 skynet_start(struct skynet_config * config) {
    262     skynet_mq_init();
    264     skynet_timer_init();
    265     skynet_socket_init();
    268     struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
    274     bootstrap(ctx, config->bootstrap);
    276     start(config->thread);
    

    以上是部分实现代码,在skynet_start过程中,会先设置信号处理函数,判断是否作为daemon进程,初始化harbor是否作为分布式节点之一(这时只考虑有一个节点)。

    代码行262初始化全局消息队列:

    211 void 
    212 skynet_mq_init() {
    213     struct global_queue *q = skynet_malloc(sizeof(*q));
    214     memset(q,0,sizeof(*q));
    215     SPIN_INIT(q);
    216     Q=q;
    217 }
    
     35 struct global_queue {
     36    struct message_queue *head;
     37    struct message_queue *tail;
     38    struct spinlock lock;
     39 };
    

    skynet中的消息队列是非常重要的,服务之间通信正是通过消息来驱动的,这里的设计是有一个全局消息队列,然后每个服务在创建时有一个消息队列,是发往到该服务待处理的消息,然后挂载到全局队列中等工作线程处理,引用一张图如下:


    框架

    关于把消息入队和出队在这里不做详细分析。

    代码行264是初始化定时器相关的数据,数据结构如下:

     46 struct timer {
     47    struct link_list near[TIME_NEAR];
     48    struct link_list t[4][TIME_LEVEL];
     49    struct spinlock lock;
     50    uint32_t time;
     51    uint32_t starttime;
     52    uint64_t current;
     53    uint64_t current_point;
     54 };
    

    其中为什么有这一行struct link_list t[4][TIME_LEVEL],当时我分析的时候也是一头雾水,会在后面说明原因,以及如何跟协程绑定和处理超时,会在后面说明。

    代码行265是初始化socket相关的数据,主要结构如下:

     100 struct socket_server {
     101    int recvctrl_fd;
     102    int sendctrl_fd;
     103    int checkctrl;
     104    poll_fd event_fd;
     105    int alloc_id;
     106    int event_n;
     107    int event_index;
     108    struct socket_object_interface soi;
     109    struct event ev[MAX_EVENT];
     110    struct socket slot[MAX_SOCKET];
     111    char buffer[MAX_INFO];
     112    uint8_t udpbuffer[MAX_UDP_PACKAGE];
     113    fd_set rfds;
     114 };
    

    初始化部分实现如下:

     328 struct socket_server *
     329 socket_server_create() {
     331    int fd[2];
     332    poll_fd efd = sp_create();
     337    if (pipe(fd)) { }
     342    if (sp_add(efd, fd[0], NULL)) { }
     351    struct socket_server *ss = MALLOC(sizeof(*ss));
     352    ss->event_fd = efd;
     353    ss->recvctrl_fd = fd[0];
     354    ss->sendctrl_fd = fd[1];
     371 }
    

    以上这块设计的挺好的,每个字段作用从命名能知道一二,具体的流程会在代码中分析(不知道能写多少)。

    代码行268会先创建个log服务,struct skynet_context对应一个虚拟机:

    125 struct skynet_context *
    126 skynet_context_new(const char * name, const char *param) {
    127     struct skynet_module * mod = skynet_module_query(name);
    132     void *inst = skynet_module_instance_create(mod);
    135     struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
    136     CHECKCALLING_INIT(ctx)
    138     ctx->mod = mod;
    139     ctx->instance = inst;
    140     ctx->ref =2;
    141     ctx->cb =NULL;
    142     ctx->cb_ud =NULL;
    143     ctx->session_id =0;
    154     ctx->handle =0;
    155     ctx->handle = skynet_handle_register(ctx);
    156     struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
    161     int r = skynet_module_instance_init(mod, inst, ctx, param);
    162     CHECKCALLING_END(ctx)
    163     if (r == 0) {
    164         struct skynet_context * ret = skynet_context_release(ctx);
    165         if (ret) {
    166             ctx->init =true;
    167         }
    168         skynet_globalmq_push(queue);
    172         return ret;
    173     }else {
    174         //more code...
    181     }
    182 }
    

    以上这段实现非常重要,这里加载动态库的模块为logger.so,非snlua.so,在用snlua启动bootstrap服务时会说明其他部分。

    这里skynet_module_query根据name加载动态库用以初始化各服务,没有加载过的话会dlopen(tmp, RTLD_NOW | RTLD_GLOBAL),并设置相关的接口:

     92 static int
     93 open_sym(struct skynet_module *mod) {
     94    mod->create = get_api(mod,"_create");
     95    mod->init = get_api(mod,"_init");
     96    mod->release = get_api(mod,"_release");
     97    mod->signal = get_api(mod,"_signal");
     99    return mod->init == NULL;
    100 }
    

    后面的逻辑就是调用logger_create,这里不对logger相关的接口说明,毕竟是次要的,重点会分析snlua的;其中ctx->cb =NULL;ctx->cb_ud =NULL;是消息分发到lua层的回调函数和参数,ctx->handle = skynet_handle_register(ctx)是分配一个唯一的句柄handle与每个服务关联,通过handle可找到对应的服务,相当于地址;然后为每个服务创建服务消息队列,接着logger_init,最后把消息队列通过skynet_globalmq_push到全局队列,一方面接收新的消息,另一方面由工作线程依次处理。

    代码行274通过snlua启动bootstrap第一个服务(其实logger也算是一个服务):

    232 static void
    233 bootstrap(struct skynet_context * logger, const char * cmdline) {
    238     struct skynet_context *ctx = skynet_context_new(name, args);
    244 }
    

    其中cmdlinesnlua bootstrap,然后解析后namesnluaargsbootstrap;同logger一样,这里加载的是snlua.so,其他逻辑一样,这时重点分析下snlua_createsnlua_init

    180 struct snlua *
    181 snlua_create(void) {
    182     struct snlua * l = skynet_malloc(sizeof(*l));
    183     memset(l,0,sizeof(*l));
    186     l->L = lua_newstate(lalloc, l);
    187     return l;
    188 }
    
     14 struct snlua {
     15    lua_State * L;
     16    struct skynet_context * ctx;
     20 };
    

    其中lua_State是一个虚拟机的上下文,每个服务都有一个,做到服务与服务之间隔离,具体的数据组成可以看下lua源码中的声明;

    147 int
    148 snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
    152     skynet_callback(ctx, l , launch_cb);
    156     skynet_send(ctx,0, handle, PTYPE_TAG_DONTCOPY,0, "bootstrap", 9);
    158 }
    

    这里设置服务的回调函数launch_cb,然后发第一条消息给自己,由launch_cb处理,处理完后,重新设置回调函数为lua层的,那么后面路由到该服务的消息就能正确的分发到对应的回调函数了:

    134 static int
    135 launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , co    nst void * msg, size_t sz) {
    136     assert(type ==0 && session == 0);
    137     struct snlua *l = ud;
    138     skynet_callback(context,NULL, NULL);
    139     int err = init_cb(l, context, msg, sz);
    145 }
     75 static int
     76 init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
    104     const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
    106     int r = luaL_loadfile(L,loader);
    113     r = lua_pcall(L,1,0,1);
    
    108 static int
    109 lcallback(lua_State *L) {
    110     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
    111     int forward = lua_toboolean(L, 2);
    119     if (forward) {
    120         skynet_callback(context, gL, forward_cb);
    121     }else {
    122         skynet_callback(context, gL,_cb);
    123     }
    125     return 0;
    126 }
    
     55 static int
     56 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const vo    id * msg, size_t sz) {
     57    lua_State *L = ud;
     58    int trace = 1;
     59    int r;
     60    int top = lua_gettop(L);
     61    if (top == 0) {
     62        lua_pushcfunction(L, traceback);
     63        lua_rawgetp(L, LUA_REGISTRYINDEX,_cb);
     64    }else {
     65        assert(top ==2);
     66    }
     67    lua_pushvalue(L,2);
     69    lua_pushinteger(L, type);
     70    lua_pushlightuserdata(L, (void *)msg);
     71    lua_pushinteger(L,sz);
     72    lua_pushinteger(L, session);
     73    lua_pushinteger(L, source);
     75    r = lua_pcall(L,5, 0 , trace);
     97 
     98    return 0;
     99}
    

    init_cb比较复杂,设置虚拟机的相关环境变量,加载lua文件等,怎么加载的看上面列的几行关键代码,具体为什么看下源码和lua的luaL_loadfile和c与lua的交互栈。

    代码行276是启动线程:

    181 static void
    182 start(int thread) {
    183     pthread_t pid[thread+3];
    185     struct monitor *m = skynet_malloc(sizeof(*m));
    186     memset(m,0, sizeof(*m));
    187     m->count = thread;
    188     m->sleep =0;
    189 
    190     m->m = skynet_malloc(thread *sizeof(struct skynet_monitor *));
    191     int i;
    192     for (i=0;i<thread;i++) {
    193         m->m[i] = skynet_monitor_new();
    194     }
    
    204     create_thread(&pid[0], thread_monitor, m);
    205     create_thread(&pid[1], thread_timer, m);
    206     create_thread(&pid[2], thread_socket, m);
    208     static int weight[] = {
    209         -1, -1, -1, -1, 0, 0, 0, 0,
    210         1, 1, 1, 1, 1, 1, 1, 1,
    211         2, 2, 2, 2, 2, 2, 2, 2,
    212         3, 3, 3, 3, 3, 3, 3, 3, };
    213     struct worker_parm wp[thread];
    214     for (i=0;i<thread;i++) {
    215         wp[i].m = m;
    216         wp[i].id = i;
    217         if (i < sizeof(weight)/sizeof(weight[0])) {
    218             wp[i].weight= weight[i];
    219         }else {
    220             wp[i].weight =0;
    221         }
    222         create_thread(&pid[i+3], thread_worker, &wp[i]);
    223     }
    224     //pthread_join all thread
    230 }
    

    以上是创建多个线程,有thread_monitor线程,用于判断相应服务的消息列表是否过载;创建定时器thread_timer线程,用于处理超时;主要实现如下:

    172 static void
    173 timer_update(struct timer *T) {
    174     SPIN_LOCK(T);
    176     // try to dispatch timeout 0 (rare condition)
    177     timer_execute(T);
    179     // shift time first, and then dispatch timer message
    180     timer_shift(T);
    182     timer_execute(T);
    184     SPIN_UNLOCK(T);
    185 }
    

    接着再启动thread_socket网络线程,主要功能在skynet_socket_poll实现中,处理读和写,重点是读事件,这部分会放在后面分析。

    下面是处理消息的工作线程实现原理:

    152 static void *
    153 thread_worker(void *p) {
    154     struct worker_parm *wp = p;
    155     int id = wp->id;
    156     int weight = wp->weight;
    160     struct message_queue * q = NULL;
    161     while (!m->quit) {
    162         q = skynet_context_message_dispatch(sm, q, weight);
    163         //check q is null
    177     }
    178     return NULL;
    179 }
    

    上面是一个列循环,当m->quittrue时才退出,当没有消息时会进行pthread_cond_wait,然后就是一直skynet_context_message_dispatch,其中weight是权重,在队列消息数有一定数量的情况下,有些线程会根据weight尝试处理多条消息:

    296 structmessage_queue * 
    297 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight)     {       
    298     //check q is null
    299     q = skynet_globalmq_pop();
    300     //check q is null    
    304     uint32_t handle = skynet_mq_handle(q);
    305                
    306     struct skynet_context * ctx = skynet_handle_grab(handle);
    307     if (ctx == NULL) {
    308         struct drop_t d = { handle };
    309         skynet_mq_release(q, drop_message, &d);
    310         return skynet_globalmq_pop();
    311     }
    13      int i,n=1;
    314     struct skynet_message msg;
    315 
    316     for (i=0;i<n;i++) { 
    317         if (skynet_mq_pop(q,&msg)) {
    318             skynet_context_release(ctx);
    319             return skynet_globalmq_pop();
    320         } else if (i==0 && weight >= 0) {
    321             n = skynet_mq_length(q);
    322             n >>= weight;
    323         }
    324         int overload = skynet_mq_overload(q);
    325         if (overload) {
    326             skynet_error(ctx, "May overload, message queue length = %d", overload);
    327         }
    328 
    329         skynet_monitor_trigger(sm, msg.source , handle);
    330 
    331         if (ctx->cb == NULL) {
    332             skynet_free(msg.data);
    333         } else {
    334             dispatch_message(ctx, &msg);
    335         }
    336 
    337         skynet_monitor_trigger(sm, 0,0);
    338     }
    40      assert(q == ctx->queue);
    341     struct message_queue *nq = skynet_globalmq_pop();
    342     if (nq) {
    343         // If global mq is not empty , push q back, and return next queue (nq)
    344         // Else (global mq is empty or block, don't push q back, and return q again (for next di    spatch)
    345         skynet_globalmq_push(q);
    346         q = nq;
    347     }
    348     skynet_context_release(ctx);
    349 
    350     return q;
    351 }
    

    上面代码工作线程从全局队列中pop出一个服务的队列,然后根据队列的handle获取到对应服务的skynet_context,然后根据weight要处理多少条消息,主要逻辑在dispatch_message中,然后pop下一个服务的消息队列,这里算是公平吧,然后把原来的队列push到全局队列中,这里也有一些负载统计逻辑。

    dispatch_message里面最终执行的是ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz),就回到了上面说的调用lua的回调函数。
    以上是skynet框架的整体实现,可能有些细节或遗漏的地方没有在这里分析。

    总结下,主要引用连接中作者的设计思想:

    1)把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。

    2)简单说,Skynet 只负责把一个数据包从一个服务内发送出去,让同一进程内的另一个服务收到,调用对应的 callback 函数处理。它保证,模块的初始化过程,每个独立的 callback 调用,都是相互线程安全的。编写服务的人不需要特别的为多线程环境考虑任何问题。专心处理发送给它的一个个数据包。

    3)它仅仅是把数据包的指针,以及你声称的数据包长度(并不一定是真实长度)传递出去。由于服务都是在同一个进程内,接收方取得这个指针后,就可以直接处理其引用的数据了。
    这个机制可以在必要时,保证绝对的零拷贝,几乎等价于在同一线程内做一次函数调用的开销。
    但,这只是 Skynet 提供的性能上的可能性。它推荐的是一种更可靠,性能略低的方案:它约定,每个服务发送出去的包都是复制到用 malloc 分配出来的连续内存。接收方在处理完这个数据块(在处理的 callback 函数调用完毕)后,会默认调用 free 函数释放掉所占的内存。即,发送方申请内存,接收方释放。

    4)在 Skynet 启动时,建立了若干工作线程(数量可配置),它们不断的从主消息列队中取出一个次级消息队列来,再从次级队列中取去一条消息,调用对应的服务的 callback 函数进行出来。为了调用公平,一次仅处理一条消息,而不是耗净所有消息(虽然那样的局部效率更高,因为减少了查询服务实体的次数,以及主消息队列进出的次数),这样可以保证没有服务会被饿死。
    用户定义的 callback 函数不必保证线程安全,因为在 callback 函数被调用的过程中,其它工作线程没有可能获得这个 callback 函数所熟服务的次级消息队列,也就不可能被并发了。一旦一个服务的消息队列暂时为空,它的消息队列就不再被放回全局消息队列了。这样使大部分不工作的服务不会空转 CPU 。

    收发处理消息的那些事儿

    这节会说明如何从在lua层,从服务发条消息到另一个服务,这里为了简化起见,考虑的是send调用,发的参数是string类型,顺便印证上个小结最后总结引用的设计思想。

    当我们在lua层的业务逻辑中写这么一条语句skynet.send(agentAddr, "lua", "CallFunc", "hello world.")后会发生什么事情?
    他调用的是:

    416 function skynet.send(addr, typename, ...)
    417     local p = proto[typename]
    418     return c.send(addr, p.id, 0 , p.pack(...))
    419 end
    

    其中"lua"是我们的协议类型, "CallFunc"和"hello world."表示的是方法名和参数,表示agentAddr对方服务handle,skynet.pack是对方法名和参数名编码,是调用C层的:

    599 LUAMOD_API int
    600 luaseri_pack(lua_State *L) {
    601     struct block temp;
    602     temp.next = NULL;
    603     struct write_block wb;
    604     wb_init(&wb, &temp);
    605     pack_from(L,&wb,0);
    606     assert(wb.head == &temp);
    607     seri(L, &temp, wb.len); 
    609     wb_free(&wb);
    611     return 2;
    612 }
    
    534 static void
    535 seri(lua_State *L, struct block *b, int len) {
    536     uint8_t * buffer = skynet_malloc(len);
    537     uint8_t * ptr = buffer;
    538     int sz = len;
    539     while(len>0) {
    540         if (len >= BLOCK_SIZE) {
    541             memcpy(ptr, b->buffer, BLOCK_SIZE);
    542             ptr += BLOCK_SIZE;
    543             len -= BLOCK_SIZE;
    544             b = b->next;
    545         } else {
    546             memcpy(ptr, b->buffer, len);
    547             break;
    548         }
    549     }
    550 
    551     lua_pushlightuserdata(L, buffer);
    552     lua_pushinteger(L, sz);
    553 }
    
    296 static void
    297 pack_one(lua_State *L, struct write_block *b, int index, int depth) {
    320     case LUA_TSTRING: {
    321         size_t sz = 0;
    322         const char *str = lua_tolstring(L,index,&sz);
    323         wb_string(b, str, (int)sz);
    324         break;
    325     }
    

    打包的时候,会进行一次拷贝,最后返回一个C指针和数据长度,接着调用C层的send函数,即lsendlsend调用send_message(L, 0, 2)

    232 static int
    233 send_message(lua_State *L, int source, int idx_type) {
    234     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
    252     int mtype = lua_type(L,idx_type+2);
    253     switch (mtype) {
    267     case LUA_TLIGHTUSERDATA: {
    268         void * msg = lua_touserdata(L,idx_type+2);
    269         int size = luaL_checkinteger(L,idx_type+3);
    270         if (dest_string) {
    271             session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
    272         } else {
    273             session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
    274         }
    275         break;
    276     }
    285     lua_pushinteger(L,session);
    286     return 1;
    287 }
    

    其中type或上PTYPE_TAG_DONTCOPY表示不要拷贝数据,因为msg是C层变量指针,会在适当的时候由C释放;最后会返回session用于可能后续的响应消息回来找到上下文(对于call调用);

    699 int 
    700 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, i    nt session, void * data, size_t sz) {
    708     _filter_args(context, type, &session, (void **)&data, &sz);
    710     if (source == 0) {
    711         source = context->handle;
    712     }   
    713             
    714     if (destination == 0) {
    715         return session;
    716     }
    717     if (skynet_harbor_message_isremote(destination)) {
    718         //跟另外节点通讯
    724     } else {
    725         struct skynet_message smsg;
    726         smsg.source = source;
    727         smsg.session = session;
    728         smsg.data = data;
    729         smsg.sz = sz;
    730         
    731         if (skynet_context_push(destination, &smsg)) {
    732             skynet_free(data);
    733             return -1;
    734         }   
    735     }       
    736     return session;
    737 }
    
    229 int
    230 skynet_context_push(uint32_t handle, struct skynet_message *message) {
    231     struct skynet_context * ctx = skynet_handle_grab(handle);
    232     if (ctx == NULL) {
    233         return -1;
    234     }
    235     skynet_mq_push(ctx->queue, message);
    236     skynet_context_release(ctx);
    237 
    238     return 0;
    239 }
    

    上面代码是把消息压入对方服务的消息队列,而_filter_args主要是(可能)分配一个session值,对长度进行编码,高八位存放type信息。
    另外每个服务分配的session都是大于0的,每条消息唯一session,且当int变为负数时重置session为1,不大可能造成两条不同的消息而session相同。

    当对方服务被工作线程处理时,会进行消息的回调,就回到了上面的实现。

    工作中整理的问题:

    1)如果处理某个服务的消息造成死循环或处理时间过久,那么可能导致底层一些服务队列得不到调度和处理,可能造成消息队列过大,占用更多内存,从而导致“雪崩”问题,后面处理的消息都可能是超时的。比如定时任务运行耗时过多;业务需求比如从一些道具列表中随机几个不同的道具,直到选到几个不同的才退出循环,没有选择适合的随机算法;有的业务逻辑不正确,导致使用协程数量过多等待,无法切换回来并释放。

    2)协程的调度,切换回来后出现各种各样的问题,比如某个对象已经释放,虽然通过闭包引用着某个对象(地址),但是obj->isReleased()是true,所以处理了各种错误的数据。

    3)用不到的对象本应该释放,不小心相互引用着对方,导致lua gc时不能够回收相应资源,造成内存泄漏。

    4)不合理的使用call,导致在某些关键路径上任性的切换协程,而可能导致时序问题,因为使用协程后,有些是不确定的,即切出后什么时候切回来,如果玩在在登陆的时候请求其他服务数据导致切出协程,那什么时候切回来呢。

    5)还有跟顺序有关的消息处理,引用云风作者举的例子“如果 B 是一个 lua 服务,当 A 向 B 发送了两条消息 x 和 y 。skynet 一定保证 x 先被 B 中的 lua 虚拟机收到,并为 x 消息生成了一个 coroutine X ,并运行这个 coroutine 。然后才会收到消息 y ,重新生成一个新的 coroutine Y ,接下来运行。”
    如果处理X的时候,协程因为某些原因挂起,那么处理Y的时候可能会改变一些状态等。Skynet中Lua服务的消息处理

    6)还有使用不适合的算法,比如时间复杂度的O(n)或更差实现等。

    浅析skynet底层框架中篇主要分析skynet的定时器和网络实现部分,再加个消息队列,和本篇的第三个小问题。

    下面是参考资料:
    云风博客
    skynet源码
    Skynet 设计综述
    skynet服务的过载保护

    相关文章

      网友评论

        本文标题:浅析skynet底层框架上篇

        本文链接:https://www.haomeiwen.com/subject/xhtteftx.html