背景
Consul 作为HashiCorp 出品的分布式注册中心和配置中心,是cp模型的,即强调一致性,通过raft协议实现
一致性
consul 一致性支持三种模式,即要强一致还是,最终一致, 可以交个用户选择,这才是一个优秀的分布式系统应该具备的,要了解一致性读,需要先了解consul的三种一致性模式,如下:
- Default
用consul 没有做任何改动的话,大部分都是这个模式工作的,default模式consul考虑了读的一致性还是很高的,读写都是通过leader来处理的,只是一种情况出现脑裂时,可能存在2个leader,在服务,但是老的leader肯定是不能写的,但是有可能服务读,读到过期的数据,但也不是一直会这样,leader有个租约,租约到期这个leader就下线了。
-
Consistent
consistent 即强一致读,如果是这个模式,consul 每次读请求都要向集群的超过半数的server检查他是不是leader,就比defalut 模式多一次rtt开销,因为即使你是leader,还要请求server确认是否存在其他的leader,这样肯定不会读到过时的数据。 -
Stale
stale是吞吐量最高的模式,但也是一致性最差的模式,所以一致性和吞吐量是矛盾的,因为stale 模式下,consul 集群的任何节点都能服务读请求,意外着即使集群没有leader,还是可以对外提供读请求。
我们了解了consul支持三种一致性模式,你是不是很好奇,consul是怎么实现的呢,我们平时部署一个consul集群也没有让我指定是那一种啥,consul既然是交给用户来选择,所以consul通过api的参数来确定,需要用那种读一致性。
在哪里指定一致性级别
有聪明的同学就会问,说了这么多,我到底在哪里指定这个一致性级别,别急,下面就开始说
consul 通过http 接口提供服务,就在http的api里可以指定,客户端sdk就不说了,有很多版本,这里只说consul agent端,因为线上一般都是直接请求localhost:8500 访问本地的consul agent的。下面是所有consul agent http接口都要执行的一个逻辑parseConsistency
,就是解析一致性
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b structs.QueryOptionsCompat) bool {
query := req.URL.Query()
//这里默认就认为是default模式。
defaults := true
//解析http请求如果带了stale参数,则是允许读过期的数据,那就server不用转发给leader
if _, ok := query["stale"]; ok {
b.SetAllowStale(true)
defaults = false
}
//解析http请求如果带了consistent参数,代表要读最新的数据。
if _, ok := query["consistent"]; ok {
b.SetRequireConsistent(true)
defaults = false
}
//解析http请求如果带了consistent参数,代表要从leader读。
if _, ok := query["leader"]; ok {
defaults = false
}
//解析http请求如果带了cached参数,代表可以从agent读,不需要请求server
if _, ok := query["cached"]; ok {
b.SetUseCache(true)
defaults = false
}
if maxStale := query.Get("max_stale"); maxStale != "" {
dur, err := time.ParseDuration(maxStale)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
return true
}
b.SetMaxStaleDuration(dur)
if dur.Nanoseconds() > 0 {
b.SetAllowStale(true)
defaults = false
}
}
...
上面解析了客户端的读模式,下面看怎么用的,随便看一个consul读的代码,比如查看健康的service node 的一段代码:
//如果可以用cache的数据,则直接从当前agent响应。
if args.QueryOptions.UseCache {
raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IndexedCheckServiceNodes)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
out = *reply
} else {
//否则需要通过rpc请求server节点。
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
}
我们只有指定了cache参数,consul 才会从agent 本地直接响应数据,这里也可以看出,agent 是会缓存数据的,否则就需要请求server节点,这个时候问题又来了,server节点一般我们是一个集群,最少3个节点,那请求那一个呢,有负载均衡吗,带着这个问题,我们看下代码,怎么选server的, 代码如下:
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
l := m.getServerList()
numServers := len(l.servers)
if numServers == 0 {
m.logger.Warn("No servers available")
return nil
}
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return l.servers[0]
}
consul 这里是不是处理的很简单,每次都是取第一个,人家注释也说了,如果这个出现失败了,会移到最后。
Consul Server的逻辑
consul agent 发现不用本地cache的数据,那就要rpc请求server节点,server节点接受到任何请求,都会执行forward方法,来检查是否要转发请求还是就自己响应数据。
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time
// Handle DC forwarding
// 检查dc是否一致,不一致就要转发到正确的dc
dc := info.RequestDatacenter()
if dc != s.config.Datacenter {
// Local tokens only work within the current datacenter. Check to see
// if we are attempting to forward one to a remote datacenter and strip
// it, falling back on the anonymous token on the other end.
if token := info.TokenSecret(); token != "" {
done, ident, err := s.ResolveIdentityFromToken(token)
if done {
if err != nil && !acl.IsErrNotFound(err) {
return false, err
}
if ident != nil && ident.IsLocal() {
// Strip it from the request.
info.SetTokenSecret("")
defer info.SetTokenSecret(token)
}
}
}
err := s.forwardDC(method, dc, args, reply)
return true, err
}
// Check if we can allow a stale read, ensure our local DB is initialized
// 这里server开始检查读一致性,如果允许读过期的数据,则直接用当前server的数据。
// 不需要后面的检查是否为leader了。
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
return false, nil
}
CHECK_LEADER:
// Fail fast if we are in the process of leaving
select {
case <-s.leaveCh:
return true, structs.ErrNoLeader
default:
}
// Find the leader
// 到这里就是要default读或者consistent读,都需要从leader读数据。
isLeader, leader := s.getLeader()
// Handle the case we are the leader
// 如果当前是leader,则不需要再转发到leader了。
if isLeader {
return false, nil
}
// Handle the case of a known leader
// 不是leader,则需要再转发到leader节点,多一次网络请求。
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
leader.Version, method, leader.UseTLS, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
}
return true, rpcErr
}
有同学看到这里,不是还有consistent 模式没有讲吗,这个就不在分析了, 不然文章太长了,没有人看
总结
一写就这么多,总算把consul的一致性读的特性,怎么用的,和背后的原理给说明了,我们默认情况都是default模式,即请求都是需要通过访问agent,agent再请求server,如果server不是leader,还要转发到leader节点。要1次http,2次rpc才能获取到数据,所以如果有consul server压力大的,可以通过cache来缓解server特别是leader的压力。
网友评论