这是最后一篇了,其实还有很多重要的模块要分析的,但留给以后有多余时间再去研究吧,有兴趣的可以自行下载源码分析。这部分主要是围绕第三小问题展开,并附加些其他skynet中与此有关的设计,即:当并发时,如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);包括创建协程处理消息,挂起协程,切换。这块其实是针对lua上层来说的,底层框架的消息队列只是保证消息顺序入队列且出队列,如果交叉执行比如lua层的协程挂起,那么就会出现时序问题。
先简单回顾下前几篇博客的分析,包括skynet本身的设计,及C++协程。对于C++协程,比如一个请求a过来后,从协程池中pop一个协程并处理该请求a,如果需要等待,则让出协程并挂上定时器,然后再处理下一个请求b,如果此时a和b是相关联的,且b有可能依赖于a的执行结果,那么就会出现问题。这对于游戏中的业务来说,尤其涉及到金钱相关的逻辑,那是大问题。而那种独立的请求间,只是读之类的操作,那是没问题的。如果需要结合业务,那么就需要改造。
而对于skynet来说,当并发上来时,考虑到这个时序问题,底层实现相关的顺序队列,大概思路就是lua协程执行a到一半后,哪怕有b的消息被协程调度处理,此时会把这个b协程压入队列(lua中的table也可以,使用数组部分),必须等a执行完毕或超时后,再处理b的,也就是在业务上层串行化了服务的消息处理。这样保证了时序。
但这又引起了另一个问题,即可能存在后面的消息都超时了,然而上层如果无法识别继续处理,那么就白白浪费了资源,处理了无用的消息。这类的相关介绍在另一篇“谈谈缓存穿透雪崩和过载保护以及一致性等问题”中有相关的介绍及应对方案。
本节分为两个小点讨论,即:
1)如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);
2)创建协程处理消息,挂起协程,切换;
第一小点,撇开语言方面的限制,考虑skynet本身的框架设计,而不掺杂业务框架的设计。对于单进程多线程,要想并发的处理同一个客户端的请求,不管是读还是写,都必须路由到同一个线程处理,这样就保证了不会导致同一个client的请求分发到不同的线程,在skynet底层抽象client为一个agent service,有自己的消息队列,并且当工作线程处理这个agent消息时,先把这个消息队列从全局队列出摘出来,从这个队列pop一条消息,处理完毕后,再把这个消息队列挂到全局队列中;而对于push消息到agent队列则没有这种过程,只要获得自旋锁即可,相关源码可以见前面的分析。
这一层就保证了消息不会乱序,但是对于业务层,使用lua协程来提高并发,那么就要好好设计。
这里举例比如在主场景中,这样可以考虑到client的所有消息都路由到场景后需要考虑到的时序问题。
当与client有关的两条有依赖关系的消息a和b被场景服务dispatch分发处理时,不考虑读还是写,都会创建一个协程,并执行相关的处理函数。比如数据安全性不是特别严重的例子,玩家在帮派中,然后点领取今日奖励b消息,此时帮主把玩家踢出帮派a消息,本来是a先执行完毕后再b执行的顺序,这时可能出现a先执行导致挂起,而b执行完毕后,接着执行a的情况,多领了一份奖励。当然这里只是为了举例,通过检查可以避免这种问题。
简单分析下,在skynet的做法中,为每个服务加个lua层的消息队列,进入该队列的消息会被依次处理完毕,不管中间是否挂起,这样带来的问题是,并发度降底了且引入了一定的复杂度。
17 dispatch = function(session, from, ...)
18 table.insert(message_queue, {session = session, addr = from, ... })
19 if thread_id then //有消息,如果有等待则wakeup
20 skynet.wakeup(thread_id)
21 thread_id = nil
22 end
23 end
26 local function do_func(f, msg)
27 return pcall(f, table.unpack(msg))
28 end
29
30 local function message_dispatch(f)
31 while true do
32 if #message_queue==0 then //没消息则挂起
33 thread_id = coroutine.running()
34 skynet.wait()
35 else
36 local msg = table.remove(message_queue,1) //依次处理消息
37 local session = msg.session
38 if session == 0 then //不需要响应
39 local ok, msg = do_func(f, msg)
40 if ok then
41 if msg then
42 skynet.fork(message_dispatch,f)
44 end
45 else
46 skynet.fork(message_dispatch,f)
48 end
49 else
50 local data, size = skynet.pack(do_func(f,msg))
51 -- 1 means response
52 c.send(msg.addr, 1, session, data, size) //需要响应
53 end
54 end
55 end
56 end
上面代码实现细节不作过多分析,简单注释了下,大致就是从table数组中remove前面的消息并处理之,如果会挂起则等响应结果或超时,再处理下一条。
如上面的实现,新消息来了fork一个协程处理:
533 function skynet.fork(func,...)
534 local args = table.pack(...) //打包参数
535 local co = co_create(function()
536 func(table.unpack(args,1,args.n)) //设置协程执行函数和参数
537 end)
538 table.insert(fork_queue, co) //回收协程资源
539 return co
540 end
104 local function co_create(f)
105 local co = table.remove(coroutine_pool)
106 if co == nil then
107 co = coroutine.create(function(...)
108 f(...)
109 while true do
110 local session = session_coroutine_id[co]
111 if session and session ~= 0 then
112 local source = debug.getinfo(f,"S")
//log error
117 end
118 f = nil
119 coroutine_pool[#coroutine_pool+1] = co
120 f = coroutine_yield "EXIT"
121 f(coroutine_yield())
122 end
123 end)
124 else
125 coroutine_resume(co, f)
126 end
127 return co
128 end
上面co_create就从协程池中取一个协程对象处理消息,如果没有协程对象则创建。你一定会好奇执行完后,返回结果在哪?
对于lua的协程api,当create协程时它的状态还没开始,处于挂起suspended状态,然后resume后会处理running状态,执行完后为dead状态,引用下面的:
a)coroutine.create(arg):根据一个函数创建一个协同程序,参数为一个函数;
b)coroutine.resume(co):使协同从挂起变为运行(1)激活coroutine,也就是让协程函数开始运行;(2)唤醒yield,使挂起的协同接着上次的地方继续运行。该函数可以传入参数;
c)coroutine.yield():使正在运行的协同挂起,可以传入参数;
而真正强大之处在于当第二次resume时,resume和yield相关交换数据,具体怎么交互的建议看下lua协程基础。
在skynet中进行了对lua原始协程api进行封装并管理,下面说明第二个小点,当然会把第一小点也部分说明下,毕竟是个整体,从创建到处理到回收,以及中间的注意点。通过几个常用的接口来说明这套工作流程。
以下实现是wakeup
相关:
493 function skynet.wakeup(token)
494 if sleep_session[token] then
495 table.insert(wakeup_queue, token) //在下一次suspend时被处理
496 return true
497 end
498 end
339 function skynet.wait(token)
340 local session = c.genid()
341 token = token or coroutine.running()
342 local ret, msg = coroutine_yield("SLEEP", session, token)//切出协程(A)
343 sleep_session[token] = nil //协程切回来重置相关数据
344 session_id_coroutine[session] = nil
345 end
130 local function dispatch_wakeup()
131 local token = table.remove(wakeup_queue,1)
132 if token then
133 local session = sleep_session[token]
134 if session then
135 local co = session_id_coroutine[session]
136 local tag = session_coroutine_tracetag[co]
137 if tag then c.trace(tag, "resume") end
138 session_id_coroutine[session] = "BREAK"
139 return suspend(co, coroutine_resume(co, false, "BREAK"))(B) 调度被挂起的协程
140 end
141 end
142 end
157 function suspend(co, result, command, param, param2)
//more code
183 elseif command == "SLEEP" then
184 local tag = session_coroutine_tracetag[co]
185 if tag then c.trace(tag, "sleep", co, 2) end
186 session_id_coroutine[param] = co
187 if sleep_session[param2] then
188 error(debug.traceback(co, "token duplicative"))
189 end
190 sleep_session[param2] = param
307 dispatch_wakeup()
308 dispatch_error_queue()
309 end
把要唤醒的协程通过token插入到wakeup_queue
数组中(注意下,很多实现逻辑是使用table的数组部分,因为有序但带来的问题是从索引x处删除元素后,涉及到移动)
然后dispatch_wakeup
会处理wakeup_queue
,重点是这一句return suspend(co, coroutine_resume(co, false, "BREAK"))
,这部分在后面分析。
(A)处把当前协程切出去后,那三个参数作为主协程的返回值,即coroutine_resume
的返回值,再加一个本身返回的true or false,然后调用suspend
,同理coroutine_resume
的后两个参数作为coroutine_yield
的返回值。
以上部分还是比较容易理解,这里可以结合c++协程中的实现,有专门的协程调度器,要么超时要么有数据过来(响应)进而切回相应的协程处理。
不过经历过的项目貌似没有那种加限时的请求,如果call长时间收不到响应,可能会出问题,这个需要多研究下。不过,结合skynet基础实现也好办;另外底层框架也是skynet,lua层的源码部分都有返回,不管正确还是失败都会返回,除非这条call请求消息根本没有被目标服务的消息队列收到(可能出错),或者没有被工作线程调度,再或者没有被上层服务处理;前者可能基本为零,第一种可能性不大,因为框架已经保证消息一定会被发送到消息队列中(消息队列目前是无界的),而后面两种可能确实存在,比如一个死循环或者处理耗时的功能等,这些只能靠开发人员注意及必要code review了。
311 function skynet.timeout(ti, func)
312 local session = c.intcommand("TIMEOUT",ti)
313 assert(session)
314 local co = co_create(func)
315 assert(session_id_coroutine[session] == nil)
316 session_id_coroutine[session] = co
317 end
318
319 function skynet.sleep(ti, token)
320 local session = c.intcommand("TIMEOUT",ti)
321 assert(session)
322 token = token or coroutine.running()
323 local succ, ret = coroutine_yield("SLEEP", session, token)
324 sleep_session[token] = nil
325 if succ then
326 return
327 end
328 if ret == "BREAK" then
329 return "BREAK"
330 else
331 error(ret)
332 end
333 end
上面就是超时的实现,也即弄个协程,向skynet框架注册个定时器,当超时时,发条消息到上层,上层创建协程处理。这个跟c++协程一样,实现中不能有sleep这种调用,只能用超时,然后挂到事件列表中,超时resume协程回调,不然阻塞其他。
剩下的不过多分析,这三篇只是简单分析了个大概,还有蛮多值得学习,关键在于思考为什么要这么做,可以根据自己的经验,去尝试改进或在github上提pr,分析别人的设计,可能并不像作者一路踩坑过来,并持续重构那样,恰到好处的设计。
接下来的一篇准备研究下锁的性能,主要是对前几天学习的一个总结。
skynet 中 Lua 服务的消息处理
Lua中的协同程序 coroutine
Lua Coroutine详解
skynet 里的 coroutine
skynet coroutine 运行笔记
网友评论