美文网首页
六,mongo事务

六,mongo事务

作者: alexgu | 来源:发表于2021-01-29 22:30 被阅读0次

一,writeConcern

writeConcern 决定一个写操作落到多少个节点上才算成功。writeConcern 的取值包括:
• 0:发起写操作,不关心是否成功;
• 1~集群最大数据节点数:写操作需要被复制到指定节点数才算成功;
• majority:写操作需要被复制到大多数节点上才算成功。
发起写操作的程序将阻塞到写操作到达指定的节点数为止

1.1 writeConcern测试

//进入主节点,执行
conf=rs.conf()
conf.members[2].slaveDelay = 5//延迟5秒
conf.members[2].priority = 0//投票优先级设置为0
rs.reconfig(conf)

db.test.insertOne({count: 1}, {writeConcern: {w: 3}})//5秒后执行完成
//返回
{
        "acknowledged" : true,
        "insertedId" : ObjectId("60072bacd28a60ee717d761e")
}

db.test.insertOne({count:2},{writeConcern: {w: 4, wtimeout:3000}})//超过节点数
//返回
WriteConcernError({
        "code" : 100,
        "codeName" : "UnsatisfiableWriteConcern",
        "errmsg" : "Not enough data-bearing nodes",
        "errInfo" : {
                "writeConcern" : {
                        "w" : 4,
                        "wtimeout" : 3000,
                        "provenance" : "clientSupplied"
                }
        }
})
//查询
db.test.find({count:2})
//返回
{ "_id" : ObjectId("60072f63d28a60ee717d7623"), "count" : 2 }

db.test.insertOne({count:3},{writeConcern: {w: 3, wtimeout:3000}})//设置3秒超时,返回会报错
//返回
WriteConcernError({
        "code" : 64,
        "codeName" : "WriteConcernFailed",
        "errmsg" : "waiting for replication timed out",
        "errInfo" : {
                "wtimeout" : true,
                "writeConcern" : {
                        "w" : 3,
                        "wtimeout" : 3000,
                        "provenance" : "clientSupplied"
                }
        }
})

//主节点查询
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }

//过了5秒,进入第二个从节点执行
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }

writeConcern大于总节点数,或者等于总节点数但有一个节点故障,写入会报错,但数据还是会写入,但这种错误没有必要,重要数据设置成majority就可以了

设置超时,虽然会报错,当数据实际上是主节点先写入,然后再等待其他节点同步数据时发生超时,数据还是已经入库了,也不会因为超时错误停止同步

通常应对重要数据应用 {w: “majority”},普通数据可以应用 {w: 1} 以确保最佳性能。

writeConcern写操作返回耗时受到同步及从节点性能影响,但并不会显著增加集群压力,因此无论是否等待,写操作最终都会复制到所有节点上。设置 writeConcern 只是让写操作等待复制后再返回而已

二,读数据

作为集群数据库读取时需要关注2个问题:
1,去哪读-readPreference
2,数据隔离性-readConcern

2.1 readPreference

readPreference 决定使用哪一个节点来满足正在发起的读请求。可选值包括:
• primary: 只选择主节点;
• primaryPreferred:优先选择主节点,如果不可用则选择从节点;
• secondary:只选择从节点;
• secondaryPreferred:优先选择从节点,如果从节点不可用则选择主节点;
• nearest:选择最近的节点;
除此之外还可以通过标签来选择读取节点

2.1 readConcern

• available: 读取所有可用数据;
• local:读取属于当前分片的所有可用数据;
• majority:读取大部分已提交的;
• linearizable:线性读;
• snapshot:快照读;

available与local的区别

在复制集下面local和available没有区别,区别在于分片集迁移数据时,当chunk x需要从shard1迁移到shard2上面的过程中,在shard1,shard2上面都有chunk x的数据,但shard1为负责方。所有对chunk x的读写操作都会进入shard1,如果指定对shard2读取,available会读取包含chunk x的数据,而local只会读取当前分片负责的数据

majority

首先主节点写入数据,然后同步给从节点,当主节点收到大部分节点的写入确认后,再同步从节点数据已大部分节点写入,收到的从节点就认为这个数据已经是majority的,可被majority读取的。当有个节点没有收到数据或者majority确认,但是大部分节点已经写入了,当前节点的视角是没有该数据的

majority作用:
当主节点挂掉前将数据x=1,改为x=2,数据还没同步给从节点,没有设置majority,主节点读到x=2,主节点就挂掉了,x=2就再也无法获取,原来的就变成了脏读。

linearizable

majority大部分时候都能保证不会出现脏读,有一种特殊的情况,majority并不能很好支持。
存在server1,server2连接着node1,node1为主节点,当node1与其他节点失联,当server2可以连接node1。这时server1通过选举重新连接到node2作为主节点,往node2写入数据。但是server2却认为node1是主节点,获取不到对应数据,导致数据不一致。这种情况需要设置linearizable,这个配置在获取数据前,或从其他节点检查数据是否是真的最新。

三,因果一致性

有的博客包括极客时间上面提到,写入mongo时,设置writeConcern=majority,读取时设置readPreference=secondary,readConcern=majority就可以得到很好的性能且数据一致

其实不是的,先解释下这样子设置的意思,首先写保证大部分节点已经写入,读取时选择从节点,且是被大部分从节点有被写入的数据才能被读取。

这种情况会出现数据不一致:当前有一个一主二从的集群,当主节点与从节点1数据被写入了,当时从节点2没有被写入,在主节点和从节点1的视角看,这条数据已经被写入,且是mayjority的,但是如果访问的从节点2视角看,是不存在这条数据的,当我们设置readPreference=secondary时,刚好驱动刚好选择的是从节点2,数据会不一致

实验:
首先搭建一个一主二从的集群,进入从节点2的shell,执行

/*
把节点2锁住,用来模拟从节点延迟
不要设置延迟节点来模拟节点延迟,在golang的驱动库中就算延迟节点没有设置隐藏,也不会选为读取节点
*/
db.fsyncLock()

下面这段代码有三组测试:
1,没有使用session,会出现数据不一致
2,多个session,CausalConsistency为true,会出现不一致
3,单个session,CausalConsistency为false,会出现不一致,当把SetCausalConsistency(false)这句删除,驱动默认为true,数据一致

所以在不考虑事务的情况,需要写入或者修改的数据能被立马读到需要满足:
1,写与读在同一个session
2,session的CausalConsistency为true

更多CausalConsistency的介绍:
1,https://docs.mongodb.com/manual/reference/server-sessions/
2,https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
3,https://docs.mongodb.com/manual/core/transactions/

package main

import (
    "context"
    "fmt"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/mongo/readconcern"
    "go.mongodb.org/mongo-driver/mongo/readpref"
    "go.mongodb.org/mongo-driver/mongo/writeconcern"
    "sync"
    "time"
)

func main() {
    noSessionTest()
    multiSessionTest()
    singleSessionTest()
}

func newCollection(client *mongo.Client) *mongo.Collection {
    option := options.CollectionOptions{}
    return client.Database("alex").
        Collection("test",
            option.SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
                SetReadPreference(readpref.Secondary()).
                SetReadConcern(readconcern.Majority()))
}

func newClient() *mongo.Client {
    uri := "mongodb://member1.example.com:28017,member2.example.com:28018,member3.example.com:28019/admin?replicaSet=rs0"
    ctx := context.Background()
    client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
    if err != nil {
        panic(err)
    }

    err = client.Ping(ctx, nil)
    if err != nil {
        panic(err)
    }

    fmt.Println("Successfully connected and pinged.")
    return client
}

func multiSessionTest() {
    ctx := context.TODO()
    client := newClient()
    opts := options.Session().
        SetDefaultReadConcern(readconcern.Majority()).
        SetDefaultReadPreference(readpref.Secondary())

    sess, err := client.StartSession(opts)
    if err != nil {
        panic(err)
    }

    var insertId interface{}
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        insertId = insert(sessionContext,coll)
        return nil
    })

    sess, err = client.StartSession(opts)
    if err != nil {
        panic(err)
    }
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        find(sessionContext,coll,insertId)
        return nil
    })
}

func singleSessionTest()  {
    ctx := context.TODO()
    client := newClient()
    opts := options.Session().
        SetDefaultReadConcern(readconcern.Majority()).
        SetDefaultReadPreference(readpref.Secondary()).
        SetCausalConsistency(false)

    sess, err := client.StartSession(opts)
    if err != nil {
        panic(err)
    }

    var insertId interface{}
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        insertId = insert(sessionContext,coll)
        find(sessionContext,coll,insertId)
        return nil
    })
}

func noSessionTest() {
    ctx := context.TODO()
    client := newClient()
    coll := newCollection(client)
    insertId := insert(ctx, coll)
    find(ctx,coll,insertId)
}

func insert(ctx context.Context, coll *mongo.Collection) (insertId interface{}) {
    res, err := coll.InsertOne(ctx, bson.M{"time": time.Now().Unix()})
    if err != nil {
        panic(err)
    }

    return res.InsertedID
}

func find(ctx context.Context, coll *mongo.Collection, insertId interface{}) {
    wait := sync.WaitGroup{}
    wait.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            result := coll.FindOne(ctx, bson.M{"_id": insertId})
            if result.Err() != nil {
                fmt.Println(result.Err())
            } else {
                fmt.Println(true)
            }
            wait.Done()
        }()
    }
    wait.Wait()
}

相关文章

网友评论

      本文标题:六,mongo事务

      本文链接:https://www.haomeiwen.com/subject/puehzktx.html