美文网首页
Consul 的一致性读分析

Consul 的一致性读分析

作者: 绝尘驹 | 来源:发表于2023-07-13 00:04 被阅读0次

    背景

    Consul 作为HashiCorp 出品的分布式注册中心和配置中心,是cp模型的,即强调一致性,通过raft协议实现

    一致性

    consul 一致性支持三种模式,即要强一致还是,最终一致, 可以交个用户选择,这才是一个优秀的分布式系统应该具备的,要了解一致性读,需要先了解consul的三种一致性模式,如下:

    • Default
      用consul 没有做任何改动的话,大部分都是这个模式工作的,default模式consul考虑了读的一致性还是很高的,读写都是通过leader来处理的,只是一种情况出现脑裂时,可能存在2个leader,在服务,但是老的leader肯定是不能写的,但是有可能服务读,读到过期的数据,但也不是一直会这样,leader有个租约,租约到期这个leader就下线了。
    • Consistent
      consistent 即强一致读,如果是这个模式,consul 每次读请求都要向集群的超过半数的server检查他是不是leader,就比defalut 模式多一次rtt开销,因为即使你是leader,还要请求server确认是否存在其他的leader,这样肯定不会读到过时的数据。

    • Stale
      stale是吞吐量最高的模式,但也是一致性最差的模式,所以一致性和吞吐量是矛盾的,因为stale 模式下,consul 集群的任何节点都能服务读请求,意外着即使集群没有leader,还是可以对外提供读请求。

    我们了解了consul支持三种一致性模式,你是不是很好奇,consul是怎么实现的呢,我们平时部署一个consul集群也没有让我指定是那一种啥,consul既然是交给用户来选择,所以consul通过api的参数来确定,需要用那种读一致性。

    在哪里指定一致性级别

    有聪明的同学就会问,说了这么多,我到底在哪里指定这个一致性级别,别急,下面就开始说

    consul 通过http 接口提供服务,就在http的api里可以指定,客户端sdk就不说了,有很多版本,这里只说consul agent端,因为线上一般都是直接请求localhost:8500 访问本地的consul agent的。下面是所有consul agent http接口都要执行的一个逻辑parseConsistency,就是解析一致性

    func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b structs.QueryOptionsCompat) bool {
        query := req.URL.Query()
        //这里默认就认为是default模式。
        defaults := true
        //解析http请求如果带了stale参数,则是允许读过期的数据,那就server不用转发给leader
        if _, ok := query["stale"]; ok {
            b.SetAllowStale(true)
            defaults = false
        }
        //解析http请求如果带了consistent参数,代表要读最新的数据。
        if _, ok := query["consistent"]; ok {
            b.SetRequireConsistent(true)
            defaults = false
        }
        //解析http请求如果带了consistent参数,代表要从leader读。
        if _, ok := query["leader"]; ok {
            defaults = false
        }
        //解析http请求如果带了cached参数,代表可以从agent读,不需要请求server
        if _, ok := query["cached"]; ok {
            b.SetUseCache(true)
            defaults = false
        }
        if maxStale := query.Get("max_stale"); maxStale != "" {
            dur, err := time.ParseDuration(maxStale)
            if err != nil {
                resp.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
                return true
            }
            b.SetMaxStaleDuration(dur)
            if dur.Nanoseconds() > 0 {
                b.SetAllowStale(true)
                defaults = false
            }
        }
    ...
    

    上面解析了客户端的读模式,下面看怎么用的,随便看一个consul读的代码,比如查看健康的service node 的一段代码:

    //如果可以用cache的数据,则直接从当前agent响应。
        if args.QueryOptions.UseCache {
            raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
            if err != nil {
                return nil, err
            }
            defer setCacheMeta(resp, &m)
            reply, ok := raw.(*structs.IndexedCheckServiceNodes)
            if !ok {
                // This should never happen, but we want to protect against panics
                return nil, fmt.Errorf("internal error: response type not correct")
            }
            out = *reply
        } else {
        //否则需要通过rpc请求server节点。  
        RETRY_ONCE:
            if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
                return nil, err
            }
            if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
                args.AllowStale = false
                args.MaxStaleDuration = 0
                goto RETRY_ONCE
            }
        }
    

    我们只有指定了cache参数,consul 才会从agent 本地直接响应数据,这里也可以看出,agent 是会缓存数据的,否则就需要请求server节点,这个时候问题又来了,server节点一般我们是一个集群,最少3个节点,那请求那一个呢,有负载均衡吗,带着这个问题,我们看下代码,怎么选server的, 代码如下:

    // FindServer takes out an internal "read lock" and searches through the list
    // of servers to find a "healthy" server.  If the server is actually
    // unhealthy, we rely on Serf to detect this and remove the node from the
    // server list.  If the server at the front of the list has failed or fails
    // during an RPC call, it is rotated to the end of the list.  If there are no
    // servers available, return nil.
    func (m *Manager) FindServer() *metadata.Server {
        l := m.getServerList()
        numServers := len(l.servers)
        if numServers == 0 {
            m.logger.Warn("No servers available")
            return nil
        }
    
        // Return whatever is at the front of the list because it is
        // assumed to be the oldest in the server list (unless -
        // hypothetically - the server list was rotated right after a
        // server was added).
        return l.servers[0]
    }
    

    consul 这里是不是处理的很简单,每次都是取第一个,人家注释也说了,如果这个出现失败了,会移到最后。

    Consul Server的逻辑

    consul agent 发现不用本地cache的数据,那就要rpc请求server节点,server节点接受到任何请求,都会执行forward方法,来检查是否要转发请求还是就自己响应数据。

    func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
        var firstCheck time.Time
    
        // Handle DC forwarding
        // 检查dc是否一致,不一致就要转发到正确的dc
        dc := info.RequestDatacenter()
        if dc != s.config.Datacenter {
            // Local tokens only work within the current datacenter. Check to see
            // if we are attempting to forward one to a remote datacenter and strip
            // it, falling back on the anonymous token on the other end.
            if token := info.TokenSecret(); token != "" {
                done, ident, err := s.ResolveIdentityFromToken(token)
                if done {
                    if err != nil && !acl.IsErrNotFound(err) {
                        return false, err
                    }
                    if ident != nil && ident.IsLocal() {
                        // Strip it from the request.
                        info.SetTokenSecret("")
                        defer info.SetTokenSecret(token)
                    }
                }
            }
    
            err := s.forwardDC(method, dc, args, reply)
            return true, err
        }
    
        // Check if we can allow a stale read, ensure our local DB is initialized
        // 这里server开始检查读一致性,如果允许读过期的数据,则直接用当前server的数据。
        // 不需要后面的检查是否为leader了。
        if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
            return false, nil
        }
    
    CHECK_LEADER:
        // Fail fast if we are in the process of leaving
        select {
        case <-s.leaveCh:
            return true, structs.ErrNoLeader
        default:
        }
    
        // Find the leader
        // 到这里就是要default读或者consistent读,都需要从leader读数据。
        isLeader, leader := s.getLeader()
    
        // Handle the case we are the leader
        // 如果当前是leader,则不需要再转发到leader了。
        if isLeader {
            return false, nil
        }
    
        // Handle the case of a known leader
        // 不是leader,则需要再转发到leader节点,多一次网络请求。
        rpcErr := structs.ErrNoLeader
        if leader != nil {
            rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
                leader.Version, method, leader.UseTLS, args, reply)
            if rpcErr != nil && canRetry(info, rpcErr) {
                goto RETRY
            }
            return true, rpcErr
        }
    

    有同学看到这里,不是还有consistent 模式没有讲吗,这个就不在分析了, 不然文章太长了,没有人看

    总结

    一写就这么多,总算把consul的一致性读的特性,怎么用的,和背后的原理给说明了,我们默认情况都是default模式,即请求都是需要通过访问agent,agent再请求server,如果server不是leader,还要转发到leader节点。要1次http,2次rpc才能获取到数据,所以如果有consul server压力大的,可以通过cache来缓解server特别是leader的压力。

    相关文章

      网友评论

          本文标题:Consul 的一致性读分析

          本文链接:https://www.haomeiwen.com/subject/rtvuudtx.html