美文网首页
六,mongo事务

六,mongo事务

作者: alexgu | 来源:发表于2021-01-29 22:30 被阅读0次

    一,writeConcern

    writeConcern 决定一个写操作落到多少个节点上才算成功。writeConcern 的取值包括:
    • 0:发起写操作,不关心是否成功;
    • 1~集群最大数据节点数:写操作需要被复制到指定节点数才算成功;
    • majority:写操作需要被复制到大多数节点上才算成功。
    发起写操作的程序将阻塞到写操作到达指定的节点数为止

    1.1 writeConcern测试

    //进入主节点,执行
    conf=rs.conf()
    conf.members[2].slaveDelay = 5//延迟5秒
    conf.members[2].priority = 0//投票优先级设置为0
    rs.reconfig(conf)
    
    db.test.insertOne({count: 1}, {writeConcern: {w: 3}})//5秒后执行完成
    //返回
    {
            "acknowledged" : true,
            "insertedId" : ObjectId("60072bacd28a60ee717d761e")
    }
    
    db.test.insertOne({count:2},{writeConcern: {w: 4, wtimeout:3000}})//超过节点数
    //返回
    WriteConcernError({
            "code" : 100,
            "codeName" : "UnsatisfiableWriteConcern",
            "errmsg" : "Not enough data-bearing nodes",
            "errInfo" : {
                    "writeConcern" : {
                            "w" : 4,
                            "wtimeout" : 3000,
                            "provenance" : "clientSupplied"
                    }
            }
    })
    //查询
    db.test.find({count:2})
    //返回
    { "_id" : ObjectId("60072f63d28a60ee717d7623"), "count" : 2 }
    
    db.test.insertOne({count:3},{writeConcern: {w: 3, wtimeout:3000}})//设置3秒超时,返回会报错
    //返回
    WriteConcernError({
            "code" : 64,
            "codeName" : "WriteConcernFailed",
            "errmsg" : "waiting for replication timed out",
            "errInfo" : {
                    "wtimeout" : true,
                    "writeConcern" : {
                            "w" : 3,
                            "wtimeout" : 3000,
                            "provenance" : "clientSupplied"
                    }
            }
    })
    
    //主节点查询
    db.test.find({count:3})
    //返回
    { "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
    
    //过了5秒,进入第二个从节点执行
    db.test.find({count:3})
    //返回
    { "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
    

    writeConcern大于总节点数,或者等于总节点数但有一个节点故障,写入会报错,但数据还是会写入,但这种错误没有必要,重要数据设置成majority就可以了

    设置超时,虽然会报错,当数据实际上是主节点先写入,然后再等待其他节点同步数据时发生超时,数据还是已经入库了,也不会因为超时错误停止同步

    通常应对重要数据应用 {w: “majority”},普通数据可以应用 {w: 1} 以确保最佳性能。

    writeConcern写操作返回耗时受到同步及从节点性能影响,但并不会显著增加集群压力,因此无论是否等待,写操作最终都会复制到所有节点上。设置 writeConcern 只是让写操作等待复制后再返回而已

    二,读数据

    作为集群数据库读取时需要关注2个问题:
    1,去哪读-readPreference
    2,数据隔离性-readConcern

    2.1 readPreference

    readPreference 决定使用哪一个节点来满足正在发起的读请求。可选值包括:
    • primary: 只选择主节点;
    • primaryPreferred:优先选择主节点,如果不可用则选择从节点;
    • secondary:只选择从节点;
    • secondaryPreferred:优先选择从节点,如果从节点不可用则选择主节点;
    • nearest:选择最近的节点;
    除此之外还可以通过标签来选择读取节点

    2.1 readConcern

    • available: 读取所有可用数据;
    • local:读取属于当前分片的所有可用数据;
    • majority:读取大部分已提交的;
    • linearizable:线性读;
    • snapshot:快照读;

    available与local的区别

    在复制集下面local和available没有区别,区别在于分片集迁移数据时,当chunk x需要从shard1迁移到shard2上面的过程中,在shard1,shard2上面都有chunk x的数据,但shard1为负责方。所有对chunk x的读写操作都会进入shard1,如果指定对shard2读取,available会读取包含chunk x的数据,而local只会读取当前分片负责的数据

    majority

    首先主节点写入数据,然后同步给从节点,当主节点收到大部分节点的写入确认后,再同步从节点数据已大部分节点写入,收到的从节点就认为这个数据已经是majority的,可被majority读取的。当有个节点没有收到数据或者majority确认,但是大部分节点已经写入了,当前节点的视角是没有该数据的

    majority作用:
    当主节点挂掉前将数据x=1,改为x=2,数据还没同步给从节点,没有设置majority,主节点读到x=2,主节点就挂掉了,x=2就再也无法获取,原来的就变成了脏读。

    linearizable

    majority大部分时候都能保证不会出现脏读,有一种特殊的情况,majority并不能很好支持。
    存在server1,server2连接着node1,node1为主节点,当node1与其他节点失联,当server2可以连接node1。这时server1通过选举重新连接到node2作为主节点,往node2写入数据。但是server2却认为node1是主节点,获取不到对应数据,导致数据不一致。这种情况需要设置linearizable,这个配置在获取数据前,或从其他节点检查数据是否是真的最新。

    三,因果一致性

    有的博客包括极客时间上面提到,写入mongo时,设置writeConcern=majority,读取时设置readPreference=secondary,readConcern=majority就可以得到很好的性能且数据一致

    其实不是的,先解释下这样子设置的意思,首先写保证大部分节点已经写入,读取时选择从节点,且是被大部分从节点有被写入的数据才能被读取。

    这种情况会出现数据不一致:当前有一个一主二从的集群,当主节点与从节点1数据被写入了,当时从节点2没有被写入,在主节点和从节点1的视角看,这条数据已经被写入,且是mayjority的,但是如果访问的从节点2视角看,是不存在这条数据的,当我们设置readPreference=secondary时,刚好驱动刚好选择的是从节点2,数据会不一致

    实验:
    首先搭建一个一主二从的集群,进入从节点2的shell,执行

    /*
    把节点2锁住,用来模拟从节点延迟
    不要设置延迟节点来模拟节点延迟,在golang的驱动库中就算延迟节点没有设置隐藏,也不会选为读取节点
    */
    db.fsyncLock()
    

    下面这段代码有三组测试:
    1,没有使用session,会出现数据不一致
    2,多个session,CausalConsistency为true,会出现不一致
    3,单个session,CausalConsistency为false,会出现不一致,当把SetCausalConsistency(false)这句删除,驱动默认为true,数据一致

    所以在不考虑事务的情况,需要写入或者修改的数据能被立马读到需要满足:
    1,写与读在同一个session
    2,session的CausalConsistency为true

    更多CausalConsistency的介绍:
    1,https://docs.mongodb.com/manual/reference/server-sessions/
    2,https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
    3,https://docs.mongodb.com/manual/core/transactions/

    package main
    
    import (
        "context"
        "fmt"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
        "go.mongodb.org/mongo-driver/mongo/readconcern"
        "go.mongodb.org/mongo-driver/mongo/readpref"
        "go.mongodb.org/mongo-driver/mongo/writeconcern"
        "sync"
        "time"
    )
    
    func main() {
        noSessionTest()
        multiSessionTest()
        singleSessionTest()
    }
    
    func newCollection(client *mongo.Client) *mongo.Collection {
        option := options.CollectionOptions{}
        return client.Database("alex").
            Collection("test",
                option.SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
                    SetReadPreference(readpref.Secondary()).
                    SetReadConcern(readconcern.Majority()))
    }
    
    func newClient() *mongo.Client {
        uri := "mongodb://member1.example.com:28017,member2.example.com:28018,member3.example.com:28019/admin?replicaSet=rs0"
        ctx := context.Background()
        client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
        if err != nil {
            panic(err)
        }
    
        err = client.Ping(ctx, nil)
        if err != nil {
            panic(err)
        }
    
        fmt.Println("Successfully connected and pinged.")
        return client
    }
    
    func multiSessionTest() {
        ctx := context.TODO()
        client := newClient()
        opts := options.Session().
            SetDefaultReadConcern(readconcern.Majority()).
            SetDefaultReadPreference(readpref.Secondary())
    
        sess, err := client.StartSession(opts)
        if err != nil {
            panic(err)
        }
    
        var insertId interface{}
        err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
            coll :=newCollection(client)
            insertId = insert(sessionContext,coll)
            return nil
        })
    
        sess, err = client.StartSession(opts)
        if err != nil {
            panic(err)
        }
        err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
            coll :=newCollection(client)
            find(sessionContext,coll,insertId)
            return nil
        })
    }
    
    func singleSessionTest()  {
        ctx := context.TODO()
        client := newClient()
        opts := options.Session().
            SetDefaultReadConcern(readconcern.Majority()).
            SetDefaultReadPreference(readpref.Secondary()).
            SetCausalConsistency(false)
    
        sess, err := client.StartSession(opts)
        if err != nil {
            panic(err)
        }
    
        var insertId interface{}
        err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
            coll :=newCollection(client)
            insertId = insert(sessionContext,coll)
            find(sessionContext,coll,insertId)
            return nil
        })
    }
    
    func noSessionTest() {
        ctx := context.TODO()
        client := newClient()
        coll := newCollection(client)
        insertId := insert(ctx, coll)
        find(ctx,coll,insertId)
    }
    
    func insert(ctx context.Context, coll *mongo.Collection) (insertId interface{}) {
        res, err := coll.InsertOne(ctx, bson.M{"time": time.Now().Unix()})
        if err != nil {
            panic(err)
        }
    
        return res.InsertedID
    }
    
    func find(ctx context.Context, coll *mongo.Collection, insertId interface{}) {
        wait := sync.WaitGroup{}
        wait.Add(100)
        for i := 0; i < 100; i++ {
            go func() {
                result := coll.FindOne(ctx, bson.M{"_id": insertId})
                if result.Err() != nil {
                    fmt.Println(result.Err())
                } else {
                    fmt.Println(true)
                }
                wait.Done()
            }()
        }
        wait.Wait()
    }
    
    

    相关文章

      网友评论

          本文标题:六,mongo事务

          本文链接:https://www.haomeiwen.com/subject/puehzktx.html