美文网首页
ETCD源码阅读(三)

ETCD源码阅读(三)

作者: 夕午wuw | 来源:发表于2023-03-30 17:59 被阅读0次

    阅读raftexample:etcd/contrib/raftexample

    serveChannels()

    func (rc *raftNode) serveChannels() {
        snap, err := rc.raftStorage.Snapshot()
        if err != nil {
            panic(err)
        }
        rc.confState = snap.Metadata.ConfState
        rc.snapshotIndex = snap.Metadata.Index
        rc.appliedIndex = snap.Metadata.Index
    
        defer rc.wal.Close()
    
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
    
        // send proposals over raft
        go func() {
            confChangeCount := uint64(0)
    
            for rc.proposeC != nil && rc.confChangeC != nil {
                select {
                case prop, ok := <-rc.proposeC:
                    if !ok {
                        rc.proposeC = nil
                    } else {
                        // blocks until accepted by raft state machine
                        rc.node.Propose(context.TODO(), []byte(prop))
                    }
    
                case cc, ok := <-rc.confChangeC:
                    if !ok {
                        rc.confChangeC = nil
                    } else {
                        confChangeCount++
                        cc.ID = confChangeCount
                        rc.node.ProposeConfChange(context.TODO(), cc)
                    }
                }
            }
            // client closed channel; shutdown raft if not already
            close(rc.stopc)
        }()
    
        // event loop on raft state machine updates
        for {
            select {
            case <-ticker.C:
                rc.node.Tick()
    
            // store raft entries to wal, then publish over commit channel
            case rd := <-rc.node.Ready():
                rc.wal.Save(rd.HardState, rd.Entries)
                if !raft.IsEmptySnap(rd.Snapshot) {
                    rc.saveSnap(rd.Snapshot)
                    rc.raftStorage.ApplySnapshot(rd.Snapshot)
                    rc.publishSnapshot(rd.Snapshot)
                }
                rc.raftStorage.Append(rd.Entries)
                rc.transport.Send(rd.Messages)
                applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
                if !ok {
                    rc.stop()
                    return
                }
                rc.maybeTriggerSnapshot(applyDoneC)
                rc.node.Advance()
    
            case err := <-rc.transport.ErrorC:
                rc.writeError(err)
                return
    
            case <-rc.stopc:
                rc.stop()
                return
            }
        }
    }
    

    serveChannels函数,是 raftexample 中用于处理 Raft 消息的主要事件循环。

    1. 通过调用 rc.raftStorage.Snapshot() 方法获取 etcd 当前的快照数据,并将其中的集群配置状态(ConfState)、快照索引(snapshotIndex)和已提交的索引(appliedIndex)等元数据信息记录到当前 raftNode 实例(即 rc 变量)中。
    2. 创建一个定时器 ticker,每 100 毫秒触发一次,用于驱动 Raft 状态机的 Tick() 方法。这个Tick()方法使Raft状态机的内部逻辑时钟加一,选举超时和心跳超时都以“tick”的单位来计算。选举超时:当一个节点在经过一段时间(通常是几个 tick)没有收到心跳和选举请求时,就会从follower状态转变成candidate状态,发起一次选举。心跳超时:只有leader拥有这个计时器,超时了就给follower发送心跳包。follower收到后就会把选举超市计数器置零。
    3. 创建一个新的 goroutine,用于将处理KVStore发送的proposal,HTTP API Server对集群配置的更改请求(如添加或删除节点)。
    4. 创建一个处理Raft状态机更新的Goroutine,不断处理以下事件:
      • ticker事件:参考上一段

      • rc.node.Ready():

        case rd := <-rc.node.Ready():
            rc.wal.Save(rd.HardState, rd.Entries)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            rc.raftStorage.Append(rd.Entries)
            rc.transport.Send(rd.Messages)
            applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
            if !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot(applyDoneC)
            // Advance notifies the Node that the application has 
            // saved progress up to the last Ready. 
            // It prepares the node to return the next available Ready.
            // The application should generally call Advance after
            // it applies the entries in last Ready.
            rc.node.Advance()
        

        获取aft节点已经准备好的一批Entry和当前的HardState(给客户端发送状态更新前的状态),并保存到 WAL 中。接着判断是否有快照数据,如果有就更新快照。

        将Entry添加到raft节点的raftStorage中。然后调用 rc.transport.Send() 方法将 Messages 发送给其他节点。

        最后调用 rc.entriesToApply() 方法获取需要apply的 committed entries,通过调用 rc.publishEntries() 方法将这些 committed entries 应用到状态机中,并返回一个 channel 用于通知应用完成。

      • rc.transport.ErrorC 和 rc.stopc:出现错误需要关闭Raft Node

    raft.go

    这段代码经过前面的解读,已经理解的差不多了,我们来看看对应的单元测试吧。选了一个比较有意思的。

    // TestProposeOnCommit starts three nodes and feeds commits back into the proposal
    // channel. The intent is to ensure blocking on a proposal won't block raft progress.
    func TestProposeOnCommit(t *testing.T) {
        clus := newCluster(3)
        defer clus.closeNoErrors(t)
    
        donec := make(chan struct{})
        for i := range clus.peers {
            // feedback for "n" committed entries, then update donec
            go func(pC chan<- string, cC <-chan *commit, eC <-chan error) {
                for n := 0; n < 100; n++ {
                    c, ok := <-cC
                    if !ok {
                        pC = nil
                    }
                    select {
                    case pC <- c.data[0]:
                        continue
                    case err := <-eC:
                        t.Errorf("eC message (%v)", err)
                    }
                }
                donec <- struct{}{}
                for range cC {
                    // acknowledge the commits from other nodes so
                    // raft continues to make progress
                }
            }(clus.proposeC[i], clus.commitC[i], clus.errorC[i])
    
            // one message feedback per node
            go func(i int) { clus.proposeC[i] <- "foo" }(i)
        }
    
        for range clus.peers {
            <-donec
        }
    }
    

    先mock了一个三节点的集群(其实就是将对应的channel连接上),然后在每个节点上启动了两个 goroutine。其中,一个 goroutine 持续地从该节点的 commit channel 中读取消息,然后将第一条消息反馈给该节点的 propose channel。另一个 goroutine 仅向该节点的 propose channel 中发送一条消息。

    这样就会发生不断地从 propose channel 中读取并将消息发送到 commit channel 中的过程,然后在一段时间后结束这个循环并向 donec channel 发送一个空结构体来指示完成。最后测试会阻塞直到从所有的节点都接收到了 donec channel 中的空结构体,以确保测试的所有 goroutine 都已结束。

    这个测试是为了验证:即使在等待反馈消息的时候,也不会阻塞 raft 的进展。

    相关文章

      网友评论

          本文标题:ETCD源码阅读(三)

          本文链接:https://www.haomeiwen.com/subject/qddcddtx.html