美文网首页
Hyperledger-Fabric源码分析(Gossip-El

Hyperledger-Fabric源码分析(Gossip-El

作者: Pillar_Zhong | 来源:发表于2019-04-02 13:19 被阅读0次
    // Gossip leader election module
    // Algorithm properties:
    // - Peers break symmetry by comparing IDs
    // - Each peer is either a leader or a follower,
    //   and the aim is to have exactly 1 leader if the membership view
    //   is the same for all peers
    // - If the network is partitioned into 2 or more sets, the number of leaders
    //   is the number of network partitions, but when the partition heals,
    //   only 1 leader should be left eventually
    // - Peers communicate by gossiping leadership proposal or declaration messages
    
    // The Algorithm, in pseudo code:
    //
    //
    // variables:
    //     leaderKnown = false
    //
    // Invariant:
    // Peer listens for messages from remote peers
    // and whenever it receives a leadership declaration,
    // leaderKnown is set to true
    //
    // Startup():
    //     wait for membership view to stabilize, or for a leadership declaration is received
    //      or the startup timeout expires.
    // goto SteadyState()
    //
    // SteadyState():
    //     while true:
    //    If leaderKnown is false:
    //           LeaderElection()
    //    If you are the leader:
    //       Broadcast leadership declaration
    //       If a leadership declaration was received from
    //           a peer with a lower ID,
    //       become a follower
    //    Else, you're a follower:
    //       If haven't received a leadership declaration within
    //           a time threshold:
    //          set leaderKnown to false
    //
    // LeaderElection():
    //     Gossip leadership proposal message
    // Collect messages from other peers sent within a time period
    // If received a leadership declaration:
    //    return
    // Iterate over all proposal messages collected.
    //     If a proposal message from a peer with an ID lower
    //     than yourself was received, return.
    // Else, declare yourself a leader
    

    基本上注释写得很清楚,就是一个简化版的Raft共识算法。

    初始化

    func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
       if len(id) == 0 {
          panic("Empty id")
       }
       le := &leaderElectionSvcImpl{
          id:            peerID(id),
          proposals:     util.NewSet(),
          adapter:       adapter,
          stopChan:      make(chan struct{}, 1),
          interruptChan: make(chan struct{}, 1),
          logger:        util.GetLogger(util.ElectionLogger, ""),
          callback:      noopCallback,
       }
    
       if callback != nil {
          le.callback = callback
       }
    
       go le.start()
       return le
    }
    

    随Gossip启动而启动,最后开始start

    启动

    func (le *leaderElectionSvcImpl) start() {
       le.stopWG.Add(2)
       go le.handleMessages()
       le.waitForMembershipStabilization(getStartupGracePeriod())
       go le.run()
    }
    

    这里便是election模块的核心起手式了。下面我们具体来分析下里面的实现。

    handleMessages

    func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
        msgType := "proposal"
        if msg.IsDeclaration() {
            msgType = "declaration"
        }
        le.logger.Debug(le.id, ":", msg.SenderID(), "sent us", msgType)
        le.Lock()
        defer le.Unlock()
    
        if msg.IsProposal() {
            le.proposals.Add(string(msg.SenderID()))
        } else if msg.IsDeclaration() {
            atomic.StoreInt32(&le.leaderExists, int32(1))
            if le.sleeping && len(le.interruptChan) == 0 {
                le.interruptChan <- struct{}{}
            }
            if bytes.Compare(msg.SenderID(), le.id) < 0 && le.IsLeader() {
                le.stopBeingLeader()
            }
        } else {
            // We shouldn't get here
            le.logger.Error("Got a message that's not a proposal and not a declaration")
        }
    }
    
    • 首先,跟选举有关的消息类型是LeadershipMessage

    • 而消息又根据行为分为两种,proposal和declaration,前者是提案表达我要参加选举,后者是声明表示我已经当选

    • 搞清楚这些,那么这里就知道是什么意思了。

      • 首先如果是提案消息的话,先收集下来,为选举做准备,具体怎么用后面再讲
      • 如果有声明消息过来,说明有新的leader选举产生,那么如果接收点是上届leader,需要交出leader权限。
        • interruptChan这里需要注意,有两个地方会等待,一是选举的过程中如果收到这个通知,说明有新的选举结果产生,有可能是自己当选,也有可能是别人。
        • 二是成为leader之后如果收到这个通知,说明有新的leader产生,可能需要换届。
      func (le *leaderElectionSvcImpl) stopBeingLeader() {
         le.logger.Info(le.id, "Stopped being a leader")
         atomic.StoreInt32(&le.isLeader, int32(0))
         le.callback(false)
      }
      
      • 看起来也很简单,只是设置isLeader的标志位而已。
      • 下面我们看下callback在干嘛?

    callback

    func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
       return func(isLeader bool) {
          if isLeader {
             yield := func() {
                g.lock.RLock()
                le := g.leaderElection[chainID]
                g.lock.RUnlock()
                le.Yield()
             }
            
             if err := g.deliveryService[chainID].StartDeliverForChannel(chainID, committer, yield); err != nil {
               
             }
          } else {
           
             if err := g.deliveryService[chainID].StopDeliverForChannel(chainID); err != nil {
            
             }
    
          }
    
       }
    }
    
    • 如果看过orderer篇的应该知道Delivery service,用来拉取order的block,这里很关键,不做leader,不是简单标识下就完了,需要交出代成员接收block的权力。
    • 同一个组织内的peer节点的block同步是靠leader来运作的。
    • 这里有个地方需要注意的是yield,只在当前节点是leader的情况下进来。当选举结果需要换届时,不光是交出了delivery的权力,还对le本身设置了yield的标志位。

    waitForMembershipStabilization

    func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) {
       le.logger.Debug(le.id, ": Entering")
       defer le.logger.Debug(le.id, ": Exiting, peers found", len(le.adapter.Peers()))
       endTime := time.Now().Add(timeLimit)
       viewSize := len(le.adapter.Peers())
       for !le.shouldStop() {
          time.Sleep(getMembershipSampleInterval())
          newSize := len(le.adapter.Peers())
          if newSize == viewSize || time.Now().After(endTime) || le.isLeaderExists() {
             return
          }
          viewSize = newSize
       }
    }
    

    这个方法很有意思,其目的是等待成员稳定。这里三种方式来退出这个黑洞。

    • 隔一段时间,来比对前后的peer列表个数是否一致,一致就说明稳定
    • 当然了,不能没完没了的等待一致,如果到达deadline,也强行退出
    • 或者有leader选出,也直接退出

    run

    func (le *leaderElectionSvcImpl) run() {
       defer le.stopWG.Done()
       for !le.shouldStop() {
          if !le.isLeaderExists() {
             le.leaderElection()
          }
          // If we are yielding and some leader has been elected,
          // stop yielding
          if le.isLeaderExists() && le.isYielding() {
             le.stopYielding()
          }
          if le.shouldStop() {
             return
          }
          if le.IsLeader() {
             le.leader()
          } else {
             le.follower()
          }
       }
    }
    
    • 我们下面分步来拆解
    • 这里的逻辑基于上面handleMessage的结论进行进一步处理
      • 如果当前没有leader产生,那么立即发起选举,并且自荐给其他好朋友。这里后面详细讲。
      • 如果当前有leader存在,而且当前节点是yield状态,停止这个过度状态。
      • 如果已经是leader,那么leader
      • 否则follower,这些后面会详细分析

    leaderElection

    func (le *leaderElectionSvcImpl) leaderElection() {
       le.logger.Debug(le.id, ": Entering")
       defer le.logger.Debug(le.id, ": Exiting")
       // If we're yielding to other peers, do not participate
       // in leader election
       if le.isYielding() {
          return
       }
       // Propose ourselves as a leader
       le.propose()
       // Collect other proposals
       le.waitForInterrupt(getLeaderElectionDuration())
       // If someone declared itself as a leader, give up
       // on trying to become a leader too
       if le.isLeaderExists() {
          le.logger.Info(le.id, ": Some peer is already a leader")
          return
       }
    
       if le.isYielding() {
          le.logger.Debug(le.id, ": Aborting leader election because yielding")
          return
       }
       // Leader doesn't exist, let's see if there is a better candidate than us
       // for being a leader
       for _, o := range le.proposals.ToArray() {
          id := o.(string)
          if bytes.Compare(peerID(id), le.id) < 0 {
             return
          }
       }
       // If we got here, there is no one that proposed being a leader
       // that's a better candidate than us.
       le.beLeader()
       atomic.StoreInt32(&le.leaderExists, int32(1))
    }
    
    • 如果是yield状态,说明他正在移交权力给别人,所以不要参与选举。

    • 给其他人进行自荐,发出选举提案

    • 等待interruptChan,这里前面提到过,等到的话,说明有新的选举结果产生。当然了,如果等不到,还有超时会触发。

    • 接下来,如果当前leader已经产生,直接返回,因为选举已经结束。

    • 这里又判断一次yield,因为是新的阶段,前面是发起选举前,这里是收到选举结果后。

    • 重点来了,接下来是怎么从众多提案中找到有资格担任leader的节点。

      • 算法也很简单,就是比大小,bytes.Compare(peerID(id), le.id),从收集到的proposals里面看自己是不是最小的。如果是,做beLeader。然后标识leader已经产生。

      • func (le *leaderElectionSvcImpl) beLeader() {
           le.logger.Info(le.id, ": Becoming a leader")
           atomic.StoreInt32(&le.isLeader, int32(1))
           le.callback(true)
        }
        
        • 成为leader的结果,一是托管从orderer拉取block的任务,二是标识自己是leader。

    leader

    func (le *leaderElectionSvcImpl) leader() {
       leaderDeclaration := le.adapter.CreateMessage(true)
       le.adapter.Gossip(leaderDeclaration)
       le.waitForInterrupt(getLeadershipDeclarationInterval())
    }
    
    • 首先当leader不光是前面说的那些任务,还要跟别人分享。回忆下Raft选举算法的核心,怎么跟别人维持自己的leader地位,是需要定时发心跳的。而这里心跳变成了leaderDeclaration。
    • 接下来就是waitForInterrupt,当然了有新的leader发来的Declaration消息会通知到这里,另外,里面每次都会sleep一段时间。再加上外面的for循环,那么效果就是每隔一段时间去发declaration去维持leader的地位,这不就是心跳么?

    follower

    func (le *leaderElectionSvcImpl) follower() {
       le.logger.Debug(le.id, ": Entering")
       defer le.logger.Debug(le.id, ": Exiting")
    
       le.proposals.Clear()
       atomic.StoreInt32(&le.leaderExists, int32(0))
       select {
       case <-time.After(getLeaderAliveThreshold()):
       case <-le.stopChan:
          le.stopChan <- struct{}{}
       }
    }
    
    • 这里还是有讲究的,首先走到这里说明这轮选举很不幸已经结束,该节点没有选上。没关系,再接再厉,清理掉这一轮的选举提案,准备为下轮做准备。
    • 这里很关键的设置了leaderExists为0,说明默认认为leader不存在。有两个时间你需要搞清楚。
      • peer.gossip.election.leaderElectionDuration=5s
      • peer.gossip.election.leaderAliveThreshold=10s
        • 我们前面讲过了,leaderElectionDuration是leader发起心跳的间隔,而这里用到的leaderAliveThreshold是指leader的存活时间
        • 这里就很清楚了,5秒内我能收到心跳,那么就认为leader是好的,否则超过10s,当前还没有leader存在,说明leader大概率挂了,需要重新选举。这里返回后,会重新leaderElection。

    相关文章

      网友评论

          本文标题:Hyperledger-Fabric源码分析(Gossip-El

          本文链接:https://www.haomeiwen.com/subject/tdzwbqtx.html