我们已经在raft-example看过了对Propose的简单处理了,但是真正的etcd对Propose的处理更加复杂。主要是有如下几个点:
- consistent index。用于处理boltdb和raftlog之间的幂等性。
- 同步返回。由于raft的log复制是异步的,如何做到同步返回结果。
当blotdb用作状态机的时候,wal和blotdb作为两个不同的实体,很有可能存在不一致的情况。所以etcd在blotdb中存储一条记录consistent-index,来代表已经apply到blot-db上成功的log index,这样当根据wal恢复blot-db的时候,就可以判断log index是不是已经被apply过。
处理过程
在etcd-server中,一条propose的处理过程:
首先是为每一条请求注册一个唯一requestID,然后register并等待requestID处理完成
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, ErrTooManyRequests
}
r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
}
id := r.ID
if id == 0 {
id = r.Header.ID
}
ch := s.w.Register(id)
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
start := time.Now()
err = s.r.Propose(cctx, data)
select {
case x := <-ch:
return x.(*applyResult), nil
case <-cctx.Done():
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.done:
return nil, ErrStopped
}
}
请求转发到raft-node处理
根据之前的知识,raft-node经过一番处理之后,交给上层的ready结构来处理,首先msg转发到leader,然后leader调用processMsg,这里主要的操作是copy log到follower。
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
if islead {
r.transport.Send(r.processMessages(rd.Messages))
}
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
if !raft.IsEmptySnap(rd.Snapshot) {
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
}
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
// gofail: var raftAfterApplySnap struct{}
if err := r.storage.Release(rd.Snapshot); err != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
}
// gofail: var raftAfterWALRelease struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
msgs := r.processMessages(rd.Messages)
notifyc <- struct{}{}
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
}
r.Advance()
最终调用的是apply
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
for i := range es {
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
case raftpb.EntryConfChange:
// We need to apply all WAL entries on top of v2store
// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
shouldApplyV3 := membership.ApplyV2storeOnly
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
}
appliedi, appliedt = e.Index, e.Term
}
return appliedt, appliedi, shouldStop
}
网友评论