ChanRPC实现模块(Module)goroutine间的通信
为了进一步分析Leaf游戏服务器,我们需要了解Leaf的ChanRPC
Leaf中每个模块在独立的goroutine间运行,Leaf提供了基于channel的RPC机制来实现模块间的相互通信
chanrpc功能的实现(Server为例)
// leaf\chanrpc\chanrpc.go
type Server struct {
// id -> function
//
// function:
// func(args []interface{})
// func(args []interface{}) interface{}
// func(args []interface{}) []interface{}
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
Server类型的变量会维护一个函数映射关系functions
,通过接受到的消息去调用相应注册的函数,而ChanCall
通道则用来搭建通道间的信息传递桥梁
Server
的Register
方法实现了对functions
的注册
// leaf\chanrpc\chanrpc.go
// you must call the function before calling Open and Go
func (s *Server) Register(id interface{}, f interface{}) {
switch f.(type) {
case func([]interface{}):
case func([]interface{}) interface{}:
case func([]interface{}) []interface{}:
default:
panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
}
if _, ok := s.functions[id]; ok {
panic(fmt.Sprintf("function id %v: already registered", id))
}
s.functions[id] = f
}
ChanCall
参数则将在某个地方进行通道数据的读取和写入,来达到通道间通信的功能
例如使用GO调用,往通道里写入数据,数据包括要进行回调的functions中的函数及其参数
// leaf\chanrpc\chanrpc.go
func (s *Server) Go(id interface{}, args ...interface{}) {
// 拿到要调用的函数
f := s.functions[id]
if f == nil {
return
}
defer func() {
recover()
}()
// 往通道里写入数据 函数 和 参数
s.ChanCall <- &CallInfo{
f: f,
args: args,
}
}
而在写入数据之前,必定有在程序的某个地方通过这个通道等待接收数据,LeafServer中在进入通过Skeleton模块建立的Module的Run()
生命周期中,在模块的goroutine中就会等待接收数据,接收到数据后执行Exec()来进行处理,Skeleton相关内容将在后面的文章进行简析
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
// 等待来自通道的数据
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
Exec()
调用相应函数
// leaf\chanrpc\chanrpc.go
func (s *Server) Exec(ci *CallInfo) {
err := s.exec(ci)
if err != nil {
log.Error("%v", err)
}
}
func (s *Server) exec(ci *CallInfo) (err error) {
defer func() {
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
err = fmt.Errorf("%v: %s", r, buf[:l])
} else {
err = fmt.Errorf("%v", r)
}
s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)})
}
}()
// execute
switch ci.f.(type) {
case func([]interface{}):
ci.f.(func([]interface{}))(ci.args)
return s.ret(ci, &RetInfo{})
case func([]interface{}) interface{}:
ret := ci.f.(func([]interface{}) interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
case func([]interface{}) []interface{}:
ret := ci.f.(func([]interface{}) []interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
}
panic("bug")
}
LeafServer中的chanrpc使用举例
简单讲解LeafServer中gate.Module与game.Module的通信,下文中的目录server
为LeafServer项目根目录
首先程序一开始在game模块中注册了相应的ChanRPC消息和响应函数(Go pkg里的init()
函数会在main()
之前执行),game模块注册了NewAgent 和CloseAgent 两个ChanRPC,分别执行用户建立连接和断开链接的相关逻辑
// server\game\internal\chanrpc.go
func init() {
skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
}
func rpcNewAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
func rpcCloseAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
// skeleton.RegisterChanRPC()方法调用的是上文提到的Server.Register
// leaf\module\skeleton.go
func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
if s.ChanRPCServer == nil {
panic("invalid ChanRPCServer")
}
// 注册
s.server.Register(id, f)
}
注册完ChanRPC后,在程序运行起来后,会按照Module生命周期进到如前文所说的Run
中等待数据被写入通道
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
// ...
// 等待来自通道的数据
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
//...
}
}
}
gate模块中当用户连接时将调用Server.Go
函数像game模块发送消息
// leaf\gate\gate.go
gate.AgentChanRPC.Go("NewAgent", a)
// leaf\chanrpc\chanrpc.go
func (s *Server) Go(id interface{}, args ...interface{}) {
// ...
// 往通道里写入数据 函数 和 参数
s.ChanCall <- &CallInfo{
f: f,
args: args,
}
}
调用了Server.GO
后,在Run()
里面等待的通道就可以拿到消息,从而执行相应的处理函数了
而对于gate.AgentChanRPC则在gate模块的初始化中就指向了game模块的ChanRPC:
// server\gate\internal\module.go
func (m *Module) OnInit() {
m.Gate = &gate.Gate{
// ...
AgentChanRPC: game.ChanRPC, //注册为game的ChanRPC,于是就可以通过gate.AgentChanRPC.Go("NewAgent", a)和game模块通信
}
}
小结
本文仅仅只是简单梳理了一下ChanRPC通信的流程,回头看整个过程其实很清晰,程序初始化的时候模块注册好需要的ChanRPC,在生命周期Run()里等待模块的ChanRPC里的通道的信号.其他模块往等待信号的通道里发送内容,等待信号的模块拿到内容后调用注册好的函数.抛砖引玉了一下,欢迎交流讨论
网友评论