美文网首页
Golang 游戏leaf系列(三) NewAgent在chan

Golang 游戏leaf系列(三) NewAgent在chan

作者: 合肥黑 | 来源:发表于2019-05-28 17:47 被阅读0次

    Golang 游戏leaf系列(二) 网络消息流程概述中(下文简称系列二),提到了Module接口

    type Module interface {
        OnInit()
        OnDestroy()
        Run(closeSig chan bool)
    }
    

    其中gate这个module,通过组合方式实现了Module接口,即自己项目里实现OnInit方法,通过匿名结构体gate.Gate在源码里实现OnDestroy和Run方法。那么其它的module是怎么实现的,比如game这个module。

    一、game module实现Module接口
    //game/module.go
    var (
        skeleton = base.NewSkeleton()
        ChanRPC  = skeleton.ChanRPCServer
    )
    
    type Module struct {
        *module.Skeleton
    }
    
    func (m *Module) OnInit() {
        m.Skeleton = skeleton
    }
    
    func (m *Module) OnDestroy() {
    }
    

    很明显,Run方法交给匿名的Skeleton去实现了。并且因为先执行OnInit方法,这个skeleton实际上是base.NewSkeleton()。

    //src/server/base/skeleton.go
    func NewSkeleton() *module.Skeleton {
        skeleton := &module.Skeleton{
            GoLen:              conf.GoLen,
            TimerDispatcherLen: conf.TimerDispatcherLen,
            AsynCallLen:        conf.AsynCallLen,
            ChanRPCServer:      chanrpc.NewServer(conf.ChanRPCLen),
        }
        skeleton.Init()
        return skeleton
    }
    

    这里涉及了chanrpc和skeleton两个重要功能。根据参数初始化完成后,又调用skeleton的Init方法。根据系列二解析,作为一个Module,执行完Init,就会去执行Run了。

    二、skeleton.Run()
    func (s *Skeleton) Run(closeSig chan bool) {
        for {
            select {
            case <-closeSig:
                s.commandServer.Close()
                s.server.Close()
                for !s.g.Idle() || !s.client.Idle() {
                    s.g.Close()
                    s.client.Close()
                }
                return
            case ri := <-s.client.ChanAsynRet:
                s.client.Cb(ri)
            case ci := <-s.server.ChanCall:
                s.server.Exec(ci)
            case ci := <-s.commandServer.ChanCall:
                s.commandServer.Exec(ci)
            case cb := <-s.g.ChanCb:
                s.g.Cb(cb)
            case t := <-s.dispatcher.ChanTimer:
                t.Cb()
            }
        }
    }
    

    这都是啥啊,没有注释,懵逼中……不过可以猜一猜,根据select多路利用及读取通道箭头,可以猜测是在等一个通道写入后,去做某个操作。结合官方文档,作者确实说过不同module间的通信使用的是channel。那么,通道是什么时候写入的?

    三、通道的写入

    在系列二中说过,tcpServer执行NewAgent时,实际会转交给gate.AgentChanRPC去执行,也就是例子中的game.ChanRPC。转交方式是.Go("NewAgent", a),就像抛出一个事件一样,有一个名称,有一个参数。

    //chanrpc/chanrpc.go
    // goroutine safe
    func (s *Server) Go(id interface{}, args ...interface{}) {
        f := s.functions[id]
        if f == nil {
            return
        }
    
        defer func() {
            recover()
        }()
    
        s.ChanCall <- &CallInfo{
            f:    f,
            args: args,
        }
    }
    

    找到了,在向ChanCall通道写入一个CallInfo,这会导致Run方法中

    case ci := <-s.commandServer.ChanCall:
    s.commandServer.Exec(ci)
    

    开始执行,还是看看Skeleton 和 chanrpc相关结构吧。

    四、chanrpc和skeleton
    //chanrpc.go
    type RetInfo struct {
        ret interface{}
        err error
        cb interface{}
    }
    
    type CallInfo struct {
        f       interface{}
        args    []interface{}
        chanRet chan *RetInfo
        cb      interface{}
    }
    
    type Server struct {
        functions map[interface{}]interface{}
        ChanCall  chan *CallInfo
    }
    
    type Client struct {
        s               *Server
        chanSyncRet     chan *RetInfo
        ChanAsynRet     chan *RetInfo
        pendingAsynCall int
    }
    
    func NewServer(l int) *Server {
        s := new(Server)
        s.functions = make(map[interface{}]interface{})
        s.ChanCall = make(chan *CallInfo, l)
        return s
    }
    
    // you must call the function before calling Open and Go
    func (s *Server) Register(id interface{}, f interface{}) {
        ...
        s.functions[id] = f
    }
    

    看到了Server结构体,也看到了Register方法,想想系列二那个注册吧

    //game.internal.chanrpc.go
    func init() {
        skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
        skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
    }
    
    func rpcNewAgent(args []interface{}) {
        a := args[0].(gate.Agent)
        _ = a
    }
    
    func rpcCloseAgent(args []interface{}) {
        a := args[0].(gate.Agent)
        _ = a
    }
    

    实际上,skeleton只是转交给s.server

    func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
        if s.ChanRPCServer == nil {
            panic("invalid ChanRPCServer")
        }
    
        s.server.Register(id, f)
    }
    
    

    所以调用Register方法,就是以id当作key,放到了functions的映射中。

    再看看Skeleton结构体和它的Init方法:

    type Skeleton struct {
        GoLen              int
        TimerDispatcherLen int
        AsynCallLen        int
        ChanRPCServer      *chanrpc.Server
        g                  *g.Go
        dispatcher         *timer.Dispatcher
        client             *chanrpc.Client
        server             *chanrpc.Server
        commandServer      *chanrpc.Server
    }
    
    func (s *Skeleton) Init() {
        if s.GoLen <= 0 {
            s.GoLen = 0
        }
        if s.TimerDispatcherLen <= 0 {
            s.TimerDispatcherLen = 0
        }
        if s.AsynCallLen <= 0 {
            s.AsynCallLen = 0
        }
    
        s.g = g.New(s.GoLen)
        s.dispatcher = timer.NewDispatcher(s.TimerDispatcherLen)
        s.client = chanrpc.NewClient(s.AsynCallLen)
        s.server = s.ChanRPCServer
    
        if s.server == nil {
            s.server = chanrpc.NewServer(0)
        }
        s.commandServer = chanrpc.NewServer(0)
    }
    
    

    现在,可以理一理,系列二中NewAgent那个消息,是怎么通讯的。以下按顺序说明:

    • main.go中调用leaf.Run,第一个参数是game.Module,这样触发game模块的external.go中Module = new(internal.Module)
    • game模块的module开始初始化,先是base.NewSkeleton,这里面进一步执行Skeleton本身的Init(),重点关注执行到s.server = chanrpc.NewServer(0),这里根据NewServer源码s.ChanCall = make(chan *CallInfo, l)得知,生成的通道是个无缓冲的channel,类型为CallInfo。
    • external.go中的第二行ChanRPC = internal.ChanRPC触发其init,在里面调用skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)。最终是以"NewAgent"当作key,放到了s.server.functions的映射中。
    • game模块初始化完毕,开始执行Run方法,ci := <-s.server.ChanCall阻塞,等待写入一个CallInfo
    • 在系列二中说过,gate模块指定了它的AgentChanRPC是game.ChanRPC。这样tcpServer执行NewAgent时,实际会转交给gate.AgentChanRPC去执行,也就是例子中的game.ChanRPC。game.ChanRPC是通过external.go暴露的,实际上,指向的是ChanRPC = skeleton.ChanRPCServer。结合skeleton结构体和Init方法,s.server = s.ChanRPCServer,也就是一样的。最终转交方式是gate.AgentChanRPC.Go("NewAgent", a),实际上就是由gate模块指定了game模块的skeleton.server去执行Go方法
    // goroutine safe
    func (s *Server) Go(id interface{}, args ...interface{}) {
        f := s.functions[id]
        if f == nil {
            return
        }
    
        defer func() {
            recover()
        }()
    
        s.ChanCall <- &CallInfo{
            f:    f,
            args: args,
        }
    }
    
    • Run方法里的通道读取到了CallInfo,开始执行s.server.Exec(ci)
    func (s *Server) Exec(ci *CallInfo) {
        err := s.exec(ci)
        if err != nil {
            log.Error("%v", err)
        }
    }
    
    func (s *Server) exec(ci *CallInfo) (err error) {
        defer func() {
            if r := recover(); r != nil {
                if conf.LenStackBuf > 0 {
                    buf := make([]byte, conf.LenStackBuf)
                    l := runtime.Stack(buf, false)
                    err = fmt.Errorf("%v: %s", r, buf[:l])
                } else {
                    err = fmt.Errorf("%v", r)
                }
    
                s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)})
            }
        }()
    
        // execute
        switch ci.f.(type) {
        case func([]interface{}):
            ci.f.(func([]interface{}))(ci.args)
            return s.ret(ci, &RetInfo{})
        case func([]interface{}) interface{}:
            ret := ci.f.(func([]interface{}) interface{})(ci.args)
            return s.ret(ci, &RetInfo{ret: ret})
        case func([]interface{}) []interface{}:
            ret := ci.f.(func([]interface{}) []interface{})(ci.args)
            return s.ret(ci, &RetInfo{ret: ret})
        }
    
        panic("bug")
    }
    

    相关文章

      网友评论

          本文标题:Golang 游戏leaf系列(三) NewAgent在chan

          本文链接:https://www.haomeiwen.com/subject/pkudtctx.html