Step 1: Read the requirements
https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html
specifically Part B: Key/value service with log compaction,
plus Chapter 7 of the Raft paper.
Step 2: Think it through
You should spend some time figuring out what the interface will be between your Raft library and your service so that your Raft library can discard log entries. Think about how your Raft will operate while storing only the tail of the log, and how it will discard old log entries.
The hints below give some clues:
You should compare maxraftstate to persister.RaftStateSize(). Whenever your key/value server detects that the Raft state size is approaching this threshold, it should save a snapshot, and tell the Raft library that it has snapshotted, so that Raft can discard old log entries. If maxraftstate is -1, you do not have to snapshot.
![](https://img.haomeiwen.com/i10803273/b6aee04c0c47e4c8.png)
So the rough steps for log compaction are:
- When the server detects that the current Raft state size is approaching maxraftstate, it triggers DoSnapShot
- DoSnapShot truncates the log up to that index, updates lastIncludedIndex, and persists the Raft state
- The server-side snapshot is persisted as the snapshot
- The Raft code is updated so indexing stays correct (the log has been truncated, so an absolute index minus lastIncludedIndex gives the position in the new log slice)
- The KV server loads the snapshot when it starts up
Step 3: Implement Task 1, DoSnapShot
The task goal:
Your raft.go probably keeps the entire log in a Go slice. Modify it so that it can be given a log index, discard the entries before that index, and continue operating while storing only log entries after that index. Make sure you pass all the Raft tests after making these changes.
First, the KV server has to hand its state over to Raft, and since new log entries may arrive while this DoSnapShot request is in flight, it must also pass along the index that the snapshot covers. When Raft receives it, it should (a sketch of the resulting DoSnapShot follows the screenshots below):
- build a new log slice that keeps only the entries from that index onward
- point the log field at the new slice
- persist the new Raft state (currentTerm, votedFor, log[])
- persist the KV server's snapshot, together with lastIncludedIndex and the term at that index
![](https://img.haomeiwen.com/i10803273/0d27e2fbd86dec07.png)
![](https://img.haomeiwen.com/i10803273/f3213389186f7a01.png)
![](https://img.haomeiwen.com/i10803273/aacd0a5cf5a52fcf.png)
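A minimal sketch of what DoSnapShot ends up looking like on the Raft side (it also appears in the full Raft listing at the end; getLog and persistWithSnapShot are the helpers introduced in the later steps):
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // ignore stale requests: everything up to curIdx may already be compacted
    if curIdx <= rf.lastIncludedIndex {
        return
    }
    // keep only the tail of the log; the entry at curIdx becomes the new base
    rf.log = append(make([]Log, 0), rf.log[curIdx-rf.lastIncludedIndex:]...)
    rf.lastIncludedIndex = curIdx
    rf.lastIncludedTerm = rf.getLog(curIdx).Term
    // persist the Raft state and the kvserver's snapshot together
    rf.persistWithSnapShot(snapshot)
}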
Step 4: Implement Task 2, the calls on the KV server side
Modify your kvserver so that it detects when the persisted Raft state grows too large, and then hands a snapshot to Raft and tells Raft that it can discard old log entries. Raft should save each snapshot with persister.SaveStateAndSnapshot() (don't use files). A kvserver instance should restore the snapshot from the persister when it re-starts.
![](https://img.haomeiwen.com/i10803273/d9bf5469ee4cbdac.png)
![](https://img.haomeiwen.com/i10803273/17ae19233798ee40.png)
![](https://img.haomeiwen.com/i10803273/0bd9067b14deb8ad.png)
![](https://img.haomeiwen.com/i10803273/3021765d5f7a052e.png)
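On the server side, the detection and the call into Raft boil down to the two functions below (excerpted from the full server listing at the end); the 10% headroom threshold is this implementation's own choice:
func (kv *KVServer) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    // snapshot once the remaining headroom falls below 10% of maxraftstate
    return kv.maxraftstate > 0 &&
        kv.maxraftstate-kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *KVServer) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    // the snapshot is the kv state plus the duplicate-detection table
    e.Encode(kv.db)
    e.Encode(kv.cid2Seq)
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index, w.Bytes())
}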
Step 5: Fix up the indexing
Many places in the original code need updating: wherever it used rf.log[i] it now calls rf.getLog(i), and wherever it used len(rf.log) it now calls rf.logLen(). Slicing expressions also need the offset applied by hand (the two helpers themselves are shown after the screenshots below).
![](https://img.haomeiwen.com/i10803273/ed3a59fa6e3bbf83.png)
![](https://img.haomeiwen.com/i10803273/16e2579410b9cd47.png)
![](https://img.haomeiwen.com/i10803273/95a5fb3ac785d3c1.png)
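The two helpers are small (from the full Raft listing at the end):
// getLog maps an absolute log index onto the compacted slice.
func (rf *Raft) getLog(i int) Log {
    return rf.log[i-rf.lastIncludedIndex]
}
// logLen is the virtual log length, counting the compacted prefix.
func (rf *Raft) logLen() int {
    return len(rf.log) + rf.lastIncludedIndex
}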
Note that some hard-coded 0s need to become rf.lastIncludedIndex, as in the screenshot below:
![](https://img.haomeiwen.com/i10803273/92b4d45f9da4b309.png)
Step 6: Testing
After the changes, rerun all of the Raft tests.
In the kvserver 3A tests, define a const and change every maxraftstate value to 150.
![](https://img.haomeiwen.com/i10803273/c3b45eb130493db5.png)
The first test passes; add a log statement to see the effect.
![](https://img.haomeiwen.com/i10803273/49d587501c05befb.png)
![](https://img.haomeiwen.com/i10803273/d80e100af4b9c8eb.png)
But the second (concurrent) test blocks.
Further testing showed that whenever this background goroutine takes Raft's lock, it blocks.
![](https://img.haomeiwen.com/i10803273/c04db513536eee6d.png)
There is only one lock here, and at first I could not see how it could block, since a single lock cannot deadlock on its own.
So I wrapped the locking logic and logged every place the lock is taken:
func (rf *Raft)Unlock() {
if rf.me == 0 {
log.Println(rf.me, " unlock ", MyCaller())
}
rf.mu.Unlock()
}
func (rf *Raft)Lock(tag... string) {
tg := ""
if len(tag) > 0{
tg = tag[0]
}
if rf.me == 0 {
log.Println(rf.me, " want lock", MyCaller(), tg)
}
rf.mu.Lock()
if rf.me == 0 {
log.Println(rf.me, " lock", MyCaller(),tg)
}
}
func getFrame(skipFrames int) runtime.Frame {
// We need the frame at index skipFrames+2, since we never want runtime.Callers and getFrame
targetFrameIndex := skipFrames + 2
// Set size to targetFrameIndex+2 to ensure we have room for one more caller than we need
programCounters := make([]uintptr, targetFrameIndex+2)
n := runtime.Callers(0, programCounters)
frame := runtime.Frame{Function: "unknown"}
if n > 0 {
frames := runtime.CallersFrames(programCounters[:n])
for more, frameIndex := true, 0; more && frameIndex <= targetFrameIndex; frameIndex++ {
var frameCandidate runtime.Frame
frameCandidate, more = frames.Next()
if frameIndex == targetFrameIndex {
frame = frameCandidate
}
}
}
return frame
}
// MyCaller returns the caller of the function that called it :)
func MyCaller() string {
// Skip GetCallerFunctionName and the function to get the caller of
return getFrame(2).Function
}
That pinpointed it: the leader takes the lock while appending log entries and never gives it back.
![](https://img.haomeiwen.com/i10803273/31d1b66162e108e3.png)
To make sure the unlock always happens, I hoisted the Lock/Unlock to the top level with defer.
It still blocked, so I added some prints to see after which step it got stuck.
![](https://img.haomeiwen.com/i10803273/80c47a383eb55734.png)
![](https://img.haomeiwen.com/i10803273/4a24ca194dbb3456.png)
The log prints 4 but never 6, so the hang must be inside the success branch, and that is where the problem turned out to be.
The cause: rf.updateCommitIndex() calls rf.updateLastApplied(), which sends on applyCh, and that send blocks if nobody is reading from the channel.
![](https://img.haomeiwen.com/i10803273/ccc23550f449e177.png)
So in the end it is a deadlock caused by a blocking channel plus a single lock.
There are two fixes. The first is to change Raft's locking so the applyCh send happens outside the lock.
The second is for the application-layer goroutine that reads applyCh to spin off a separate goroutine whenever it needs Raft's lock, so it never blocks the draining of apply messages.
I picked the first one. (Later, re-running the Raft tests 1000 times produced one run where log entries were applied out of order.)
![](https://img.haomeiwen.com/i10803273/e8c32ef0e9894878.png)
![](https://img.haomeiwen.com/i10803273/098fc300c78b16ea.png)
So I switched to the second approach.
![](https://img.haomeiwen.com/i10803273/8e1c59c41d34dd0a.png)
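Concretely, the second approach means the goroutine that drains applyCh never calls back into Raft synchronously; in the final server code the snapshot work, which takes Raft's lock via rf.DoSnapShot, is spun off like this:
notifyCh := kv.putIfAbsent(applyMsg.CommandIndex)
if kv.needSnapShot() {
    // doSnapShot eventually takes Raft's lock, so run it in its own
    // goroutine and never block the applyCh reader on it
    go kv.doSnapShot(applyMsg.CommandIndex)
}
send(notifyCh, op)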
After that change, it gets to the third test, which fails.
![](https://img.haomeiwen.com/i10803273/00660b828392cfd3.png)
An index out of range appears in AppendEntries; add a log to take a look.
![](https://img.haomeiwen.com/i10803273/65d9f0cdc9d57b69.png)
![](https://img.haomeiwen.com/i10803273/5dadca132ad843a1.png)
What apparently happened: the leader took a snapshot while it was still sending log entries to a follower; it then updated lastIncludedIndex and the log, so the old entries were compacted away before they ever reached that follower. This is exactly the case the InstallSnapshot RPC is meant to handle.
Step 7: Implement Task 3, the InstallSnapshot RPC struct and sender
Modify your Raft leader code to send an InstallSnapshot RPC to a follower when the leader has discarded the log entries the follower needs. When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg. Your solution is complete when it passes all of the Lab 3 tests.
Re-read Chapter 7 of the paper and Figure 13, together with the hint:
![](https://img.haomeiwen.com/i10803273/81d617da7dca7c9c.png)
You should send the entire snapshot in a single InstallSnapshot RPC. You do not have to implement Figure 13's offset mechanism for splitting up the snapshot.
![](https://img.haomeiwen.com/i10803273/9eb5aa728141ae96.png)
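The resulting RPC structs and sender, excerpted from the full Raft listing at the end (no offset/done fields, since the whole snapshot goes out in one RPC):
type InstallSnapshotArgs struct {
    Term              int    "leader's term"
    LeaderId          int    "so follower can redirect clients"
    LastIncludedIndex int    "the snapshot replaces all entries up through and including this index"
    LastIncludedTerm  int    "term of lastIncludedIndex"
    Data              []byte "raw bytes of the snapshot"
}
type InstallSnapshotReply struct {
    Term int "currentTerm, for leader to update itself"
}
// sendSnapshot is called with rf.mu held; it releases the lock around the RPC.
func (rf *Raft) sendSnapshot(server int) {
    args := InstallSnapshotArgs{
        Term:              rf.currentTerm,
        LeaderId:          rf.me,
        LastIncludedIndex: rf.lastIncludedIndex,
        LastIncludedTerm:  rf.lastIncludedTerm,
        Data:              rf.persister.ReadSnapshot(),
    }
    rf.mu.Unlock()
    reply := InstallSnapshotReply{}
    ret := rf.sendInstallSnapshot(server, &args, &reply)
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if !ret || rf.state != Leader || rf.currentTerm != args.Term {
        return
    }
    if reply.Term > rf.currentTerm { // stale term: step down
        rf.beFollower(reply.Term)
        return
    }
    rf.updateNextMatchIdx(server, rf.lastIncludedIndex)
}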
Step 8: Implement Task 3, the InstallSnapshot RPC handler
If the incoming snapshot is newer, it has to be handed to the KV server through an apply message, so a snapshot flavour of ApplyMsg is needed; when the KV server receives one, it pulls the snapshot out of the message and rebuilds its db and cid2Seq from it.
When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg.
![](https://img.haomeiwen.com/i10803273/d2904ef904a8d854.png)
![](https://img.haomeiwen.com/i10803273/be39d06ae2aee40f.png)
![](https://img.haomeiwen.com/i10803273/37f5502c228a413a.png)
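The ApplyMsg gains a snapshot field (from the final Raft listing):
type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int
    SnapShot     []byte // set when the message carries a snapshot instead of a command
}
And in the kvserver's apply loop, a message with CommandValid == false is treated as a snapshot:
case applyMsg := <-kv.applyCh:
    if !applyMsg.CommandValid {
        // not a normal command: install the snapshot into db and cid2Seq
        kv.readSnapShot(applyMsg.SnapShot)
        continue
    }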
Step 9: Take a different branch when the needed log entries have been compacted
When the Raft leader sends a heartbeat and finds that the point at which its log matches the follower's no longer exists in its own log (it has been folded into the snapshot), it must send an InstallSnapshot RPC instead of AppendEntries; that RPC also doubles as a heartbeat. As the paper puts it:
However, an exceptionally slow follower or a new server joining the cluster (Section 6) would not. The way to bring such a follower up-to-date is for the leader to send it a snapshot over the network. The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are too far behind.
![](https://img.haomeiwen.com/i10803273/82dd9c362150c5af.png)
![](https://img.haomeiwen.com/i10803273/e8a42b613e0ed10f.png)
![](https://img.haomeiwen.com/i10803273/475b6710d7d88a71.png)
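In startAppendLog this becomes an early branch, taken before building the AppendEntriesArgs (from the final listing):
if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 {
    // the entries this follower needs were compacted away:
    // fall back to InstallSnapshot, which also acts as a heartbeat
    rf.sendSnapshot(idx)
    return
}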
Step 10: Add the KV server logic for loading a snapshot
![](https://img.haomeiwen.com/i10803273/e59ee8f4260b3400.png)
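readSnapShot decodes the snapshot back into the server's state; it runs both at startup and whenever a snapshot ApplyMsg arrives (from the full server listing):
func (kv *KVServer) readSnapShot(snapshot []byte) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if snapshot == nil || len(snapshot) < 1 {
        return // nothing to restore
    }
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    var db map[string]string
    var cid2Seq map[int64]int
    if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil {
        log.Fatalf("readSnapShot ERROR for server %v", kv.me)
    } else {
        kv.db, kv.cid2Seq = db, cid2Seq
    }
}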
Step 11: Testing
Following the hint, start with:
Make sure you pass TestSnapshotRPC before moving on to the other Snapshot tests.
![](https://img.haomeiwen.com/i10803273/7b5f2a06d1e9ca5e.png)
The unreliable-network test that used to fail now passes as well.
![](https://img.haomeiwen.com/i10803273/6919202c92943d9a.png)
BUG 1: the first 3B test blocks
After another round of log-based digging (it cost about 4 hours), I found the problem.
![](https://img.haomeiwen.com/i10803273/5397fc3d518e1891.png)
The paper says:
Raft also includes a small amount of metadata in the snapshot: the last included index is the index of the last entry in the log that the snapshot replaces (the last entry the state machine had applied), and the last included term is the term of this entry. These are preserved to support the AppendEntries consistency check for the first log entry following the snapshot, since that entry needs a previous log index and term.
So I cannot wipe the last entry's term to an empty value; the last included term has to be recorded as well. Otherwise the prevLogTerm the leader sends later is always 0, and the follower keeps replying false, because of rule 1: Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3).
The correct version:
![](https://img.haomeiwen.com/i10803273/4b5ed50bab511fba.png)
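In the final InstallSnapshot handler this means that even when the entire log is discarded, the placeholder entry at position 0 keeps the last included term:
if args.LastIncludedIndex < rf.logLen()-1 {
    // retain the entries that follow the snapshot
    rf.log = append(make([]Log, 0), rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]...)
} else {
    // discard the entire log, but record LastIncludedTerm in the base entry
    // so the AppendEntries consistency check on prevLogTerm still works
    rf.log = []Log{{args.LastIncludedTerm, nil}}
}
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm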
After the first 3B RPC test passes, run the whole 3B suite.
![](https://img.haomeiwen.com/i10803273/cac9116fe96fc869.png)
An index out of range shows up; time to investigate.
![](https://img.haomeiwen.com/i10803273/247cdb583b1e4cd0.png)
BUG 2
The suspicion: somewhere lastApplied is not being updated.
![](https://img.haomeiwen.com/i10803273/7f16e4e4aeece61d.png)
Looking at this test's properties, it uses a single client but involves crashes, so the guess is that after a snapshot is taken and the server recovers from a crash, lastApplied and commitIndex are not restored.
Searching every place they are updated, I found I had only added the update in the InstallSnapshot RPC handler,
not in readPersist.
![](https://img.haomeiwen.com/i10803273/a879c55a356d3f18.png)
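The missing piece, as it now appears at the end of readPersist:
// after decoding lastIncludedIndex and lastIncludedTerm from the persisted state,
// resume from the snapshot base rather than from zero
rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex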
This time all the tests pass.
![](https://img.haomeiwen.com/i10803273/f0f9babecaa86eb9.png)
But the assignment requires:
A reasonable amount of time to take for the Lab 3 tests is 400 seconds of real time and 700 seconds of CPU time. Further, go test -run TestSnapshotSize should take less than 20 seconds of real time.
Step 12: Optimize performance
The slowest test is the second one, so I dug into why it is so slow,
adding plenty of logging inside the test.
![](https://img.haomeiwen.com/i10803273/ec5ae8c7c21a81ea.png)
The loop runs 200 times and only gets through about 3 iterations per second, so it takes over a minute, whereas in the sample output on the course page this case finishes within a second.
![](https://img.haomeiwen.com/i10803273/77d66c1d89b37aba.png)
![](https://img.haomeiwen.com/i10803273/c63136552bc4c169.png)
The operation inside the loop is a Put, and Put blocks: it cannot return until the leader has committed the entry and applied it. So the key question is when the commit happens.
An entry commits once a majority of nodes have appended it to their logs and reported back; the leader then advances commitIndex and applies the entry.
That only happens when the leader fires off an AppendEntries round, which in my Raft code is once every 100ms, and that is the cause.
To make Put as fast as possible, the leader should trigger startAppendLog immediately after appending the new entry (see the excerpt after the screenshot).
![](https://img.haomeiwen.com/i10803273/bab204aceee02013.png)
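In Start this is a one-line change: kick off replication right after appending to the leader's log (from the final listing):
if isLeader {
    index = rf.getLastLogIdx() + 1
    newLog := Log{rf.currentTerm, command}
    rf.log = append(rf.log, newLog)
    rf.persist()
    // replicate immediately instead of waiting for the next 100ms heartbeat tick
    rf.startAppendLog()
}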
We can see that the first two tests now finish almost instantly.
![](https://img.haomeiwen.com/i10803273/0ea9e366f2588d63.png)
The fourth test throws an index out of range, from the line of code below.
![](https://img.haomeiwen.com/i10803273/d77cdaf87bee5d69.png)
Logging shows lastApplied falling below lastIncludedIndex. My guess is that once AppendEntries RPCs are fired more frequently, this code path also runs more often. As a first step I added a bounds guard: since lastIncludedIndex is the base of the log, the change cannot hurt correctness (see the sketch below).
![](https://img.haomeiwen.com/i10803273/d5032bd9869eb2d8.png)
BUG 3: the fifth test blocks
Once again using the lock/unlock logging trick from before, the deadlock turns out to sit in either the AppendEntries RPC handler or the InstallSnapshot RPC handler.
Both handlers end by sending on applyCh, so the suspicion is again that the background goroutine consuming it is stuck.
Reading the code, the suspect is the part below.
![](https://img.haomeiwen.com/i10803273/12e1f377eccf1a0e.png)
![](https://img.haomeiwen.com/i10803273/fc00459293629e6a.png)
![](https://img.haomeiwen.com/i10803273/4f3fd88917a43db9.png)
The student guide makes the same point:
Re-appearing indices: Say that your Raft library has some method Start() that takes a command, and return the index at which that command was placed in the log (so that you know when to return to the client, as discussed above). You might assume that you will never see Start() return the same index twice, or at the very least, that if you see the same index again, the command that first returned that index must have failed. It turns out that neither of these things are true, even if no servers crash.
The fix: in send, let a new notification replace an unconsumed old one.
![](https://img.haomeiwen.com/i10803273/99ba15bc86b5104a.png)
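The kvserver-side send, from the final listing; an unread notification for a re-appearing index is simply dropped so the channel write can never block the apply loop:
func send(notifyCh chan Op, op Op) {
    select {
    case <-notifyCh: // discard a stale, unconsumed notification
    default:
    }
    notifyCh <- op
}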
go test -race now passes:
zyx@zyx-virtual-machine:~/Desktop/mit6824/mit6.824/src/kvraft$ go test -race
Test: snapshots, one client (3A) ...
... Passed -- 15.0 5 18675 3525
Test: snapshots, many clients (3A) ...
... Passed -- 15.1 5 21955 4067
Test: unreliable net, snapshots, many clients (3A) ...
... Passed -- 16.3 5 10669 1461
Test: concurrent append to same key, unreliable (3A) ...
... Passed -- 1.0 3 276 52
Test: progress in majority (3A) ...
... Passed -- 0.4 5 54 2
Test: no progress in minority (3A) ...
... Passed -- 1.0 5 127 3
Test: completion after heal (3A) ...
... Passed -- 1.0 5 60 3
Test: partitions, one client (3A) ...
... Passed -- 23.3 5 5696 775
Test: partitions, many clients (3A) ...
... Passed -- 22.8 5 10947 1005
Test: restarts, one client (3A) ...
... Passed -- 20.7 5 6857 952
Test: restarts, many clients (3A) ...
... Passed -- 20.2 5 11353 1263
Test: unreliable net, restarts, many clients (3A) ...
... Passed -- 21.4 5 8034 969
Test: restarts, partitions, many clients (3A) ...
... Passed -- 27.4 5 11132 1028
Test: unreliable net, restarts, partitions, many clients (3A) ...
... Passed -- 28.3 5 6883 695
Test: unreliable net, restarts, partitions, many clients, linearizability checks (3A) ...
... Passed -- 25.9 7 12984 649
Test: InstallSnapshot RPC (3B) ...
... Passed -- 2.8 3 704 63
Test: snapshot size is reasonable (3B) ...
... Passed -- 3.2 3 2869 800
Test: restarts, snapshots, one client (3B) ...
... Passed -- 19.1 5 18039 3209
Test: restarts, snapshots, many clients (3B) ...
... Passed -- 19.8 5 19701 2804
Test: unreliable net, snapshots, many clients (3B) ...
... Passed -- 15.8 5 11034 1533
Test: unreliable net, restarts, snapshots, many clients (3B) ...
... Passed -- 20.1 5 11658 1490
Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
... Passed -- 27.5 5 8853 1021
Test: unreliable net, restarts, partitions, snapshots, many clients, linearizability checks (3B) ...
... Passed -- 25.9 7 24348 1862
PASS
ok kvraft 375.658s
Timing the tests:
![](https://img.haomeiwen.com/i10803273/67927cd4c05baf3d.png)
Running the tests 500 times surfaced a few bugs that do not show up on every run.
BUG 1
![](https://img.haomeiwen.com/i10803273/96fa1e3a09900f92.png)
Logging at the places where the nextIndex array is updated showed that it can sometimes exceed the log length.
The fix:
![](https://img.haomeiwen.com/i10803273/59370fa75e414931.png)
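In the final code the retry path clamps nextIndex to the virtual log length so it can never point past the end:
// never let nextIndex point past the end of the (virtual) log
rf.nextIndex[idx] = Min(rf.logLen(), tarIndex)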
BUG 2
This one comes from the 3A suite.
![](https://img.haomeiwen.com/i10803273/44d55237edd7125b.png)
The debugging reasoning: the new code had already survived 1000 runs of the Raft regression tests, which rules out Raft itself,
and the 3A suite never exercises InstallSnapshot (the Lab 2 tests do not cover snapshots),
so the bug is most likely somewhere in the client/server code.
Walking through the code step by step and adding some logging showed that the Op comparison does not use enough information: for a Get, two Ops easily compare equal as long as the key is the same.
So every Get operation now carries a unique value generated with the client-side Nrand function,
![](https://img.haomeiwen.com/i10803273/8dcd4e8e293ee55e.png)
and equalOp is updated to match (see the snippet after the screenshot).
![](https://img.haomeiwen.com/i10803273/c65b8e3925488cae.png)
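In the final code, each Get stuffs a random nonce into the Value field, and equalOp compares every field of the Op:
// in KVServer.Get: the nonce makes two Gets for the same key distinguishable
originOp := Op{"Get", args.Key, strconv.FormatInt(Nrand(), 10), 0, 0}
func equalOp(a Op, b Op) bool {
    return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType &&
        a.SeqNum == b.SeqNum && a.Cid == b.Cid
}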
Rerunning just the failing case 500 times showed no more problems,
and running the full 3A test set 200 times showed nothing abnormal either.
BUG 3
Running the whole of Lab 3, a failure shows up in the 3B suite.
![](https://img.haomeiwen.com/i10803273/6ffa4f225f428817.png)
This error first establishes that the client/server code is fine (otherwise 3A would fail too) and that the Raft base is fine.
Since it is 3B that fails, the bug is most likely in the InstallSnapshot changes.
What breaks is linearizability, i.e. the applied log is no longer consistent, so the cause must be either log application running concurrently (and therefore nondeterministically), or a bug in how lastApplied or commitIndex are updated.
Reading the code with that in mind ruled out the former, since every apply-log path in my code is strictly protected by the lock.
Searching for assignments to rf.commitIndex showed we had already added such a guard once before:
![](https://img.haomeiwen.com/i10803273/b456458346804091.png)
So I changed the InstallSnapshot RPC handler to do the same, and logging confirmed that commitIndex really can be larger than lastIncludedIndex.
Yet the linearizability check still failed, and it took a long time to track down.
The reason: when lastApplied is already past lastIncludedIndex, the KV server's db is newer than the snapshot,
so the handler also needs the code below.
![](https://img.haomeiwen.com/i10803273/45ad3af1cde92a50.png)
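The relevant tail of the InstallSnapshot handler in the final code:
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm
rf.persistWithSnapShot(args.Data)
rf.commitIndex = Max(rf.commitIndex, rf.lastIncludedIndex)
rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
if rf.lastApplied > rf.lastIncludedIndex {
    // the kvserver's db is already newer than this snapshot; don't hand it over
    return
}
rf.applyCh <- applyMsg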
The test shell script:
export GOPATH="/home/zyx/Desktop/mit6824/mit6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 25; i++))
do
for ((c = $((i*10)); c < $(( (i+1)*10)); c++))
do
(go test -run TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable3B) &> ./res/$c &
done
sleep 40
if grep -nr "FAIL.*raft.*" res; then
echo "fail"
fi
done
500 test runs, all passing.
![](https://img.haomeiwen.com/i10803273/fc93b5075a5274d8.png)
To wrap up, here is the cleaned-up code.
Client
package raftkv
import (
"labrpc"
"time"
)
import "crypto/rand"
import "math/big"
type Clerk struct {
servers []*labrpc.ClientEnd
lastLeader int
id int64
seqNum int
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
ck.id = Nrand()//give each client a unique identifier, and then have them
ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
ck.lastLeader = 0
return ck
}
func (ck *Clerk) Get(key string) string {
index := ck.lastLeader
for {
args := GetArgs{key}
reply := GetReply{}
ok := ck.servers[index].Call("KVServer.Get", &args, &reply)
if ok && !reply.WrongLeader {
ck.lastLeader = index
return reply.Value
}
index = (index + 1) % len(ck.servers)
time.Sleep(time.Duration(100)*time.Millisecond)
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
index := ck.lastLeader
args := PutAppendArgs{key, value, op, ck.id, ck.seqNum}
ck.seqNum++
for {
reply := PutAppendReply{}
ok := ck.servers[index].Call("KVServer.PutAppend", &args, &reply)
if ok && !reply.WrongLeader {
ck.lastLeader = index
return
}
index = (index + 1) % len(ck.servers)
time.Sleep(time.Duration(100)*time.Millisecond)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
Server
package raftkv
import (
"bytes"
"labgob"
"labrpc"
"log"
"raft"
"strconv"
"sync"
"time"
)
type Op struct {
OpType string "operation type(eg. put/append)"
Key string
Value string
Cid int64
SeqNum int
}
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
maxraftstate int // snapshot if log grows this big
timeout time.Duration
persist *raft.Persister
db map[string]string
chMap map[int]chan Op
cid2Seq map[int64]int
killCh chan bool
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
//from hint: A simple solution is to enter every Get() (as well as each Put() and Append()) in the Raft log.
originOp := Op{"Get",args.Key,strconv.FormatInt(Nrand(),10),0,0}
reply.WrongLeader = true
index,_,isLeader := kv.rf.Start(originOp)
if !isLeader {return}
ch := kv.putIfAbsent(index)
op := beNotified(ch)
if equalOp(op,originOp) {
reply.WrongLeader = false
kv.mu.Lock()
reply.Value = kv.db[op.Key]
kv.mu.Unlock()
}
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
reply.WrongLeader = true
index,_,isLeader := kv.rf.Start(originOp)
if !isLeader {return}
ch := kv.putIfAbsent(index)
op := beNotified(ch)
if equalOp(originOp,op) {
reply.WrongLeader = false
}
}
func beNotified(ch chan Op) Op{
select {
case notifyArg := <- ch :
return notifyArg
case <- time.After(time.Duration(600)*time.Millisecond):
return Op{}
}
}
func (kv *KVServer) putIfAbsent(idx int) chan Op{
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.chMap[idx]; !ok {
kv.chMap[idx] = make(chan Op,1)
}
return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}
func (kv *KVServer) Kill() {
kv.rf.Kill()
kv.killCh <- true
}
func (kv *KVServer) readSnapShot(snapshot []byte) {
kv.mu.Lock()
defer kv.mu.Unlock()
if snapshot == nil || len(snapshot) < 1 {return}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var db map[string]string
var cid2Seq map[int64]int
if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil {
log.Fatal("readSnapShot ERROR for server %v",kv.me)
} else {
kv.db, kv.cid2Seq = db, cid2Seq
}
}
func (kv *KVServer) needSnapShot() bool {
kv.mu.Lock()
defer kv.mu.Unlock()
threshold := 10
return kv.maxraftstate > 0 &&
kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *KVServer) doSnapShot(index int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
kv.mu.Lock()
e.Encode(kv.db)
e.Encode(kv.cid2Seq)
kv.mu.Unlock()
kv.rf.DoSnapShot(index,w.Bytes())
}
func send(notifyCh chan Op,op Op) {
select{
case <-notifyCh:
default:
}
notifyCh <- op
}
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
kv.persist = persister
// You may need initialization code here.
kv.db = make(map[string]string)
kv.chMap = make(map[int]chan Op)
kv.cid2Seq = make(map[int64]int)
kv.readSnapShot(kv.persist.ReadSnapshot())
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.killCh = make(chan bool,1)
go func() {
for {
select {
case <- kv.killCh:
return
case applyMsg := <- kv.applyCh:
if !applyMsg.CommandValid {
kv.readSnapShot(applyMsg.SnapShot)
continue
}
op := applyMsg.Command.(Op)
kv.mu.Lock()
maxSeq,found := kv.cid2Seq[op.Cid]
if !found || op.SeqNum > maxSeq {
switch op.OpType {
case "Put":
kv.db[op.Key] = op.Value
case "Append":
kv.db[op.Key] += op.Value
}
kv.cid2Seq[op.Cid] = op.SeqNum
}
kv.mu.Unlock()
notifyCh := kv.putIfAbsent(applyMsg.CommandIndex)
if kv.needSnapShot() {
go kv.doSnapShot(applyMsg.CommandIndex)
}
send(notifyCh,op)
}
}
}()
return kv
}
The new Raft code
package raft
import (
"bytes"
"labgob"
"log"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
)
import "labrpc"
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
SnapShot []byte
}
type State int
const (
Follower State = iota // value --> 0
Candidate // value --> 1
Leader // value --> 2
)
const NULL int = -1
type Log struct {
Term int "term when entry was received by leader"
Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// state a Raft server must maintain.
state State
//Persistent state on all servers:(Updated on stable storage before responding to RPCs)
currentTerm int "latest term server has seen (initialized to 0 increases monotonically)"
votedFor int "candidateId that received vote in current term (or null if none)"
log []Log "log entries;(first index is 1)"
//log compaction
lastIncludedIndex int "the snapshot replaces all entries up through and including this index"
lastIncludedTerm int "term of lastIncludedIndex"
//Volatile state on all servers:
commitIndex int "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
lastApplied int "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"
//Volatile state on leaders:(Reinitialized after election)
nextIndex []int "for each server,index of the next log entry to send to that server"
matchIndex []int "for each server,index of highest log entry known to be replicated on server(initialized to 0, im)"
//channel
applyCh chan ApplyMsg // from Make()
killCh chan bool //for Kill()
//handle rpc
voteCh chan bool
appendLogCh chan bool
}
// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isleader = (rf.state == Leader)
return term, isleader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
rf.persister.SaveRaftState(rf.encodeRaftState())
}
func (rf *Raft) encodeRaftState() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
return w.Bytes()
}
func (rf *Raft) persistWithSnapShot(snapshot []byte) {
rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(),snapshot)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var voteFor int
var clog []Log
var lastIncludedIndex int
var lastIncludedTerm int
if d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil ||
d.Decode(&lastIncludedIndex) != nil || d.Decode(&lastIncludedTerm) != nil{
log.Fatal("readPersist ERROR for server %v",rf.me)
} else {
rf.mu.Lock()
rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
rf.lastIncludedTerm, rf.lastIncludedIndex = lastIncludedTerm, lastIncludedIndex
rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex
rf.mu.Unlock()
}
}
// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
Term int "candidate’s term"
CandidateId int "candidate requesting vote"
LastLogIndex int "index of candidate’s last log entry (§5.4)"
LastLogTerm int "term of candidate’s last log entry (§5.4)"
}
// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
Term int "currentTerm, for candidate to update itself"
VoteGranted bool "true means candidate received vote"
}
//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term > rf.currentTerm {//all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.VoteGranted = false
if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
// Reply false if term < currentTerm (§5.1) If votedFor is not null and not candidateId,
} else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
//If the logs have last entries with different terms, then the log with the later term is more up-to-date.
// If the logs end with the same term, then whichever log is longer is more up-to-date.
// Reply false if candidate’s log is at least as up-to-date as receiver’s log
} else {
//grant vote
rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.state = Follower
rf.persist()
send(rf.voteCh) //because If election timeout elapses without receiving granting vote to candidate, so wake up
}
}
////RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
type AppendEntriesArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
PrevLogIndex int "index of log entry immediately preceding new ones"
PrevLogTerm int "term of prevLogIndex entry"
Entries []Log "log entries to store (empty for heartbeat;may send more than one for efficiency)"
LeaderCommit int "leader’s commitIndex"
}
type AppendEntriesReply struct {
Term int "currentTerm, for leader to update itself"
Success bool "true if follower contained entry matching prevLogIndex and prevLogTerm"
ConflictIndex int "the first index it stores for that conflict term"
ConflictTerm int "the term of the conflicting entry"
}
//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {//now only for heartbeat
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
reply.Term = rf.currentTerm
reply.Success = false
reply.ConflictTerm = NULL
reply.ConflictIndex = 0
//1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
prevLogIndexTerm := -1
logSize := rf.logLen()
if args.PrevLogIndex >= rf.lastIncludedIndex && args.PrevLogIndex < rf.logLen() {
prevLogIndexTerm = rf.getLog(args.PrevLogIndex).Term
}
if prevLogIndexTerm != args.PrevLogTerm {
reply.ConflictIndex = logSize
if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
//it should return with conflictIndex = len(log) and conflictTerm = None.
} else { //If a follower does have prevLogIndex in its log, but the term does not match
reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
i := rf.lastIncludedIndex
for ; i < logSize; i++ {//and then search its log for
if rf.getLog(i).Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
reply.ConflictIndex = i
break
}
}
}
return
}
//2. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {return}
index := args.PrevLogIndex
for i := 0; i < len(args.Entries); i++ {
index++
if index < logSize {
if rf.getLog(index).Term == args.Entries[i].Term {
continue
} else {//3. If an existing entry conflicts with a new one (same index but different terms),
rf.log = rf.log[:index - rf.lastIncludedIndex]//delete the existing entry and all that follow it (§5.3)
}
}
rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
rf.persist()
break;
}
//5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
rf.updateLastApplied()
}
reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
//InstallSnapshot RPC
type InstallSnapshotArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
LastIncludedIndex int "the snapshot replaces all entries up through and including this index"
LastIncludedTerm int "term of lastIncludedIndex"
Data []byte "raw bytes of the snapshot chunk, starting at offset"
}
type InstallSnapshotReply struct {
Term int "currentTerm, for leader to update itself"
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm { //Reply immediately if term < currentTerm
return
}
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
if args.LastIncludedIndex <= rf.lastIncludedIndex {// discard any existing or partial snapshot with a smaller index
return
}
applyMsg := ApplyMsg{CommandValid: false, SnapShot: args.Data}
//If existing log entry has same index and term as snapshot’s last included entry,retain log entries following it and reply
if args.LastIncludedIndex < rf.logLen()-1 {
rf.log = append(make([]Log,0),rf.log[args.LastIncludedIndex -rf.lastIncludedIndex:]...)
}else {//7. Discard the entire log
rf.log = []Log{{args.LastIncludedTerm, nil},}
}
//Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm
rf.persistWithSnapShot(args.Data)
rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
if rf.lastApplied > rf.lastIncludedIndex {return} //snapshot is older than kvserver's db, so reply immediately
rf.applyCh <- applyMsg
}
func (rf *Raft) sendSnapshot(server int) {
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LastIncludedIndex: rf.lastIncludedIndex,
LastIncludedTerm: rf.lastIncludedTerm,
LeaderId: rf.me,
Data: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()
reply := InstallSnapshotReply{}
ret := rf.sendInstallSnapshot(server,&args,&reply)
rf.mu.Lock();
defer rf.mu.Unlock()
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
return
}
if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
return
}
rf.updateNextMatchIdx(server,rf.lastIncludedIndex)
}
//Leader Section:
func (rf *Raft) startAppendLog() {
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
for {
rf.mu.Lock();
if rf.state != Leader {
rf.mu.Unlock()
return
} //send initial empty AppendEntries RPCs (heartbeat) to each server
if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 { //The leader uses a new RPC called InstallSnapshot to
rf.sendSnapshot(idx) // followers that are too far behind
return
}
args := AppendEntriesArgs{
rf.currentTerm,
rf.me,
rf.getPrevLogIdx(idx),
rf.getPrevLogTerm(idx),
//If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
//nextIndex > last log index, rf.log[rf.nextIndex[idx]:] will be empty then like a heartbeat
append(make([]Log, 0), rf.log[rf.nextIndex[idx]-rf.lastIncludedIndex:]...),
rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
ret := rf.sendAppendEntries(idx, &args, reply)
rf.mu.Lock();
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm { //all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
rf.mu.Unlock()
return
}
if reply.Success { //If successful:update nextIndex and matchIndex for follower
rf.updateNextMatchIdx(idx, args.PrevLogIndex+len(args.Entries))
rf.mu.Unlock()
return
} else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
tarIndex := reply.ConflictIndex //If it does not find an entry with that term
if reply.ConflictTerm != NULL {
logSize := rf.logLen() //first search its log for conflictTerm
for i := rf.lastIncludedIndex; i < logSize; i++ { //if it finds an entry in its log with that term,
if rf.getLog(i).Term != reply.ConflictTerm {
continue
}
for i < logSize && rf.getLog(i).Term == reply.ConflictTerm {
i++
} //set nextIndex to be the one
tarIndex = i //beyond the index of the last entry in that term in its log
}
}
rf.nextIndex[idx] = Min(rf.logLen(),tarIndex);
rf.mu.Unlock()
}
}
}(i)
}
}
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
index := -1
term := rf.currentTerm
isLeader := (rf.state == Leader)
//If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
if isLeader {
index = rf.getLastLogIdx() + 1
newLog := Log{
rf.currentTerm,
command,
}
rf.log = append(rf.log,newLog)
rf.persist()
rf.startAppendLog()
}
return index, term, isLeader
}
//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
rf.matchIndex[rf.me] = rf.logLen() - 1
copyMatchIndex := make([]int,len(rf.matchIndex))
copy(copyMatchIndex,rf.matchIndex)
sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
N := copyMatchIndex[len(copyMatchIndex)/2]
if N > rf.commitIndex && rf.getLog(N).Term == rf.currentTerm {
rf.commitIndex = N
rf.updateLastApplied()
}
}
func (rf *Raft) beLeader() {
if rf.state != Candidate {
return
}
rf.state = Leader
//initialize leader data
rf.nextIndex = make([]int,len(rf.peers))
rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
rf.nextIndex[i] = rf.getLastLogIdx() + 1
}
}
//end Leader section
//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower implemented in AppendEntries RPC Handler
func (rf *Raft) beCandidate() { //Reset election timer are finished in caller
rf.state = Candidate
rf.currentTerm++ //Increment currentTerm
rf.votedFor = rf.me //vote myself first
rf.persist()
//ask for other's vote
go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
rf.mu.Lock()
args := RequestVoteArgs{
rf.currentTerm,
rf.me,
rf.getLastLogIdx(),
rf.getLastLogTerm(),
};
rf.mu.Unlock()
var votes int32 = 1;
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
reply := &RequestVoteReply{}
ret := rf.sendRequestVote(idx,&args,reply)
if ret {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.beFollower(reply.Term)
return
}
if rf.state != Candidate || rf.currentTerm != args.Term{
return
}
if reply.VoteGranted {
atomic.AddInt32(&votes,1)
} //If votes received from majority of servers: become leader
if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
rf.beLeader()
rf.startAppendLog()
send(rf.voteCh) //after be leader, then notify 'select' goroutine will sending out heartbeats immediately
}
}
}(i)
}
}
//end Candidate section
//Follower Section:
func (rf *Raft) beFollower(term int) {
rf.state = Follower
rf.votedFor = NULL
rf.currentTerm = term
rf.persist()
}
//end Follower section
//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
rf.lastApplied = Max(rf.lastApplied,rf.lastIncludedIndex)
rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
curLog := rf.getLog(rf.lastApplied)
applyMsg := ApplyMsg{true, curLog.Command, rf.lastApplied, nil,}
rf.applyCh <- applyMsg
}
}
//log compaction:
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()
if curIdx <= rf.lastIncludedIndex {return}
//update last included index & term
rf.log = append(make([]Log,0), rf.log[curIdx-rf.lastIncludedIndex:]...)
rf.lastIncludedIndex = curIdx
rf.lastIncludedTerm = rf.getLog(curIdx).Term
rf.persistWithSnapShot(snapshot)
}
// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
send(rf.killCh)
}
//Helper function
func send(ch chan bool) {
select {
case <-ch: //if already set, consume it then resent to avoid block
default:
}
ch <- true
}
func (rf *Raft) getLog(i int) Log {
return rf.log[i - rf.lastIncludedIndex]
}
func (rf *Raft) logLen() int {
return len(rf.log) + rf.lastIncludedIndex
}
func (rf *Raft) getPrevLogIdx(i int) int {
return rf.nextIndex[i] - 1
}
func (rf *Raft) getPrevLogTerm(i int) int {
prevLogIdx := rf.getPrevLogIdx(i)
if prevLogIdx < rf.lastIncludedIndex {
return -1
}
return rf.getLog(prevLogIdx).Term
}
func (rf *Raft) getLastLogIdx() int {
return rf.logLen() - 1
}
func (rf *Raft) getLastLogTerm() int {
idx := rf.getLastLogIdx()
if idx < rf.lastIncludedIndex {
return -1
}
return rf.getLog(idx).Term
}
func (rf *Raft) updateNextMatchIdx(server int, matchIdx int) {
rf.matchIndex[server] = matchIdx
rf.nextIndex[server] = matchIdx + 1
rf.updateCommitIndex()
}
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = NULL
rf.log = make([]Log,1) //(first index is 1)
rf.commitIndex = 0
rf.lastApplied = 0
rf.applyCh = applyCh
//because gorountne only send the chan to below goroutine,to avoid block, need 1 buffer
rf.voteCh = make(chan bool,1)
rf.appendLogCh = make(chan bool,1)
rf.killCh = make(chan bool,1)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
//because from hint The tester requires that the leader send heartbeat RPCs no more than ten times per second.
heartbeatTime := time.Duration(100) * time.Millisecond
//from hint :You'll need to write code that takes actions periodically or after delays in time.
// The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
go func() {
for {
select {
case <-rf.killCh:
return
default:
}
electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
switch state {
case Follower, Candidate:
select {
case <-rf.voteCh:
case <-rf.appendLogCh:
case <-time.After(electionTime):
rf.mu.Lock()
rf.beCandidate() //becandidate, Reset election timer, then start election
rf.mu.Unlock()
}
case Leader:
time.Sleep(heartbeatTime)
rf.startAppendLog()
}
}
}()
return rf
}