美文网首页
Golang 游戏leaf系列(十) mongodb模块 使用m

Golang 游戏leaf系列(十) mongodb模块 使用m

作者: 合肥黑 | 来源:发表于2019-06-04 14:24 被阅读0次

    Golang MongoDB中简单介绍了https://github.com/globalsign/mgo

    package main
    
    import (
        "fmt"
        "log"
    
        "gopkg.in/mgo.v2"
        "gopkg.in/mgo.v2/bson"
    )
    
    type Person struct {
        Name  string
        Phone string
    }
    
    func main() {
        //连接
        session, err := mgo.Dial("localhost:27017")
        if err != nil {
            panic(err)
        }
        defer session.Close()
    
        // Optional. Switch the session to a monotonic behavior.
        session.SetMode(mgo.Monotonic, true)
    
        //通过Database.C()方法切换集合(Collection)
        //func (db Database) C(name string) *Collection
        c := session.DB("test").C("people")
    
        //插入
        //func (c *Collection) Insert(docs ...interface{}) error
        err = c.Insert(&Person{"superWang", "13478808311"},
            &Person{"David", "15040268074"})
        if err != nil {
            log.Fatal(err)
        }
    
        result := Person{}
        //查询
        //func (c Collection) Find(query interface{}) Query
        err = c.Find(bson.M{"name": "superWang"}).One(&result)
        if err != nil {
            log.Fatal(err)
        }
    
        fmt.Println("Name:", result.Name)
        fmt.Println("Phone:", result.Phone)
    }
    ----------------------
    Name: superWang
    Phone: 13478808311
    

    leaf中使用的也是这个驱动,并且作了一些封装

    一、mgo基础
    1.mgo 模式

    参考
    一日一学_Go语言mgo(mongo场景应用)
    mgo mode说明
    session 能够和 mongodb 集群中的所有Server通讯。session设置的模式分别为:

    • Strong(默认使用)
      session 的读写一直向主服务器发起并使用一个唯一的连接,因此所有的读写操作完全的一致。
    • Monotonic
      session 的读操作开始是向其他服务器发起(且通过一个唯一的连接),只要出现了一次写操作,session 的连接就会切换至主服务器。由此可见此模式下,能够分散一些读操作到其他服务器,但是读操作不一定能够获得最新的数据。
    • Eventual
      session 的读操作会向任意的其他服务器发起,多次读操作并不一定使用相同的连接,也就是读操作不一定有序。session 的写操作总是向主服务器发起,但是可能使用不同的连接,也就是写操作也不一定有序。

    leaf里没有搜索到session.SetMode(mgo.Monotonic, true)这种设置mode的代码,所以使用了默认模式Strong。

    2.New或者Copy创建session

    参考
    mgo 的 session 与连接池
    mgo 的 session 与连接池
    为什么要在每次使用时都Copy,而不是直接使用Dial生成的session实例呢?个人认为,这与mgo.Session的Socket缓存机制有关。来看Session的核心数据结构。

    
    type Session struct {
    
        m                sync.RWMutex
    
        ...
    
        slaveSocket      *mongoSocket
    
        masterSocket     *mongoSocket
    
        ...
    
        consistency      Mode
    
        ...
    
        poolLimit        int
    
        ...
    
    }
    

    这里列出了mgo.Session的五个私有成员变量,与Copy机制有关的是,m,slaveSocket,masterSocket。

    m是mgo.Session的并发锁,因此所有的Session实例都是线程安全的。

    slaveSocket,masterSocket代表了该Session到mongodb主节点和从节点的一个物理连接的缓存。而Session的策略总是优先使用缓存的连接。是否缓存连接,由consistency也就是该Session的模式决定。假设在并发程序中,使用同一个Session实例,不使用Copy,而该Session实例的模式又恰好会缓存连接,那么,所有的通过该Session实例的操作,都会通过同一条连接到达mongodb。虽然mongodb本身的网络模型是非阻塞通信,请求可以通过一条链路,非阻塞地处理;但经过比较简陋的性能测试,在mongodb3.0中,10条连接并发写比单条连接的效率高一倍(在mongodb3.4中基本没有差别)。所以,使用Session Copy的一个重要原因是,可以将请求并发地分散到多个连接中。

    以上只是效率问题,但第二个问题是致命的。mgo.Session缓存的一主一从连接,实例本身不负责维护。也就是说,当slaveSocket,masterSocket任意其一,连接断开,Session自己不会重置缓存,该Session的使用者如果不主动重置缓存,调用者得到的将永远是EOF。这种情况在主从切换时就会发生,在网络抖动时也会发生。在业务代码中主动维护数据库Session的可用性,显然是不招人喜欢的。

    // See the Copy and Clone methods.
    //
    func (s *Session) New() *Session {
        s.m.Lock()
        scopy := copySession(s, false)
        s.m.Unlock()
        scopy.Refresh()
        return scopy
    }
    
    // Copy works just like New, but preserves the exact authentication
    // information from the original session.
    func (s *Session) Copy() *Session {
        s.m.Lock()
        scopy := copySession(s, true)
        s.m.Unlock()
        scopy.Refresh()
        return scopy
    }
    

    以上是Copy函数的实现,解决了使用全局Session的两个问题。其中,copySession将源Session浅拷贝到临时Session中,这样源Session的配置就拷贝到了临时Session中。关键的Refresh,将源Session浅拷贝到临时Session的连接缓存指针,也就是slaveSocket,masterSocket置为空,这样临时Session就不存在缓存连接,而转为去尝试获取一个空闲的连接。

    二、leaf的mongodb.go

    leaf用了一个切片来维护所有的session,在issues 请问下mongodb的db.Ref建立了好多和MongoDB的连接,只有一个玩家,不应该只建立少量连接吗?,作者回答如下:

    如果有 20 个数据库连接,然后有 1000 个用户,这时候会尽量让每个数据库连接服务 50 个用户。

    1.自动排序的切片

    Golang源码 container 系列三 heap堆排序介绍过堆排序,除了sort接口,还要额外实现一下push,pop

    // session
    type Session struct {
        *mgo.Session
        ref   int
        index int
    }
    
    // session heap
    type SessionHeap []*Session
    
    func (h SessionHeap) Len() int {
        return len(h)
    }
    
    func (h SessionHeap) Less(i, j int) bool {
        return h[i].ref < h[j].ref
    }
    
    func (h SessionHeap) Swap(i, j int) {
        h[i], h[j] = h[j], h[i]
        h[i].index = i
        h[j].index = j
    }
    
    func (h *SessionHeap) Push(s interface{}) {
        s.(*Session).index = len(*h)
        *h = append(*h, s.(*Session))
    }
    
    func (h *SessionHeap) Pop() interface{} {
        l := len(*h)
        s := (*h)[l-1]
        s.index = -1
        *h = (*h)[:l-1]
        return s
    }
    

    从Less里可以看出,ref引用次数越小,越会往切片的前面排。

    2.初始化
    type DialContext struct {
        sync.Mutex
        sessions SessionHeap
    }
    
    // goroutine safe
    func Dial(url string, sessionNum int) (*DialContext, error) {
        c, err := DialWithTimeout(url, sessionNum, 10*time.Second, 5*time.Minute)
        return c, err
    }
    
    // goroutine safe
    func DialWithTimeout(url string, sessionNum int, dialTimeout time.Duration,
     timeout time.Duration) (*DialContext, error) {
        if sessionNum <= 0 {
            sessionNum = 100
            log.Release("invalid sessionNum, reset to %v", sessionNum)
        }
    
        s, err := mgo.DialWithTimeout(url, dialTimeout)
        if err != nil {
            return nil, err
        }
        s.SetSyncTimeout(timeout)
        s.SetSocketTimeout(timeout)
    
        c := new(DialContext)
    
        // sessions
        c.sessions = make(SessionHeap, sessionNum)
        c.sessions[0] = &Session{s, 0, 0}
        for i := 1; i < sessionNum; i++ {
            c.sessions[i] = &Session{s.New(), 0, i}
        }
        heap.Init(&c.sessions)
    
        return c, nil
    }
    

    这里使用了s.New()对切片中的session进行了初始化

    3.使用
    // goroutine safe
    func (c *DialContext) Close() {
        c.Lock()
        for _, s := range c.sessions {
            s.Close()
            if s.ref != 0 {
                log.Error("session ref = %v", s.ref)
            }
        }
        c.Unlock()
    }
    
    // goroutine safe
    func (c *DialContext) Ref() *Session {
        c.Lock()
        s := c.sessions[0]
        if s.ref == 0 {
            s.Refresh()
        }
        s.ref++
        heap.Fix(&c.sessions, 0)
        c.Unlock()
    
        return s
    }
    
    // goroutine safe
    func (c *DialContext) UnRef(s *Session) {
        c.Lock()
        s.ref--
        heap.Fix(&c.sessions, s.index)
        c.Unlock()
    }
    

    调用Ref拿出来一个session,每次都从切片第一个元素拿,因为进行过堆排序,当然是ref引用次数最少的数据 被拿出来用了。可以看出,每次ref变化 时,都调用heap.Fix对相应的数据进行重排序。

    4.自增ID

    分布式数据库 分库分表 读写分离 UUID中介绍过分库分表后,全局唯一 id 如何生成:

    在分库分表之后,对于插入数据库中的核心 id,不能直接简单使用表自增 id,要全局生成唯一 id,然后插入各个表中,保证每个表内的某个 id,全局唯一。比如说订单表虽然拆分为了 1024 张表,但是 id = 50 这个订单,只会存在于一个表里。
    方案一:独立数据库自增 id
    方案二:UUID
    方案三:获取系统当前时间
    方案四、snowflake 算法

    MongoDB中_id(ObjectId)生成介绍了mongo的自动生成的字段:"_id"

    MongoDB 中我们经常会接触到一个自动生成的字段:"_id",类型为ObjectId。之前我们使用MySQL等关系型数据库时,主键都是设置成自增的。但在分布式环境下,这种方法就不可行了,会产生冲突。为此,mongodb采用了一个称之为ObjectId的类型来做主键。ObjectId是一个12字节的 BSON 类型字符串。按照字节顺序,一次代表:
    4字节:UNIX时间戳
    3字节:表示运行MongoDB的机器
    2字节:表示生成此_id的进程
    3字节:由一个随机数开始的计数器生成的值

    MongoDB 自动增长提供了mongo的解决方式:
    MongoDB 没有像 SQL 一样有自动增长的功能, MongoDB 的 _id 是系统自动生成的12字节唯一标识。但在某些情况下,我们可能需要实现 ObjectId 自动增长功能。由于 MongoDB 没有实现这个功能,我们可以通过编程的方式来实现,以下我们将在 counters 集合中实现_id字段自动增长。

    db.createCollection("counters")
    db.counters.insert({_id:"productid",sequence_value:0})
    function getNextSequenceValue(sequenceName){
       var sequenceDocument = db.counters.findAndModify(
          {
             query:{_id: sequenceName },
             update: {$inc:{sequence_value:1}},
             "new":true
          });
       return sequenceDocument.sequence_value;
    }
    

    在leaf中也提供了类似的解决方案:

    // goroutine safe
    func (c *DialContext) EnsureCounter(db string, collection string, id string) error {
        s := c.Ref()
        defer c.UnRef(s)
    
        err := s.DB(db).C(collection).Insert(bson.M{
            "_id": id,
            "seq": 0,
        })
        if mgo.IsDup(err) {
            return nil
        } else {
            return err
        }
    }
    
    // goroutine safe
    func (c *DialContext) NextSeq(db string, collection string, id string) (int, error) {
        s := c.Ref()
        defer c.UnRef(s)
    
        var res struct {
            Seq int
        }
        _, err := s.DB(db).C(collection).FindId(id).Apply(mgo.Change{
            Update:    bson.M{"$inc": bson.M{"seq": 1}},
            ReturnNew: true,
        }, &res)
    
        return res.Seq, err
    }
    
        // auto increment
        err = c.EnsureCounter("test", "counters", "test")
        if err != nil {
            fmt.Println(err)
            return
        }
        for i := 0; i < 3; i++ {
            id, err := c.NextSeq("test", "counters", "test")
            if err != nil {
                fmt.Println(err)
                return
            }
            fmt.Println(id)
        }
    ---------------
        // Output:
        // 1
        // 2
        // 3
    

    这里可以看到,自增ID可以有多个,都会放在counters集合里。

    三、db范例 https://github.com/xm-tech/leaf_db_example
    1.mongodb.go
    var mongoDB *mongodb.DialContext
    
    func init() {
        // mongodb
        if conf.Server.DBMaxConnNum <= 0 {
            conf.Server.DBMaxConnNum = 100
        }
        db, err := mongodb.Dial(conf.Server.DBUrl, conf.Server.DBMaxConnNum)
        if err != nil {
            log.Fatal("dial mongodb error: %v", err)
        }
        mongoDB = db
    ...
    

    做了一个包内变量mongoDB

    2.user.go,userdata.go
    type UserData struct {
        UserID int "_id"
        AccID  string
    }
    
    func (user *User) login(accID string) {
        userData := new(UserData)
        skeleton.Go(func() {
            db := mongoDB.Ref()
            defer mongoDB.UnRef(db)
    
            // load
            err := db.DB("game").C("users").
                Find(bson.M{"accid": accID}).One(userData)
    ...
    
    func (user *User) autoSaveDB() {
        const duration = 5 * time.Minute
    
        // save
        user.saveDBTimer = skeleton.AfterFunc(duration, func() {
            data := util.DeepClone(user.data)
            user.Go(func() {
                db := mongoDB.Ref()
                defer mongoDB.UnRef(db)
                userID := data.(*UserData).UserID
                _, err := db.DB("game").C("users").
                    UpsertId(userID, data)
                if err != nil {
                    log.Error("save user %v data error: %v", userID, err)
                }
            }, func() {
                user.autoSaveDB()
            })
        })
    }
    

    在mongodb.go的init方法中,曾经设置过唯一索引

        // users
        err = db.EnsureUniqueIndex("game", "users", []string{"accid"})
        if err != nil {
            log.Fatal("ensure index error: %v", err)
        }
    
    3.issure 请问给出的server_db_example_new里为什么用两个map保存User
    accIDUsers = make(map[string]*User)
    users = make(map[int]*User)
    

    方便通过账号 ID 和用户 ID 找到用户。有一些游戏,对此有需求,可以按实际情况修改。
    这里的账号ID是自增的,每次有新账户时,初始化一个出来:

    // new
    err := userData.initValue(accID)
    if err != nil {
        log.Error("init acc %v data error: %v", accID, err)
        userData = nil
        user.WriteMsg(&msg.S2C_Close{Err: msg.S2C_Close_InnerError})
        user.Close()
        return
    }
    
    func (data *UserData) initValue(accID string) error {
        userID, err := mongoDBNextSeq("users")
        if err != nil {
            return fmt.Errorf("get next users id error: %v", err)
        }
    
        data.UserID = userID
        data.AccID = accID
    
        return nil
    }
    
    4.其它issue

    issue db范例的login漏洞
    issue skeleton.Go和user.Go

    相关文章

      网友评论

          本文标题:Golang 游戏leaf系列(十) mongodb模块 使用m

          本文链接:https://www.haomeiwen.com/subject/wezkxctx.html