美文网首页
skynet消息调度及处理

skynet消息调度及处理

作者: Tao_Liang | 来源:发表于2017-02-16 23:08 被阅读0次

    skynet内部服务都是由一个一个的消息所驱动,每个服务的上下文结构体struct skynet_context有个字段struct message_queue *queue描述其消息队列,所有服务的消息队列挂在全局消息对列的列表struct global_queue *Q

    skynet在启动时会启动config->threadworker线程来处理所有服务的消息,worker线程的入口函数为static void *thread_worker(void *p),其处理逻辑如下:

    1. 如果当前要处理的消息队列为空,则从全局消息队列的列表中取下一个消息队列
    2. 对消息队列中的每个消息,调用该消息队列所属服务的回调函数,每次至少处理一个消息,之多处理消息队列长度右移weight个消息,其中weight是事先配置好的
    static int weight[] = { 
            -1, -1, -1, -1, 0, 0, 0, 0,
            1, 1, 1, 1, 1, 1, 1, 1, 
            2, 2, 2, 2, 2, 2, 2, 2, 
            3, 3, 3, 3, 3, 3, 3, 3, };
    

    比如配置启动nworker线程,第i个线程的weight为:当i小于weight数组长度时,线程weightweight[i-1],否则为0

    每个服务的消息队列都会被worker进程公平的进行处理,但是每个线程一次处理的消息个数由工作线程配置的权重决定。

    下面以snlua为例理解消息回调处理,在dispatch_message函数中,通过调用服务的回调函数来让服务处理其收到的消息:
    ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)

    snlua是执行lua服务的沙盒环境,启动一个lua服务之后,在lua代码中会设置回调函数,通常在skynet.lua文件中的skynet.start中设置c.callback(skynet.dispatch_message)c.callback调用的是:

      83 static int
      84 _callback(lua_State *L) {
      85     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
      86     int forward = lua_toboolean(L, 2);
      87     luaL_checktype(L,1,LUA_TFUNCTION);
      88     lua_settop(L,1);
      89     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
      90
      91     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
      92     lua_State *gL = lua_tothread(L,-1);
      93
      94     if (forward) {
      95         skynet_callback(context, gL, forward_cb);
      96     } else {
      97         skynet_callback(context, gL, _cb);
      98     }
      99
     100     return 0;
     101 }
    
    • 85行获取服务的上下文结构,此upvalue是在启动次服务的时候设置的
    • 89行在注册表中设置_cb=>skynet.dispatch_message
    • 91-92行获取服务的LUA状态机结构
    • 95或者97行设置服务上下结构体中的回调函数为_cb,回调函数私有数据为LUA状态机gL

    下面来分析回调函数_cb,任何LUA沙盒服务收到的消息的回调函数入口都是_cb

      30 static int
      31 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
      32     lua_State *L = ud;
      33     int trace = 1;
      34     int r;
      35     int top = lua_gettop(L);
      36     if (top == 0) {
      37         lua_pushcfunction(L, traceback);
      38         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
      39     } else {
      40         assert(top == 2);
      41     }
      42     lua_pushvalue(L,2);
      43
      44     lua_pushinteger(L, type);
      45     lua_pushlightuserdata(L, (void *)msg);
      46     lua_pushinteger(L,sz);
      47     lua_pushinteger(L, session);
      48     lua_pushinteger(L, source);
      49
      50     r = lua_pcall(L, 5, 0 , trace);
      51
      52     if (r == LUA_OK) {
      53         return 0;
      54     }
      55     const char * self = skynet_command(context, "REG", NULL);
      56     switch (r) {
      57     case LUA_ERRRUN:
      58         skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
      59         break;
      60     case LUA_ERRMEM:
      61         skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
      62         break;
      63     case LUA_ERRERR:
      64         skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
      65         break;
      66     case LUA_ERRGCMM:
      67         skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
      68         break;
      69     };
      70
      71     lua_pop(L,1);
      72
      73     return 0;
      74 }
    
    • 32-38行在LUA状态机的栈中设置即将执行的LUA函数及参数,依次是tracebackskynet.dispatch_messagetypemsgszsessionsource
    • 50行在保护模式执行skynet.dispatch_message函数,在此函数进行真正消息处理

    以上粗略的分析了skynet框架是如何调度每个服务的消息队列,以及如何通过回调函数来对服务的消息进行处理

    相关文章

      网友评论

          本文标题:skynet消息调度及处理

          本文链接:https://www.haomeiwen.com/subject/xrxkwttx.html