第一步 阅读4B的文档
弄清楚里面的每个段落
第二步 基于LAB3,写出可以通过单GROUP的代码
Your first task is to pass the very first shardkv test. In this test, there is only a single assignment of shards, so your code should be very similar to that of your Lab 3 server. The biggest modification will be to have your server detect when a configuration happens and start accepting requests whose keys match shards that it now owns.
基于LAB3,把代码可以抄的都抄过去
image.png
抄代码就不展示了。基本大多数都是把代码复制过去。
因为这边的CLIENT 给了代码 是需要看ERR这个属性的。
image.png
所以和LAB3不同(那里我没用这个REPLY ERR的属性),需要加上返回值。
image.png
很快,测试1通过了
image.png
第三步 增加拉CONFIG,和拒绝不属于自己的SHARD的代码
来自HINT
image.pngAdd code to server.go to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test.
image.png
image.png
image.png
测试之后,依然确保可以过第一个TEST
第四步 思考
思考 CONFIG变化后,如何转移SHARD
我的思考是这样的。如果一个REPLICA GROUP A得到一个SHARD 1,对应B 失去一个SHARD 1
如果是A检测到多了,去等待别人来发给我,会比较被动。因为不知道要等多久。
其次B还需要发现自己失去SHARD 1后,要主动去发给A,这增加了B 的工作量。
因为我们这边在做SHARD MIGRATION的时候,是不能响应请求。但对B来说,他可以立刻更新CONFIG,即使没把SHARD 1发送出去,他也是可以响应请求。
但对A来说,一定要拿到SHARD 1后,它才可以继续服务。
基于上述思考,决定让A去问B要SHARD,这样会简化设计。因为这样做B可以如果发现了新的CONFIG,可以直接更新让它立刻生效。A需要等待PULL成功后,更新CONFIG让它生效。
Reconfiguration 会影响到 PutAppend/Get,因此同样需要利用 raft 保证 group 内的一致性,确保集群内完成了之前的操作后同时进行Reconfiguration;
要思考的第二个点,是传输SHARD的RPC。
首先SHARD DATA是一定要发过去的。
但是只是发SHARD DATA是不够的。
比如一个APPEND REQUEST 在向A SERVER发送的时候,TIMEOUT了。这个时候A server 已经做了这个更新操作。
在这个点之后,Reconfiguration 发生,CLIENT 去问B SERVER发送APPEND REQ。如果只是SHARD DATA过去。会造成APPEND2次。
所以我们还需要把去重的MAP也一起发过去。
发过去的参数除了要诉说我要哪个SHARD之外,还需要加上CONFIG NUM,因为有可能我发的CONFIG NUM比那边还要大,说明那边的CONFIG还没同步到。
基于上述思路。我设计的RPC如下。
image.png
第五步 实现MIGRATE SHARD RPC HANDLER
这里有个很重要的思路,每个RAFT GROUP都是由LEADER负责发送和接受RPC。FOLLOWER只负责从APPLY MSG里去和LEADER SYNC状态。
还有一个点,就是我们不能直接从DB里去取数据,如果我们没有实现清理数据的前提下,因为数据不清理。所以我们会有多的数据,想象一下。我们先接受SHARD1,然后不接受,再重新接受SHARD1,此时做迁移,会是一个并集。而我们只是希望是重新接受的那部分。基于上述考虑。我们需要基于每一个CONFIG,单独把要迁移的数据给抽出来。这样依据CONFIG来做迁移。
image.png
第六步 思考难点
如何去PULL DATA?如果我们选择让LEADER去交互,我们必须要HANDLER RAFT Leader挂掉,得有新的LEADER来负责PULL DATA。
所以在所有节点上必须得存好要问哪里去PULL DATA。如果PULL到,我们需要确保LEADER会往RAFT里发CMD(这个CMD是让节点同步数据,同时删掉那个维护的哪里去PULL DATA的地方)
而且我们必须额外开一个后台进程与循环的做这件事。不然LEADER转移过去之后,就没有人PULL DATA了。 因为PULL DATA 这件事是没有CLIENT超时重试的。
因为要后台循环去PULL DATA,我们拿到DATA后,送进RAFT,再进入到APPLY CH,需要所有的节点都可以同步这个数据。一旦同步成功,我们需要清理这个要等待的数据。这样后台线程可以少发很多无用的RPC。
同时我们在索要数据的时候也要知道往哪个REPLICA GROUP要。
image.png image.png image.png image.png
第7步
目前我们已经再LEADER端,把收到的新的CONFIG和拿到的MIGRATION DATA打给放进RAFT的LOG去做线性一致的排序。
所以当这个2个消息从APPLY MSG出来的时候,需要去做一些事情。
为此,我单独开了一个函数去写APPLY的逻辑
image.png
第8步 实现APPLY MSG 是MIGRATION DATA REPLY
这里的点(后面大量调试获得的),因为REPLY 发到RAFT里面,虽然有顺序,但返回的时候顺序可能是乱的。比如现在我的CONFIG已经更新到9,这个时候RAFT才把CONFIG的6 返回回来。我们应该直接忽略这个版本。如果更新了,就会产生不一致。
那么依据乱序思想,我们不得不CHECK 就是当前REPLY的CONFIG版本号必须是当前CONFIG版本号小一个。
为什么?
这里我们在收到CONFIG 变更,我们就会刷新CONFIG。但是此时CONFIG刷新之后,我们会更新COME IN SHARD,随后后台线程会去PULL。从更新COME IN SHARD到数据SHARD过来,这段时间内,我们必须得拒绝掉所有的索要该SHARD的请求。所以我们不能直接从CONFIG来判断是不是WRONG GROUP。
至此,我们需要额外再维护一个我现在能HANDLER哪些SHARD的数据结构。
image.png
那么发出去的SHARD,我可以直接从这个数据结构里删掉。要进来的话,等真的进来了,再添加到这个数据结构中。
image.png
判断是不是WRONG GROUP,也依据这个数据结构来看。
第9步 实现updateInAndOutDataShard
这边我们会根据新的CONFIG来,判断自己要送出去的数据是哪些,自己要接受进来的数组是哪些。在我的设计里这2个数据结构必要性在第5,6步讨论过了。
image.png
第十步 判断WRONG GROUP的时机
在前面的版本中,我们是在SERVER端接受到请求的时候,就直接去依据CONFIG判断WRONG GROUP。现在我们改成依据MYSHARD来看,但是这还是不足的。
还记得我们在做LAB 3的时候,判断去重,必须得再消息回来的时候再看一次。 因为可能在请求发送的时候,数据还在REPLICA GROUP 1。可是到消息从RAFT返回来的时候,当中发生过更新CONFIG。数据不再GROUP 1了。所以要把判断WRONG GROUP的逻辑,加在数据返回层。
同时因为数据会在APPLY CH收到新的CONFIG,一部分要TO OUT的数据就会从DB里DELETE掉。为了确保NOTIFY CH的传输过程中,这个DB的更改不会影响到实际的GET的返回值。我们需要在接到APPLY CH的时候就把结果给注入到OP里。不然等OP发过去再从DB拿,有一定概率此时另一个线程已经再DELETE DB了。
image.png
image.png
image.png
同时根据这个思路,我把SHARD MASTER的QUERY 也加在返回层来做。
image.png
第11步 初始化新加的属性
image.png第12步 更新POLL NEW CONFIG代码,需要一个个更新
来自HINT
Process re-configurations one at a time, in order.
同时注意如果当前CONFIG,那些需要转移的SHARD还没做完。不要立刻去拿下一个CONFIG。
image.png
image.png
第13步 测试JOIN AND LEAVE
发现有时可以过,有时过不了会阻塞。
BUG 1 死锁
通过几小时的调试发现,是一个3维死锁。首先RAFT里面拿了RAFT的锁,阻塞在APPLY CH那。APPLY CH的后台线程阻塞在KV SERVER的锁上。 还有一个PULL CONFIG的线程,持有了KV SERVER的锁,阻塞在RAFT的锁上。
image.pngFIX方法,交换代码顺序。
image.png
测试通过
但是写了这么多代码,很多地方都没注意保护共享变量。所以用TEST DATA RACE的时候会出问题。
检查思路,先看3个后台进程,随后看几个RPC handler
这边就自己加一下锁吧。
GO TEST RACE OK之后,会在第三个测试败掉。是SNAPSHOT
我们需要存储更多的状态进SNAPSHOT。
第14步 实现新的SNAPSHOT
image.pngimage.png
image.png
再直接测试,发现阻塞在UNRELIABLE 3
image.png
BUG 2 一处地方没有释放锁
image.png修复后重新对这个CASE单独测试100次通过,测全集。只剩下CHANLLEGE 1,DELETE的TASK了
image.png
第15步 思考如何删除不必要的状态
在上面的实现里,我们开了3个数据结构,一个是TO OUT,一个是COME IN,一个是MY SHARD;
第三个是固定大小的。不用考虑
第二个,我们已经再接受到DATA之后会去删除它。
唯一没有回收的就是第一个。
最NAIVE的实现是当我们把数据当做REPLY发过去的时候,就直接删掉。这是危险的。因为很有可能这个消息会丢失,被那边服务器拒绝,造成这个数据就永远不会被回收。
正确的做法是等到对方服务器,成功接收了DATA,然后删除了对应的COME IN,这个时候应该发REQUEST告诉TO OUT一方,你可以安全的把TO OUT里的这块DATA给回收了。
但是依然存在RPC会丢失的情况。和PULL的思想一样。(用一个COME IN LIST+ 后台线程,来不断重试,成功时候删除COME IN LIST内容,就不再去PULL直到有新的COME IN来。失败的话,因为COME IN 内容还在,就会自动重试,不怕网络不稳定)
那么我针对这个CASE,用相同的套路。后台GC线程+Garbage List.
具体思路就是当COME IN 的DATA收到后,我们要把这块数据标记进Garbage List。 后台GC线程发现Garbage List有内容,就会往对应的GROUP发送GC RPC。对应的GROUP清理成功后,REPLY告知。我们把Garbage List对应的内容删除。
同样我们依然只和LEADER交互,并且利用RAFT LOG,来确保所有节点都成功删除GARBAGE,再RPC回复SUCCESS
第16步 写GC RPC HANDLER,抽一个TEMPLATE
发现可以用ERR 里面加一个WRONGLEADER来代表LEADER不对。就可以去掉一个参数。
当OP TYPE是GC的时候,KEY 是CONFIG NUM,SEQNUM是SHARD。
image.png image.png image.png
第17步 实现GC
image.pngimage.png
第18步 实现GC后台进程
image.png image.png第19步 往GARBAGE里添加值
image.png第20步,更新SNAPSHOT
这里小伙伴自行更新吧
测试通过
image.png第21步
因为我的REPLICA GROUP里会往MASTER 发送QUERY请求,这个时候可能会造成LAST LEADER的DATA RACE。
所以我用原子方法改写。
image.png
第22步 CONCISE代码
1.我把WRONDLEADER给去掉了。同时用ERR 的WRONGLEADER来表示。
image.png
2.把几个RPC HANDLER用TEMPLATE 提取公有逻辑
image.png
最终430 行代码
GO TEST 测试200次
这个测试不适合并行,因为会大量开线程在做。并行测试会造成有些CASE跑的巨慢。所以串行测试了。
同时CONFIG里因为MASTER的资源没有回收。越到后面TEST 跑的越慢,我加了如下代码来提速测试
image.png
基本跑完一整套是2分钟
image.png
测试200次的结果 是
BUG 3
TestChallenge2Unaffected 会有1/50的概率阻塞。
经过打LOG 发现,是还没来得及把所有DATA SHARD完,超过了1秒,之后就有数据再也MIGRATE不过来。造成拿不到而阻塞。
这里分享一个打LOG的技巧,避免淹没在茫茫LOG海里。就是出了问题,再打LOG
image.png
image.png image.png image.png
原因如下:
image.png
下图这个4号数据块 在测试里属于101的OWN,但是还没来得及拿到,100的网就断了。再也取不到了。
image.png
解决方案,加快PULL的频率
image.png
加快QUERY CONFIG的速度。思路是如果是拿已知的CONFIG,因为CONFIG的APPEND不会修改,所以可以直接返回。
image.png
缩短MASTER CLIENT的睡眠时间。
image.png
测试200次后无阻塞。
SHARD KV 测试200次通过
测试脚本
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6.824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 200; i++))
do
echo $i
(go test) > ./res/$i
grep -nr "FAIL.*" res
done
image.png
回归测试SHARD MASTER500次通过
image.png回归测试RAFT 300次通过
有2次时之前说的不是代码问题的KNOWN ISSUE,具体参考文集的2C部分
image.png
回归测试KVRAFT 210次通过
image.pngCONCISE SERVER
package shardkv
import (
"bytes"
"labrpc"
"log"
"shardmaster"
"strconv"
"time"
)
import "raft"
import "sync"
import "labgob"
type Op struct {
OpType string "operation type(eg. put/append/gc/get)"
Key string "key for normal, config num for gc"
Value string
Cid int64 "cid for put/append, operation uid for get/gc"
SeqNum int "seqnum for put/append, shard for gc"
}
type ShardKV struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
make_end func(string) *labrpc.ClientEnd
gid int
masters []*labrpc.ClientEnd
maxraftstate int // snapshot if log grows this big
// Your definitions here.
mck *shardmaster.Clerk
cfg shardmaster.Config
persist *raft.Persister
db map[string]string
chMap map[int]chan Op
cid2Seq map[int64]int
toOutShards map[int]map[int]map[string]string "cfg num -> (shard -> db)"
comeInShards map[int]int "shard->config number"
myShards map[int]bool "to record which shard i can offer service"
garbages map[int]map[int]bool "cfg number -> shards"
killCh chan bool
}
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
originOp := Op{"Get",args.Key,"",Nrand(),0}
reply.Err,reply.Value = kv.templateStart(originOp)
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
reply.Err,_ = kv.templateStart(originOp)
}
func (kv *ShardKV) templateStart(originOp Op) (Err, string) {
index,_,isLeader := kv.rf.Start(originOp)
if isLeader {
ch := kv.put(index, true)
op := kv.beNotified(ch, index)
if equalOp(originOp, op) { return OK, op.Value }
if op.OpType == ErrWrongGroup { return ErrWrongGroup, "" }
}
return ErrWrongLeader,""
}
func (kv *ShardKV) GarbageCollection(args *MigrateArgs, reply *MigrateReply) {
reply.Err = ErrWrongLeader
if _, isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
if _,ok := kv.toOutShards[args.ConfigNum]; !ok {return}
if _,ok := kv.toOutShards[args.ConfigNum][args.Shard]; !ok {return}
originOp := Op{"GC",strconv.Itoa(args.ConfigNum),"",Nrand(),args.Shard}
kv.mu.Unlock()
reply.Err,_ = kv.templateStart(originOp)
kv.mu.Lock()
}
func (kv *ShardKV) ShardMigration(args *MigrateArgs, reply *MigrateReply) {
reply.Err, reply.Shard, reply.ConfigNum = ErrWrongLeader, args.Shard, args.ConfigNum
if _,isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Err = ErrWrongGroup
if args.ConfigNum >= kv.cfg.Num {return}
reply.Err,reply.ConfigNum, reply.Shard = OK, args.ConfigNum, args.Shard
reply.DB, reply.Cid2Seq = kv.deepCopyDBAndDedupMap(args.ConfigNum,args.Shard)
}
func (kv *ShardKV) deepCopyDBAndDedupMap(config int,shard int) (map[string]string, map[int64]int) {
db2 := make(map[string]string)
cid2Seq2 := make(map[int64]int)
for k, v := range kv.toOutShards[config][shard] {
db2[k] = v
}
for k, v := range kv.cid2Seq {
cid2Seq2[k] = v
}
return db2, cid2Seq2
}
func (kv *ShardKV) beNotified(ch chan Op,index int) Op{
select {
case notifyArg,ok := <- ch :
if ok {
close(ch)
}
kv.mu.Lock()
delete(kv.chMap,index)
kv.mu.Unlock()
return notifyArg
case <- time.After(time.Duration(1000)*time.Millisecond):
return Op{}
}
}
func (kv *ShardKV) put(idx int,createIfNotExists bool) chan Op{
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.chMap[idx]; !ok {
if !createIfNotExists {return nil}
kv.chMap[idx] = make(chan Op,1)
}
return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
return a.Key == b.Key && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}
func (kv *ShardKV) Kill() {
kv.rf.Kill()
select{
case <-kv.killCh:
default:
}
kv.killCh <- true
}
func (kv *ShardKV) readSnapShot(snapshot []byte) {
kv.mu.Lock()
defer kv.mu.Unlock()
if snapshot == nil || len(snapshot) < 1 {return}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var db map[string]string
var cid2Seq map[int64]int
var toOutShards map[int]map[int]map[string]string
var comeInShards map[int]int
var myShards map[int]bool
var garbages map[int]map[int]bool
var cfg shardmaster.Config
if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil || d.Decode(&comeInShards) != nil ||
d.Decode(&toOutShards) != nil || d.Decode(&myShards) != nil || d.Decode(&cfg) != nil ||
d.Decode(&garbages) != nil {
log.Fatal("readSnapShot ERROR for server %v",kv.me)
} else {
kv.db, kv.cid2Seq, kv.cfg = db, cid2Seq, cfg
kv.toOutShards, kv.comeInShards, kv.myShards, kv.garbages = toOutShards,comeInShards,myShards,garbages
}
}
func (kv *ShardKV) needSnapShot() bool {
kv.mu.Lock()
defer kv.mu.Unlock()
threshold := 10
return kv.maxraftstate > 0 &&
kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *ShardKV) doSnapShot(index int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
kv.mu.Lock()
e.Encode(kv.db)
e.Encode(kv.cid2Seq)
e.Encode(kv.comeInShards)
e.Encode(kv.toOutShards)
e.Encode(kv.myShards)
e.Encode(kv.cfg)
e.Encode(kv.garbages)
kv.mu.Unlock()
kv.rf.DoSnapShot(index,w.Bytes())
}
func (kv *ShardKV) tryPollNewCfg() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) > 0{
kv.mu.Unlock()
return
}
next := kv.cfg.Num + 1
kv.mu.Unlock()
cfg := kv.mck.Query(next)
if cfg.Num == next {
kv.rf.Start(cfg) //sync follower with new cfg
}
}
func (kv *ShardKV) tryGC() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.garbages) == 0{
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for cfgNum, shards := range kv.garbages {
for shard := range shards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
kv.mu.Lock()
defer kv.mu.Unlock()
delete(kv.garbages[cfgNum], shard)
if len(kv.garbages[cfgNum]) == 0 {
delete(kv.garbages, cfgNum)
}
}
}
}(shard, kv.mck.Query(cfgNum))
}
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) tryPullShard() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) == 0 {
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for shard, idx := range kv.comeInShards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.ShardMigration", &args, &reply); ok && reply.Err == OK {
kv.rf.Start(reply)
}
}
}(shard, kv.mck.Query(idx))
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) daemon(do func(), sleepMS int) {
for {
select {
case <-kv.killCh:
return
default:
do()
}
time.Sleep(time.Duration(sleepMS) * time.Millisecond)
}
}
func (kv *ShardKV) apply(applyMsg raft.ApplyMsg) {
if cfg, ok := applyMsg.Command.(shardmaster.Config); ok {
kv.updateInAndOutDataShard(cfg)
} else if migrationData, ok := applyMsg.Command.(MigrateReply); ok{
kv.updateDBWithMigrateData(migrationData)
}else {
op := applyMsg.Command.(Op)
if op.OpType == "GC" {
cfgNum,_ := strconv.Atoi(op.Key)
kv.gc(cfgNum,op.SeqNum);
} else {
kv.normal(&op)
}
if notifyCh := kv.put(applyMsg.CommandIndex,false); notifyCh != nil {
send(notifyCh,op)
}
}
if kv.needSnapShot() {
go kv.doSnapShot(applyMsg.CommandIndex)
}
}
func (kv *ShardKV) gc(cfgNum int, shard int) {
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.toOutShards[cfgNum]; ok {
delete(kv.toOutShards[cfgNum], shard)
if len(kv.toOutShards[cfgNum]) == 0 {
delete(kv.toOutShards, cfgNum)
}
}
}
func (kv *ShardKV) updateInAndOutDataShard(cfg shardmaster.Config) {
kv.mu.Lock()
defer kv.mu.Unlock()
if cfg.Num <= kv.cfg.Num { //only consider newer config
return
}
oldCfg, toOutShard := kv.cfg, kv.myShards
kv.myShards, kv.cfg = make(map[int]bool), cfg
for shard, gid := range cfg.Shards {
if gid != kv.gid {continue}
if _, ok := toOutShard[shard]; ok || oldCfg.Num == 0 {
kv.myShards[shard] = true
delete(toOutShard, shard)
} else {
kv.comeInShards[shard] = oldCfg.Num
}
}
if len(toOutShard) > 0 { // prepare data that needed migration
kv.toOutShards[oldCfg.Num] = make(map[int]map[string]string)
for shard := range toOutShard {
outDb := make(map[string]string)
for k, v := range kv.db {
if key2shard(k) == shard {
outDb[k] = v
delete(kv.db, k)
}
}
kv.toOutShards[oldCfg.Num][shard] = outDb
}
}
}
func (kv *ShardKV) updateDBWithMigrateData(migrationData MigrateReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if migrationData.ConfigNum != kv.cfg.Num-1 {return}
delete(kv.comeInShards, migrationData.Shard)
//this check is necessary, to avoid use kv.cfg.Num-1 to update kv.cfg.Num's shard
if _, ok := kv.myShards[migrationData.Shard]; !ok {
kv.myShards[migrationData.Shard] = true
for k, v := range migrationData.DB {
kv.db[k] = v
}
for k, v := range migrationData.Cid2Seq {
kv.cid2Seq[k] = Max(v,kv.cid2Seq[k])
}
if _, ok := kv.garbages[migrationData.ConfigNum]; !ok {
kv.garbages[migrationData.ConfigNum] = make(map[int]bool)
}
kv.garbages[migrationData.ConfigNum][migrationData.Shard] = true
}
}
func (kv *ShardKV) normal(op *Op) {
shard := key2shard(op.Key)
kv.mu.Lock()
if _, ok := kv.myShards[shard]; !ok {
op.OpType = ErrWrongGroup
} else {
maxSeq,found := kv.cid2Seq[op.Cid]
if !found || op.SeqNum > maxSeq {
if op.OpType == "Put" {
kv.db[op.Key] = op.Value
} else if op.OpType == "Append" {
kv.db[op.Key] += op.Value
}
kv.cid2Seq[op.Cid] = op.SeqNum
}
if op.OpType == "Get" {
op.Value = kv.db[op.Key]
}
}
kv.mu.Unlock()
}
func send(notifyCh chan Op,op Op) {
select{
case <-notifyCh:
default:
}
notifyCh <- op
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
labgob.Register(MigrateArgs{})
labgob.Register(MigrateReply{})
labgob.Register(shardmaster.Config{})
kv := new(ShardKV)
kv.me = me
kv.maxraftstate = maxraftstate
kv.make_end = make_end
kv.gid = gid
kv.masters = masters
// Your initialization code here.
kv.persist = persister
// Use something like this to talk to the shardmaster:
kv.mck = shardmaster.MakeClerk(kv.masters)
kv.cfg = shardmaster.Config{}
kv.db = make(map[string]string)
kv.chMap = make(map[int]chan Op)
kv.cid2Seq = make(map[int64]int)
kv.toOutShards = make(map[int]map[int]map[string]string)
kv.comeInShards = make(map[int]int)
kv.myShards = make(map[int]bool)
kv.garbages = make(map[int]map[int]bool)
kv.readSnapShot(kv.persist.ReadSnapshot())
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.killCh = make(chan bool,1)
go kv.daemon(kv.tryPollNewCfg,50)
go kv.daemon(kv.tryPullShard,80)
go kv.daemon(kv.tryGC,100)
go func() {
for {
select {
case <- kv.killCh:
return
case applyMsg := <- kv.applyCh:
if !applyMsg.CommandValid {
kv.readSnapShot(applyMsg.SnapShot)
continue
}
kv.apply(applyMsg)
}
}
}()
return kv
}
CONCISE CLIENT
package shardkv
//
// client code to talk to a sharded key/value service.
//
// the client first talks to the shardmaster to find out
// the assignment of shards (keys) to groups, and then
// talks to the group that holds the key's shard.
//
import (
"labrpc"
)
import "crypto/rand"
import "math/big"
import "shardmaster"
import "time"
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
shard %= shardmaster.NShards
return shard
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
type Clerk struct {
sm *shardmaster.Clerk
config shardmaster.Config
make_end func(string) *labrpc.ClientEnd
// You will have to modify this struct.
lastLeader int
id int64
seqNum int
}
func MakeClerk(masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.sm = shardmaster.MakeClerk(masters)
ck.make_end = make_end
// You'll have to add code here.
ck.id = Nrand()//give each client a unique identifier, and then have them
ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
ck.lastLeader = 0
return ck
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
args := GetArgs{}
args.Key = key
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
// try each server for the shard.
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply GetReply
ok := srv.Call("ShardKV.Get", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si;
return reply.Value
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
//
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
args := PutAppendArgs{key,value,op,ck.id,ck.seqNum}
ck.seqNum++
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply PutAppendReply
ok := srv.Call("ShardKV.PutAppend", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si
return
}
if ok && reply.Err == ErrWrongGroup {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
CONCISE COMMON
package shardkv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//
const (
OK = "OK"
ErrWrongLeader = "ErrWrongLeader"
ErrWrongGroup = "ErrWrongGroup"
)
type Err string
// Put or Append
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
Cid int64 "client unique id"
SeqNum int "each request with a monotonically increasing sequence number"
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Err Err
Value string
}
type MigrateArgs struct {
Shard int
ConfigNum int
}
type MigrateReply struct {
Err Err
ConfigNum int
Shard int
DB map[string]string
Cid2Seq map[int64]int
}
func Max(x, y int) int {
if x > y {
return x
}
return y
}
最后再把全部代码提交进GITHUB。待我把MAP REDUCE写了一起吧。
网友评论