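I. Overview
Starting this week I am doing a simple read-through of the Consul source code; hopefully I can keep it up.
Consul is written in Go, so the reading will occasionally call for notes on Go syntax. Those notes will be published separately, so the material is split into two series of articles.
II. Analysis
1. Definition of the agent (agent.go)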
// The agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
// config is the agent configuration.
config *config.RuntimeConfig
// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate delegate
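The agent is a long-running process deployed on every machine. It can run in either client or server mode (client mode is lightweight and only forwards requests).
config is the agent's configuration; core settings such as the node ID all live there.
delegate is either the Server or the Client object, depending on the mode the process was started in.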
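To make the role of the delegate field concrete, here is a minimal sketch of one interface-typed field holding either a server or a client, chosen from configuration. It is not Consul code: the `Mode` method, the lowercase type names, and the `newAgent` helper are made up for illustration, and the real delegate interface is considerably larger.

```go
package main

import "fmt"

// delegate is a hypothetical, trimmed-down stand-in for the agent's
// delegate interface. Mode is an illustrative method, not a Consul API.
type delegate interface {
	Mode() string
}

// server and client are placeholder implementations of delegate.
type server struct{}
type client struct{}

func (server) Mode() string { return "server" }
func (client) Mode() string { return "client" }

// runtimeConfig keeps only the one flag this sketch needs.
type runtimeConfig struct {
	ServerMode bool
}

// agent holds its configuration and whichever delegate was selected.
type agent struct {
	config   *runtimeConfig
	delegate delegate
}

// start selects the delegate based on the configured mode.
func (a *agent) start() {
	if a.config.ServerMode {
		a.delegate = server{}
	} else {
		a.delegate = client{}
	}
}

// newAgent is a hypothetical constructor that builds and starts an agent.
func newAgent(serverMode bool) *agent {
	a := &agent{config: &runtimeConfig{ServerMode: serverMode}}
	a.start()
	return a
}

func main() {
	fmt.Println(newAgent(true).delegate.Mode())
	fmt.Println(newAgent(false).delegate.Mode())
}
```

Because the rest of the agent only talks to the interface, code that forwards a request does not need to know which mode the process is running in.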
2. Program entry point (agent.go)
func (a *Agent) Start() error {
...
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.delegate = server
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}
a.delegate = client
}
...
This code mainly builds either a Server or a Client object. Next, let's look at how a Server object is constructed.
#server.go
// NewServerLogger is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
...
// Initialize the Raft server.
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
...
// Setup the Raft store.
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
return err
}
return nil
Let's look at the important fields of the Server object; the objects related to the Raft protocol are wrapped here.
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
...
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *fsm.FSM
// The raft instance is used among Consul nodes within the DC to protect
// operations that require strong consistency.
// the state directly.
raft *raft.Raft
raftLayer *RaftLayer
raftStore *raftboltdb.BoltStore
raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore
func (a *Agent) serveHTTP(srv *HTTPServer) error {
// https://github.com/golang/go/issues/20239
//
// In go.8.1 there is a race between Serve and Shutdown. If
// Shutdown is called before the Serve go routine was scheduled then
// the Serve go routine never returns. This deadlocks the agent
// shutdown for some tests since it will wait forever.
notif := make(chan net.Addr)
a.wgServers.Add(1)
go func() {
defer a.wgServers.Done()
notif <- srv.ln.Addr()
err := srv.Serve(srv.ln)
if err != nil && err != http.ErrServerClosed {
a.logger.Print(err)
}
}()
select {
case addr := <-notif:
if srv.proto == "https" {
a.logger.Printf("[INFO] agent: Started HTTPS server on %s (%s)", addr.String(), addr.Network())
} else {
a.logger.Printf("[INFO] agent: Started HTTP server on %s (%s)", addr.String(), addr.Network())
}
return nil
case <-time.After(time.Second):
return fmt.Errorf("agent: timeout starting HTTP servers")
}
}
What happens here is simple: start the server and write a log line.
Finally, let's look at what constructing a Raft object actually involves.
#api.go
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
...
// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err != nil && err.Error() != "not found" {
return nil, fmt.Errorf("failed to load current term: %v", err)
}
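// This tries to load the current term (currentTerm). How it is read depends on the store implementation: the inmem implementation reads it from memory, while BoltStore also keeps it in key/value form but persists it to disk.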
// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
userSnapshotCh: make(chan *userSnapshotFuture),
userRestoreCh: make(chan *userRestoreFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
}
// The literal above builds the Raft struct.
// Initialize as a follower. A node always starts in the follower role by default.
r.setState(Follower)
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)
// Start the background work. We will focus on these core goroutines later.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
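The term-restoration step near the top of NewRaft is easy to experiment with on its own. The following is a minimal sketch under stated assumptions: `stableStore` is a cut-down interface with only the two methods needed here, `inmemStore` is a toy map-backed implementation written for this post (it is not the library's InmemStore), and the key value "CurrentTerm" and the "not found" error text are assumptions chosen to mirror the excerpt.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// stableStore is a hypothetical, trimmed-down store interface with only
// the two methods this sketch needs.
type stableStore interface {
	SetUint64(key []byte, val uint64) error
	GetUint64(key []byte) (uint64, error)
}

// inmemStore is a toy in-memory implementation backed by a map.
type inmemStore struct {
	mu sync.RWMutex
	kv map[string]uint64
}

func newInmemStore() *inmemStore {
	return &inmemStore{kv: make(map[string]uint64)}
}

// SetUint64 stores val under key.
func (s *inmemStore) SetUint64(key []byte, val uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.kv[string(key)] = val
	return nil
}

// GetUint64 returns the stored value, or a "not found" error if the key is
// missing. Returning an error for a missing key is an assumption of this toy.
func (s *inmemStore) GetUint64(key []byte) (uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.kv[string(key)]
	if !ok {
		return 0, errors.New("not found")
	}
	return v, nil
}

// keyCurrentTerm is the key the term is stored under (assumed value).
var keyCurrentTerm = []byte("CurrentTerm")

// restoreTerm loads the persisted term. A missing key is not an error: it
// means the node has never recorded a term, so the term is 0.
func restoreTerm(stable stableStore) (uint64, error) {
	term, err := stable.GetUint64(keyCurrentTerm)
	if err != nil && err.Error() != "not found" {
		return 0, fmt.Errorf("failed to load current term: %v", err)
	}
	return term, nil
}

func main() {
	store := newInmemStore()
	fresh, _ := restoreTerm(store)
	fmt.Println("fresh node term:", fresh)

	store.SetUint64(keyCurrentTerm, 7)
	restored, _ := restoreTerm(store)
	fmt.Println("restored term:", restored)
}
```

A disk-backed store could satisfy the same interface, which is why the constructor only needs the interface and not a concrete type. The difference is simply that a disk-backed term survives a process restart while the in-memory one does not.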