美文网首页
Consul 一致性读Consistent 篇

Consul 一致性读Consistent 篇

作者: 绝尘驹 | 来源:发表于2023-07-17 22:04 被阅读0次

上偏文章我们分析了consul 一致性default 和stale 两种模式的一致性读的实现原理,让我们回忆下,

Stale模式

链接任何一个server节点都可以读,容忍过期的数据,

Default 模式

这个是我们大多数人用的模式,需要从leader返回数据,如果agent链接到server是follower节点,则需要转发给leader来处理。

Consistent 模式

而且我们也说了Consistent 一致性读比default模式还要严格,除了需要leader返回数据外,还需要确认当前leader是否唯一,怎么确认,就需要和follower 通信来确认,就是问下几个从节点,嘿,我还是leader吗,如果超过半数的人都响应是,则恭喜你,还是leader,就可以返回数据了。否则返回失败。

下面我们分析下原理是怎么实现的,初步摸排了下,这个过程还挺复杂的,所以需要单独再开一篇文章来说明。

检查入口
consul server 出来所有的请求基本上都支持阻塞查询,即blockingQuery方法,consul 在计算好超时时间后,会做是否为Consistent模式的读。

// Validate
// If the read must be consistent we verify that we are still the leader.
// queryOpts 这里检查是否一致性读
if queryOpts.GetRequireConsistent() {
   //如果是则通过consistentRead来实现
   if err := s.consistentRead(); err != nil {
      return err
   }
}
//不是本地之间返回

我们来分析consistentRead的逻辑, 关键就时发起一个leader验证的请求,然后等待结果,代码如下:

// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
   defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
   future := s.raft.VerifyLeader()
   //查询请求会阻塞在这里,即future.Error(),需要等过半验证成功才返回。
   if err := future.Error(); err != nil {
      return err //fail fast if leader verification fails
   }
   // poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
   if s.isReadyForConsistentReads() {
      return nil
   }
   jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
   deadline := time.Now().Add(s.config.RPCHoldTimeout)

   for time.Now().Before(deadline) {

      select {
      case <-time.After(jitter):
         // Drop through and check before we loop again.

      case <-s.shutdownCh:
         return fmt.Errorf("shutdown waiting for leader")
      }

      if s.isReadyForConsistentReads() {
         return nil
      }
   }

   return structs.ErrNotReadyForConsistentReads
}

我们的查询就阻塞在future.Error() 这里,没有验证完,这里是不会响应的。

那关键就在s.raft.VerifyLeader()这里,我们继续往下挖,下面是实现:

// VerifyLeader is used to ensure the current node is still
// the leader. This can be done to prevent stale reads when a
// new leader has potentially been elected.
func (r *Raft) VerifyLeader() Future {
   metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
   verifyFuture := &verifyFuture{}
     //init 会初始化errCh,页就是前面error会阻塞在这个channel上。
   verifyFuture.init()
   select {
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}

   case r.verifyCh <- verifyFuture:
      //写一个future到verifyCh,consul leader主协程会watch这个verifyCh,会触发验证的逻辑
      return verifyFuture
   }
}

老外写的代码还是很友好的,注释写的很清楚,看代码的同时还能看写的注释,页大概明白这个方法的意图,这个VerifyLeader 的核心逻辑就是创建一个verifyFuture,这个future很关键,关键指标都在这里,定义如下:

/ verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
   deferError
   notifyCh   chan *verifyFuture
   quorumSize int //过半大小,默认是0
   votes      int //验证leader时follower响应ok,时votes会+1,如果超过quorumSize,则认为还是leader。
   voteLock   sync.Mutex
}

votes 是每个follower返回验证成功时,会对votes+1,然后判断是否大于过半quorumSize,默认是0,如果是过半,那恭喜你,目前你还是leader,可以执行当前这次读请求。

下面我们要看consul是怎么给follower发送验证请求的,通过上面的代码,可以看出,是向leader的verifyCh 写了一个future,所以肯定有一个go routing 会阻塞在这个verifyCh channel上,这就是异步通知, 下面就是leader的主协程登场了。

Leader 主协程
leader 完成初始化后,最后会启动一个循环函数,先看下定义,同样注释也说明的很清楚。

// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
   ...
这个leaderLoop会监听很多channel,比如rpc请求,commit等,其中一个就有verifyCh的channel,代码如下:

case v := <-r.verifyCh:
   if v.quorumSize == 0 {
      // Just dispatched, start the verification
      r.verifyLeader(v)

   } else if v.votes < v.quorumSize {
      // Early return, means there must be a new leader
      r.logger.Warn("new leader elected, stepping down")
      r.setState(Follower)
      delete(r.leaderState.notify, v)
      for _, repl := range r.leaderState.replState {
         repl.cleanNotify(v)
      }
      v.respond(ErrNotLeader)

   } else {
      // Quorum of members agree, we are still leader
      delete(r.leaderState.notify, v)
      for _, repl := range r.leaderState.replState {
         repl.cleanNotify(v)
      }
      v.respond(nil)
   }

我们前面查询的goroutine 通过发了一个verifyFuture 给verifyCh,leader 的main goroutine 就监听在这里,我们前面也说了,quorumSize 默认是0,所以第一次是会触发verifyLeader的逻辑,什么时候触发另外两个的逻辑呢,要等验证超过一半的请求返还了,就会再给verifyCh发一个消息,这时候,正常情况下就是找v.respond的逻辑,最终通知最前面的query go routine 阻塞就会被唤醒。

verifyLeader的核心逻辑就是初始化了quorumSize,另外就是对follower循环,consul 有个 replicate go routine 会和follower发心跳信息,每个follower一个,除了定时发心跳外,还支持实时触发心态,也就是监听notifyCh 这个channel,这个leader的go routine会发一个空的struct给这个channel来触发,会给每个follower都发一个类型为rpcAppendEntries的消息,核心代码如下:

// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
   repl.notifyLock.Lock()
   repl.notify[v] = struct{}{}
   repl.notifyLock.Unlock()
   //通知主动发一个心调到follower server
   asyncNotifyCh(repl.notifyCh)
}

consul leader 对每个follower 维持一个heartbeat ,核心代码如下:

for {
   // Wait for the next heartbeat interval or forced notify
   select {
   case <-s.notifyCh://通知即刻执行
   case <-randomTimeout(r.conf.HeartbeatTimeout / 10)://定时执行
   case <-stopCh:
      return
   }

   start := time.Now()
   if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
      r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
      failures++
      select {
      case <-time.After(backoff(failureWait, failures, maxFailureScale)):
      case <-stopCh:
      }
   } else {
     //更新时间
      s.setLastContact()
      failures = 0
      metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
      s.notifyAll(resp.Success)
   }
}

绕了这么大半圈,真正验证的关键代码总算出来了,也就是 r.trans.AppendEntries执行的,这里就不用分析了,就是发一个rpc请求给follower。如果成功。通过s.notifyAll(resp.Success) 来通知前面的future,核心代码如下:

// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
   v.voteLock.Lock()
   defer v.voteLock.Unlock()

   // Guard against having notified already
   if v.notifyCh == nil {
      return
   }

   if leader {
      v.votes++
      //防止一个follower响应就通知了leader,比如5台的时候,一台响应了+自己也就是2
      if v.votes >= v.quorumSize {
         v.notifyCh <- v
         v.notifyCh = nil
      }
   } else {
      v.notifyCh <- v
      v.notifyCh = nil
   }
}

如果follower响应成功,也就是认为你还是leader,则对votes加1,v.notifyCh 这里其实就是我们前面leader 的verifych, 通过幻想我们前面的查询请求阻塞在future.error哪里,整个经过这么一个复杂的流程才能完成一次正常的读请求,如果请求follower超时,则会等待一定的时间。继续请求。

总结

consul 官方文档说Consistent 模式读,为了实现这个强一致性读,consul 在背后做了这么多的事情,详细你看了这篇文章,以及前面的一篇文章,对consul的三种模式的读,应该有了一个全面的了解,在用的时候也能根据你的业务场景做出正确的选择,对consul 感兴趣的同学可以点关注,后面再继续分享consul的文章,带你看明白consul的世界。

相关文章

网友评论

      本文标题:Consul 一致性读Consistent 篇

      本文链接:https://www.haomeiwen.com/subject/qdogudtx.html