一,writeConcern
writeConcern 决定一个写操作落到多少个节点上才算成功。writeConcern 的取值包括:
• 0:发起写操作,不关心是否成功;
• 1~集群最大数据节点数:写操作需要被复制到指定节点数才算成功;
• majority:写操作需要被复制到大多数节点上才算成功。
发起写操作的程序将阻塞到写操作到达指定的节点数为止
1.1 writeConcern测试
//进入主节点,执行
conf=rs.conf()
conf.members[2].slaveDelay = 5//延迟5秒
conf.members[2].priority = 0//投票优先级设置为0
rs.reconfig(conf)
db.test.insertOne({count: 1}, {writeConcern: {w: 3}})//5秒后执行完成
//返回
{
"acknowledged" : true,
"insertedId" : ObjectId("60072bacd28a60ee717d761e")
}
db.test.insertOne({count:2},{writeConcern: {w: 4, wtimeout:3000}})//超过节点数
//返回
WriteConcernError({
"code" : 100,
"codeName" : "UnsatisfiableWriteConcern",
"errmsg" : "Not enough data-bearing nodes",
"errInfo" : {
"writeConcern" : {
"w" : 4,
"wtimeout" : 3000,
"provenance" : "clientSupplied"
}
}
})
//查询
db.test.find({count:2})
//返回
{ "_id" : ObjectId("60072f63d28a60ee717d7623"), "count" : 2 }
db.test.insertOne({count:3},{writeConcern: {w: 3, wtimeout:3000}})//设置3秒超时,返回会报错
//返回
WriteConcernError({
"code" : 64,
"codeName" : "WriteConcernFailed",
"errmsg" : "waiting for replication timed out",
"errInfo" : {
"wtimeout" : true,
"writeConcern" : {
"w" : 3,
"wtimeout" : 3000,
"provenance" : "clientSupplied"
}
}
})
//主节点查询
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
//过了5秒,进入第二个从节点执行
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
writeConcern大于总节点数,或者等于总节点数但有一个节点故障,写入会报错,但数据还是会写入,但这种错误没有必要,重要数据设置成majority就可以了
设置超时,虽然会报错,当数据实际上是主节点先写入,然后再等待其他节点同步数据时发生超时,数据还是已经入库了,也不会因为超时错误停止同步
通常应对重要数据应用 {w: “majority”},普通数据可以应用 {w: 1} 以确保最佳性能。
writeConcern写操作返回耗时受到同步及从节点性能影响,但并不会显著增加集群压力,因此无论是否等待,写操作最终都会复制到所有节点上。设置 writeConcern 只是让写操作等待复制后再返回而已
二,读数据
作为集群数据库读取时需要关注2个问题:
1,去哪读-readPreference
2,数据隔离性-readConcern
2.1 readPreference
readPreference 决定使用哪一个节点来满足正在发起的读请求。可选值包括:
• primary: 只选择主节点;
• primaryPreferred:优先选择主节点,如果不可用则选择从节点;
• secondary:只选择从节点;
• secondaryPreferred:优先选择从节点,如果从节点不可用则选择主节点;
• nearest:选择最近的节点;
除此之外还可以通过标签来选择读取节点
2.1 readConcern
• available: 读取所有可用数据;
• local:读取属于当前分片的所有可用数据;
• majority:读取大部分已提交的;
• linearizable:线性读;
• snapshot:快照读;
available与local的区别
在复制集下面local和available没有区别,区别在于分片集迁移数据时,当chunk x需要从shard1迁移到shard2上面的过程中,在shard1,shard2上面都有chunk x的数据,但shard1为负责方。所有对chunk x的读写操作都会进入shard1,如果指定对shard2读取,available会读取包含chunk x的数据,而local只会读取当前分片负责的数据
majority
首先主节点写入数据,然后同步给从节点,当主节点收到大部分节点的写入确认后,再同步从节点数据已大部分节点写入,收到的从节点就认为这个数据已经是majority的,可被majority读取的。当有个节点没有收到数据或者majority确认,但是大部分节点已经写入了,当前节点的视角是没有该数据的
majority作用:
当主节点挂掉前将数据x=1,改为x=2,数据还没同步给从节点,没有设置majority,主节点读到x=2,主节点就挂掉了,x=2就再也无法获取,原来的就变成了脏读。
linearizable
majority大部分时候都能保证不会出现脏读,有一种特殊的情况,majority并不能很好支持。
存在server1,server2连接着node1,node1为主节点,当node1与其他节点失联,当server2可以连接node1。这时server1通过选举重新连接到node2作为主节点,往node2写入数据。但是server2却认为node1是主节点,获取不到对应数据,导致数据不一致。这种情况需要设置linearizable,这个配置在获取数据前,或从其他节点检查数据是否是真的最新。
三,因果一致性
有的博客包括极客时间上面提到,写入mongo时,设置writeConcern=majority,读取时设置readPreference=secondary,readConcern=majority就可以得到很好的性能且数据一致
其实不是的,先解释下这样子设置的意思,首先写保证大部分节点已经写入,读取时选择从节点,且是被大部分从节点有被写入的数据才能被读取。
这种情况会出现数据不一致:当前有一个一主二从的集群,当主节点与从节点1数据被写入了,当时从节点2没有被写入,在主节点和从节点1的视角看,这条数据已经被写入,且是mayjority的,但是如果访问的从节点2视角看,是不存在这条数据的,当我们设置readPreference=secondary时,刚好驱动刚好选择的是从节点2,数据会不一致
实验:
首先搭建一个一主二从的集群,进入从节点2的shell,执行
/*
把节点2锁住,用来模拟从节点延迟
不要设置延迟节点来模拟节点延迟,在golang的驱动库中就算延迟节点没有设置隐藏,也不会选为读取节点
*/
db.fsyncLock()
下面这段代码有三组测试:
1,没有使用session,会出现数据不一致
2,多个session,CausalConsistency为true,会出现不一致
3,单个session,CausalConsistency为false,会出现不一致,当把SetCausalConsistency(false)
这句删除,驱动默认为true,数据一致
所以在不考虑事务的情况,需要写入或者修改的数据能被立马读到需要满足:
1,写与读在同一个session
2,session的CausalConsistency为true
更多CausalConsistency的介绍:
1,https://docs.mongodb.com/manual/reference/server-sessions/
2,https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
3,https://docs.mongodb.com/manual/core/transactions/
package main
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"sync"
"time"
)
func main() {
noSessionTest()
multiSessionTest()
singleSessionTest()
}
func newCollection(client *mongo.Client) *mongo.Collection {
option := options.CollectionOptions{}
return client.Database("alex").
Collection("test",
option.SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
SetReadPreference(readpref.Secondary()).
SetReadConcern(readconcern.Majority()))
}
func newClient() *mongo.Client {
uri := "mongodb://member1.example.com:28017,member2.example.com:28018,member3.example.com:28019/admin?replicaSet=rs0"
ctx := context.Background()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
panic(err)
}
err = client.Ping(ctx, nil)
if err != nil {
panic(err)
}
fmt.Println("Successfully connected and pinged.")
return client
}
func multiSessionTest() {
ctx := context.TODO()
client := newClient()
opts := options.Session().
SetDefaultReadConcern(readconcern.Majority()).
SetDefaultReadPreference(readpref.Secondary())
sess, err := client.StartSession(opts)
if err != nil {
panic(err)
}
var insertId interface{}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
insertId = insert(sessionContext,coll)
return nil
})
sess, err = client.StartSession(opts)
if err != nil {
panic(err)
}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
find(sessionContext,coll,insertId)
return nil
})
}
func singleSessionTest() {
ctx := context.TODO()
client := newClient()
opts := options.Session().
SetDefaultReadConcern(readconcern.Majority()).
SetDefaultReadPreference(readpref.Secondary()).
SetCausalConsistency(false)
sess, err := client.StartSession(opts)
if err != nil {
panic(err)
}
var insertId interface{}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
insertId = insert(sessionContext,coll)
find(sessionContext,coll,insertId)
return nil
})
}
func noSessionTest() {
ctx := context.TODO()
client := newClient()
coll := newCollection(client)
insertId := insert(ctx, coll)
find(ctx,coll,insertId)
}
func insert(ctx context.Context, coll *mongo.Collection) (insertId interface{}) {
res, err := coll.InsertOne(ctx, bson.M{"time": time.Now().Unix()})
if err != nil {
panic(err)
}
return res.InsertedID
}
func find(ctx context.Context, coll *mongo.Collection, insertId interface{}) {
wait := sync.WaitGroup{}
wait.Add(100)
for i := 0; i < 100; i++ {
go func() {
result := coll.FindOne(ctx, bson.M{"_id": insertId})
if result.Err() != nil {
fmt.Println(result.Err())
} else {
fmt.Println(true)
}
wait.Done()
}()
}
wait.Wait()
}
网友评论