美文网首页
浅析skynet底层框架下篇

浅析skynet底层框架下篇

作者: fooboo | 来源:发表于2018-07-08 17:02 被阅读53次

    这是最后一篇了,其实还有很多重要的模块要分析的,但留给以后有多余时间再去研究吧,有兴趣的可以自行下载源码分析。这部分主要是围绕第三小问题展开,并附加些其他skynet中与此有关的设计,即:当并发时,如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);包括创建协程处理消息,挂起协程,切换。这块其实是针对lua上层来说的,底层框架的消息队列只是保证消息顺序入队列且出队列,如果交叉执行比如lua层的协程挂起,那么就会出现时序问题。

    先简单回顾下前几篇博客的分析,包括skynet本身的设计,及C++协程。对于C++协程,比如一个请求a过来后,从协程池中pop一个协程并处理该请求a,如果需要等待,则让出协程并挂上定时器,然后再处理下一个请求b,如果此时a和b是相关联的,且b有可能依赖于a的执行结果,那么就会出现问题。这对于游戏中的业务来说,尤其涉及到金钱相关的逻辑,那是大问题。而那种独立的请求间,只是读之类的操作,那是没问题的。如果需要结合业务,那么就需要改造。

    而对于skynet来说,当并发上来时,考虑到这个时序问题,底层实现相关的顺序队列,大概思路就是lua协程执行a到一半后,哪怕有b的消息被协程调度处理,此时会把这个b协程压入队列(lua中的table也可以,使用数组部分),必须等a执行完毕或超时后,再处理b的,也就是在业务上层串行化了服务的消息处理。这样保证了时序。

    但这又引起了另一个问题,即可能存在后面的消息都超时了,然而上层如果无法识别继续处理,那么就白白浪费了资源,处理了无用的消息。这类的相关介绍在另一篇“谈谈缓存穿透雪崩和过载保护以及一致性等问题”中有相关的介绍及应对方案。

    本节分为两个小点讨论,即:
    1)如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);
    2)创建协程处理消息,挂起协程,切换;

    第一小点,撇开语言方面的限制,考虑skynet本身的框架设计,而不掺杂业务框架的设计。对于单进程多线程,要想并发的处理同一个客户端的请求,不管是读还是写,都必须路由到同一个线程处理,这样就保证了不会导致同一个client的请求分发到不同的线程,在skynet底层抽象client为一个agent service,有自己的消息队列,并且当工作线程处理这个agent消息时,先把这个消息队列从全局队列出摘出来,从这个队列pop一条消息,处理完毕后,再把这个消息队列挂到全局队列中;而对于push消息到agent队列则没有这种过程,只要获得自旋锁即可,相关源码可以见前面的分析。

    这一层就保证了消息不会乱序,但是对于业务层,使用lua协程来提高并发,那么就要好好设计。

    这里举例比如在主场景中,这样可以考虑到client的所有消息都路由到场景后需要考虑到的时序问题。
    当与client有关的两条有依赖关系的消息a和b被场景服务dispatch分发处理时,不考虑读还是写,都会创建一个协程,并执行相关的处理函数。比如数据安全性不是特别严重的例子,玩家在帮派中,然后点领取今日奖励b消息,此时帮主把玩家踢出帮派a消息,本来是a先执行完毕后再b执行的顺序,这时可能出现a先执行导致挂起,而b执行完毕后,接着执行a的情况,多领了一份奖励。当然这里只是为了举例,通过检查可以避免这种问题。

    简单分析下,在skynet的做法中,为每个服务加个lua层的消息队列,进入该队列的消息会被依次处理完毕,不管中间是否挂起,这样带来的问题是,并发度降底了且引入了一定的复杂度。

     17     dispatch = function(session, from, ...)
     18         table.insert(message_queue, {session = session, addr = from, ... })
     19         if thread_id then //有消息,如果有等待则wakeup
     20             skynet.wakeup(thread_id)
     21             thread_id = nil
     22         end
     23     end
    
     26 local function do_func(f, msg)
     27     return pcall(f, table.unpack(msg))
     28 end
     29 
     30 local function message_dispatch(f)
     31     while true do
     32         if #message_queue==0 then  //没消息则挂起
     33             thread_id = coroutine.running()
     34             skynet.wait()
     35         else
     36             local msg = table.remove(message_queue,1)  //依次处理消息
     37             local session = msg.session
     38             if session == 0 then  //不需要响应
     39                 local ok, msg = do_func(f, msg)
     40                 if ok then
     41                     if msg then
     42                         skynet.fork(message_dispatch,f)
     44                     end
     45                 else
     46                     skynet.fork(message_dispatch,f)
     48                 end
     49             else
     50                 local data, size = skynet.pack(do_func(f,msg))
     51                 -- 1 means response
     52                 c.send(msg.addr, 1, session, data, size) //需要响应
     53             end
     54         end
     55     end
     56 end
    

    上面代码实现细节不作过多分析,简单注释了下,大致就是从table数组中remove前面的消息并处理之,如果会挂起则等响应结果或超时,再处理下一条。

    如上面的实现,新消息来了fork一个协程处理:

    533 function skynet.fork(func,...)
    534     local args = table.pack(...)  //打包参数
    535     local co = co_create(function()
    536         func(table.unpack(args,1,args.n)) //设置协程执行函数和参数
    537     end)
    538     table.insert(fork_queue, co) //回收协程资源
    539     return co
    540 end
    
    104 local function co_create(f)
    105     local co = table.remove(coroutine_pool)
    106     if co == nil then
    107         co = coroutine.create(function(...)
    108             f(...)
    109             while true do
    110                 local session = session_coroutine_id[co]
    111                 if session and session ~= 0 then
    112                     local source = debug.getinfo(f,"S")
                            //log error
    117                 end
    118                 f = nil
    119                 coroutine_pool[#coroutine_pool+1] = co
    120                 f = coroutine_yield "EXIT"
    121                 f(coroutine_yield())
    122             end
    123         end)
    124     else
    125         coroutine_resume(co, f)
    126     end
    127     return co
    128 end
    

    上面co_create就从协程池中取一个协程对象处理消息,如果没有协程对象则创建。你一定会好奇执行完后,返回结果在哪?

    对于lua的协程api,当create协程时它的状态还没开始,处于挂起suspended状态,然后resume后会处理running状态,执行完后为dead状态,引用下面的:
    a)coroutine.create(arg):根据一个函数创建一个协同程序,参数为一个函数;
    b)coroutine.resume(co):使协同从挂起变为运行(1)激活coroutine,也就是让协程函数开始运行;(2)唤醒yield,使挂起的协同接着上次的地方继续运行。该函数可以传入参数;
    c)coroutine.yield():使正在运行的协同挂起,可以传入参数;

    而真正强大之处在于当第二次resume时,resume和yield相关交换数据,具体怎么交互的建议看下lua协程基础。

    在skynet中进行了对lua原始协程api进行封装并管理,下面说明第二个小点,当然会把第一小点也部分说明下,毕竟是个整体,从创建到处理到回收,以及中间的注意点。通过几个常用的接口来说明这套工作流程。

    以下实现是wakeup相关:

    493 function skynet.wakeup(token)
    494     if sleep_session[token] then
    495         table.insert(wakeup_queue, token) //在下一次suspend时被处理
    496         return true
    497     end
    498 end
    
    339 function skynet.wait(token)
    340     local session = c.genid()
    341     token = token or coroutine.running()
    342     local ret, msg = coroutine_yield("SLEEP", session, token)//切出协程(A)
    343     sleep_session[token] = nil  //协程切回来重置相关数据
    344     session_id_coroutine[session] = nil
    345 end
    
    130 local function dispatch_wakeup()
    131     local token = table.remove(wakeup_queue,1)
    132     if token then
    133         local session = sleep_session[token]
    134         if session then
    135             local co = session_id_coroutine[session]
    136             local tag = session_coroutine_tracetag[co]
    137             if tag then c.trace(tag, "resume") end
    138             session_id_coroutine[session] = "BREAK"
    139             return suspend(co, coroutine_resume(co, false, "BREAK"))(B)  调度被挂起的协程
    140         end
    141     end
    142 end
    
    157 function suspend(co, result, command, param, param2)
            //more code
    183     elseif command == "SLEEP" then
    184         local tag = session_coroutine_tracetag[co]
    185         if tag then c.trace(tag, "sleep", co, 2) end
    186         session_id_coroutine[param] = co
    187         if sleep_session[param2] then
    188             error(debug.traceback(co, "token duplicative"))
    189         end
    190         sleep_session[param2] = param
    307     dispatch_wakeup()
    308     dispatch_error_queue()
    309 end
    

    把要唤醒的协程通过token插入到wakeup_queue数组中(注意下,很多实现逻辑是使用table的数组部分,因为有序但带来的问题是从索引x处删除元素后,涉及到移动)

    然后dispatch_wakeup会处理wakeup_queue,重点是这一句return suspend(co, coroutine_resume(co, false, "BREAK")),这部分在后面分析。
    (A)处把当前协程切出去后,那三个参数作为主协程的返回值,即coroutine_resume的返回值,再加一个本身返回的true or false,然后调用suspend,同理coroutine_resume的后两个参数作为coroutine_yield的返回值。

    以上部分还是比较容易理解,这里可以结合c++协程中的实现,有专门的协程调度器,要么超时要么有数据过来(响应)进而切回相应的协程处理。

    不过经历过的项目貌似没有那种加限时的请求,如果call长时间收不到响应,可能会出问题,这个需要多研究下。不过,结合skynet基础实现也好办;另外底层框架也是skynet,lua层的源码部分都有返回,不管正确还是失败都会返回,除非这条call请求消息根本没有被目标服务的消息队列收到(可能出错),或者没有被工作线程调度,再或者没有被上层服务处理;前者可能基本为零,第一种可能性不大,因为框架已经保证消息一定会被发送到消息队列中(消息队列目前是无界的),而后面两种可能确实存在,比如一个死循环或者处理耗时的功能等,这些只能靠开发人员注意及必要code review了。

    311 function skynet.timeout(ti, func)
    312     local session = c.intcommand("TIMEOUT",ti)
    313     assert(session)
    314     local co = co_create(func)
    315     assert(session_id_coroutine[session] == nil)
    316     session_id_coroutine[session] = co
    317 end
    318 
    319 function skynet.sleep(ti, token)
    320     local session = c.intcommand("TIMEOUT",ti)
    321     assert(session)
    322     token = token or coroutine.running()
    323     local succ, ret = coroutine_yield("SLEEP", session, token)
    324     sleep_session[token] = nil
    325     if succ then
    326         return
    327     end
    328     if ret == "BREAK" then
    329         return "BREAK"
    330     else
    331         error(ret)
    332     end
    333 end
    

    上面就是超时的实现,也即弄个协程,向skynet框架注册个定时器,当超时时,发条消息到上层,上层创建协程处理。这个跟c++协程一样,实现中不能有sleep这种调用,只能用超时,然后挂到事件列表中,超时resume协程回调,不然阻塞其他。

    剩下的不过多分析,这三篇只是简单分析了个大概,还有蛮多值得学习,关键在于思考为什么要这么做,可以根据自己的经验,去尝试改进或在github上提pr,分析别人的设计,可能并不像作者一路踩坑过来,并持续重构那样,恰到好处的设计。

    接下来的一篇准备研究下锁的性能,主要是对前几天学习的一个总结。

    skynet 中 Lua 服务的消息处理
    Lua中的协同程序 coroutine
    Lua Coroutine详解
    skynet 里的 coroutine
    skynet coroutine 运行笔记

    相关文章

      网友评论

          本文标题:浅析skynet底层框架下篇

          本文链接:https://www.haomeiwen.com/subject/xpsryftx.html