美文网首页超级账本Hyperledger
Fabric 1.4源码分析 - orderer的raft实现

Fabric 1.4源码分析 - orderer的raft实现

作者: 小蜗牛爬楼梯 | 来源:发表于2019-12-23 09:07 被阅读0次

    Raft可以说是Fabric 1.X系列中首个真正意义上的共识算法。Fabric的实现主要涉及三个文件:chain.go <-> etcdraft/node.go <-> raft/node.go。其中raft/node.go来自etcd的开源包,chain.go是实现共识逻辑的主体,etcdraft/node.go则相当于适配器模式中的适配器,用于连接两者,对chain.go屏蔽Raft的具体实现细节。

    首先看chain.go的struct(略去部分field),包含etcdraft/node.go对象。调用chain.go#Start方法时(省略),内部调用了etcdraft/node.go#start方法。

    // Chain is the raft-backed consensus chain shown (abridged) in this article.
    // It owns the channels that connect request submission, the raft node
    // adapter and the snapshot/GC goroutine.
    type Chain struct {
        rpc RPC // used by Submit to forward a request to the current leader (SendSubmit)
    
        raftID    uint64 // this node's raft ID; Submit compares it with the reported leader
        channelID string
        lastKnownLeader uint64
    
        submitC  chan *submit // Submit writes incoming requests here; consumed in serveRequest
        applyC   chan apply   // the node's run loop writes committed entries / soft state here
        observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
        snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
        gcC      chan *gc              // Signal to take snapshot
    
        configInflight       bool // this is true when there is config block or ConfChange in flight
        blockInflight        int  // number of in flight blocks
    
        // needed by snapshotting
        sizeLimit        uint32 // SnapshotIntervalSize in bytes
        accDataSize      uint32 // accumulative data size since last snapshot
        lastSnapBlockNum uint64 // apply sets this to the header number of the block handed to gcC
        confState        raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
    
        createPuller CreateBlockPuller // func used to create BlockPuller on demand
    
        // this is exported so that test can use `Node.Status()` to get raft node status.
        Node *node
    }
    
    // Start instructs the orderer to begin serving the chain and keep it current.
    // It starts the underlying raft node adapter, then launches the gc and
    // serveRequest goroutines and runs the periodic checker.
    func (c *Chain) Start() {
        c.Node.start(c.fresh, isJoin)
    
        // gc reacts to signals on the c.gcC channel by calling c.Node.takeSnapshot.
        // Per the article it also purges messages and snapshots older than a
        // configurable count — that part is not visible in this excerpt.
        go c.gc()
        go c.serveRequest()
    
        // NOTE(review): es and interval are unused in this abridged excerpt;
        // presumably they configure c.periodicChecker in the elided code —
        // confirm against the full source.
        es := c.newEvictionSuspector()
        interval := DefaultLeaderlessCheckInterval
        c.periodicChecker.Run()
    }
    
    // node adapts the etcd raft.Node to the Chain: it embeds the library node
    // and carries the storage, configuration and RPC handles used alongside it.
    type node struct {
        chainID string
        storage *RaftStorage    // persistent or in-memory storage for raft (WAL, RAM, ...)
        config  *raft.Config
        rpc RPC     // inter-node gRPC communication; manages the gRPC clients/streams to the other nodes
        chain *Chain
        raft.Node   // raft node implementation from the open-source etcd library
    }
    

    etcdraft/node.go#run是其主要逻辑(以下截取展示部分逻辑)。实际上可以看到,开源库etcd的raft节点实现只管理raft相关的propose、commit、election等过程,而把其他业务相关的工作(包括节点间通信等)留给使用方。

    // Abridged excerpt of the node's run loop (the enclosing select is elided).
    for {
        // n embeds the etcd raft.Node; Ready() delivers its notifications.
        case rd := <-n.Ready():
            // Persist entries, hard state and snapshot (WAL) first; panic on failure.
            if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
                n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
            }
    
            // A non-empty snapshot is forwarded to the chain; it is what a lagging
            // or newly joined node needs in order to catch up.
            if !raft.IsEmptySnap(rd.Snapshot) {
                n.chain.snapC <- &rd.Snapshot
            }
    
            // skip empty apply. Otherwise forward what the raft node reported:
            // committed proposals (rd.CommittedEntries) and/or a state change
            // (rd.SoftState, e.g. a new leader).
            if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
                n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
            }
    
            n.Advance()
    
            // TODO(jay_guo) leader can write to disk in parallel with replicating to the followers and them writing to their disks. Check 10.2.1 in thesis
            // Hand the outbound messages to the RPC layer; per the article they are
            // sent through the gRPC clients it manages.
            n.send(rd.Messages)
        }
    

    Orderer消息的入口仍然是chain.go#Order和chain.go#Configure,二者最终都会调用chain.go#Submit。从下面的代码可以看到,请求总是先写入submitC channel交给本地的serveRequest goroutine;如果serveRequest返回的leader不是本节点,再通过rpc把请求转发给leader节点。

    // Submit forwards the incoming request to:
    // - the local serveRequest goroutine if this is leader
    // - the actual leader via the transport mechanism
    // The call fails if there's no leader elected yet.
    //
    // The request is always handed to serveRequest through submitC first;
    // serveRequest reports the current leader back on leadC. When that leader
    // is a different node, the request is forwarded to it over RPC, and a
    // forwarding failure is counted in the ProposalFailures metric and returned.
    // Returns nil when the request was accepted locally or forwarded.
    //
    // NOTE(review): the "no leader elected" check mentioned above is elided
    // from this excerpt — confirm against the full source.
    func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
        // Buffered so serveRequest can answer without blocking on this goroutine.
        leadC := make(chan uint64, 1)
        select {
        case c.submitC <- &submit{req, leadC}:
            lead := <-leadC
            if lead != c.raftID {
                if err := c.rpc.SendSubmit(lead, req); err != nil {
                    c.Metrics.ProposalFailures.Add(1)
                    return err
                }
            }
        }

        // Success path: without this the function has no terminating statement.
        return nil
    }
    

    chain.go的运行主体在chain.go#serveRequest,其中主要是select。

    // Abridged excerpt of the select inside serveRequest.
    select {
        // Fed by `chain.go#Submit`, i.e. a submitted proposal. The main point here is
        // that only the current leader goes on to propose.
        // If this node is the leader, it calls `consensus.ConsenterSupport#ProcessConfigMsg/ProcessNormalMsg`
        // and then `support.BlockCutter().Ordered` to cut batches.
        // If messages are still pending after cutting, a timer is started (the
        // `<-timer.C():` case below); when it fires, cutting is done again.
        case s := <-submitC:
            // Same as the other orderer implementations; see the article
            // "Fabric 1.4 source analysis - chaincode instantiate (8): the orderer's ordering process".
            batches, pending, err := c.ordered(s.req)
            if pending {
                startTimer() // no-op if timer is already started
            } else {
                stopTimer()
            }
    
            c.propose(propC, bc, batches...)
        // Fed by `etcdraft/node.go#run` shown earlier, i.e. notifications from the etcd raft.Node.
        // These may include a leader change; the latest leader is recorded in the
        // `chain.go/apply/raft.SoftState/Lead` field, which leads to
        // `propC, cancelProp = becomeLeader()` or `becomeFollower()` accordingly.
        // propC is the channel passed to propose above; it is drained inside becomeLeader,
        // which calls `raft.go#Node.Propose` to propose.
        case app := <-c.applyC:
            // The entries here are CommittedEntries, i.e. entries that must be committed —
            // the blocks the leader originally proposed.
            // n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
            c.apply(app.entries)
        case <-timer.C():
        // snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
        // Fed by `etcdraft/node.go#run`: in the `rd := <-n.Ready():` case it does
        // `n.chain.snapC <- &rd.Snapshot`, i.e. the snapshot raft wants this node to sync to.
        // Used by lagging or newly joined nodes to catch up with the current state.
        case sn := <-c.snapC:
            c.catchUp(sn); err != nil
        case <-c.doneC:
    
    }
    
    // propose creates one block per batch (via bc.createNextBlock) and offers it
    // on ch, the channel drained by the goroutine started in becomeLeader, which
    // in turn calls c.Node.Propose. Nothing happens when batches is empty.
    //
    // For every batch, blockInflight is incremented, and configInflight is set
    // when the created block is a config block.
    func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
        for idx := range batches {
            blk := bc.createNextBlock(batches[idx])

            // Non-blocking send: when ch cannot accept the block right now the
            // default branch is taken and the block is not delivered.
            select {
            case ch <- blk:
            default:
            }

            // if it is config block, then we should wait for the commit of the block
            if utils.IsConfigBlock(blk) {
                c.configInflight = true
            }

            c.blockInflight++
        }
    }
    
    // becomeLeader (abridged excerpt: the signature and the error branch are
    // truncated) starts a goroutine that marshals each block received on ch and
    // proposes it through c.Node.Propose, then returns ch together with the
    // context's cancel function.
    becomeLeader := func() (chan<- *common.Block, 
        // Leader should call Propose in go routine, because this method may be blocked
        // if node is leaderless (this can happen when leader steps down in a heavily
        // loaded network). We need to make sure applyC can still be consumed properly.
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                // Each block handed over by propose is serialized and proposed to raft.
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    if err := c.Node.Propose(ctx, data); err != ni
                    {...}
                }
            }
        }(ctx, ch)
    
        return ch, cancel
    }
    
    // apply processes committed raft entries (abridged excerpt: the ConfChange
    // branch and some closing braces are elided). Normal entries are unmarshalled
    // into blocks and written via writeBlock; afterwards a snapshot request is
    // sent on gcC once the accumulated data size reaches sizeLimit.
    func (c *Chain) apply(ents []raftpb.Entry) {
        // position is the index of the last normal entry seen in ents.
        var position int
        for i := range ents {
            switch ents[i].Type {
            // Call writeBlock to write the block carried by the committed entry to the ledger.
            // accDataSize is the accumulated size of block data.
            case raftpb.EntryNormal:
                position = i
                c.accDataSize += uint32(len(ents[i].Data))
                block := utils.UnmarshalBlockOrPanic(ents[i].Data)
                c.writeBlock(block, ents[i].Index)
            case raftpb.EntryConfChange:
                ...
                
            // Track the highest entry index applied so far.
            if ents[i].Index > c.appliedIndex {
                c.appliedIndex = ents[i].Index
            }
        }
    
        // accDataSize is the accumulated size of block data; once it reaches the configured
        // sizeLimit, a request is written to the c.gcC channel, and `chain.go#gc` handles it
        // by calling c.Node.takeSnapshot.
        if c.accDataSize >= c.sizeLimit {
            b := utils.UnmarshalBlockOrPanic(ents[position].Data)
    
            select {
            // On a successful hand-off, reset the counter and remember the snapshot's block number.
            case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
                c.accDataSize = 0
                c.lastSnapBlockNum = b.Header.Number
        }
    }
    

    chain.go#gc

    // gc loops forever, blocking until a snapshot request arrives on c.gcC and
    // then asking the node to take a snapshot with the request's index, state
    // and data.
    func (c *Chain) gc() {
        for {
            // A plain blocking receive; equivalent to a select with this single case.
            req := <-c.gcC
            c.Node.takeSnapshot(req.index, req.state, req.data)
        }
    }
    

    总体架构流程如上,具体细节可以参考以下文章,它们都比较详细地描述了实现细节。

    相关文章

      网友评论

        本文标题:Fabric 1.4源码分析 - orderer的raft实现

        本文链接:https://www.haomeiwen.com/subject/avzanctx.html