// Agent is the long-running process that runs on every machine.
// It exposes an RPC interface that the CLI uses to control the
// agent, and it serves the query interfaces such as HTTP, DNS, and RPC.
// It runs in either client or server mode: in server mode it runs a
// full Consul server; in client-only mode it only forwards requests
// to other Consul servers.
type Agent struct {
// config is the agent configuration.
config *config.RuntimeConfig
// delegate is either a *consul.Server or a *consul.Client,
// depending on the configuration.
delegate delegate
// Start sets up the agent's delegate: a Consul server when ServerMode is
// set, otherwise a Consul client. It returns an error if the chosen
// component cannot be constructed.
// NOTE(review): c and consulCfg are not defined in this excerpt —
// presumably they are derived from a.config; confirm against the full source.
func (a *Agent) Start() error {
// Set up either the client or the server.
if c.ServerMode {
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
// The server becomes the agent's delegate.
a.delegate = server
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
// The client becomes the agent's delegate.
a.delegate = client
// NewServerLogger constructs a new Consul server from the given
// configuration, logger, and token store. It returns a nil server and
// an error if Raft cannot be set up.
func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
// Initialize the Raft server.
if err := s.setupRaft(); err != nil {
return nil, fmt.Errorf("Failed to start Raft: %v", err)
// setupRaft sets up and initializes Raft for the server, storing the
// resulting Raft node in s.raft. It returns any error reported by
// raft.NewRaft.
func (s *Server) setupRaft() error {
// Create the Raft node from the server's Raft config and FSM together
// with the log, stable, snapshot, and transport values.
// NOTE(review): log, stable, snap, and trans are not defined in this
// excerpt — confirm how they are built in the full source.
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
return err
return nil
// Server is the Consul server, which manages service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *fsm.FSM
// The raft instance is used among Consul nodes within the DC to protect
// operations that require strong consistency.
raft *raft.Raft
// Supporting Raft components, named after their types: the Raft layer,
// the BoltDB-backed store, the network transport, and the in-memory
// store. How each one is wired up is not shown in this excerpt.
raftLayer *RaftLayer
raftStore *raftboltdb.BoltStore
raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore
// serveHTTP starts srv on its listener in a background goroutine and
// waits up to one second for that goroutine to report the listener
// address. On success it logs the address and returns nil; if nothing
// is reported within the timeout it returns an error.
func (a *Agent) serveHTTP(srv *HTTPServer) error {
// https://github.com/golang/go/issues/20239
// In Go 1.8.1 there is a race between Serve and Shutdown. If
// Shutdown is called before the Serve goroutine was scheduled then
// the Serve goroutine never returns. This deadlocks the agent
// shutdown for some tests since it will wait forever.
// notif carries the listener address from the goroutine below; receiving
// on it shows that the goroutine has actually been scheduled.
notif := make(chan net.Addr)
go func() {
defer a.wgServers.Done()
// NOTE(review): notif is unbuffered, so this send blocks until the
// select below receives it. If the timeout case wins first, no receiver
// remains — confirm whether the goroutine can leak in that case.
notif <- srv.ln.Addr()
err := srv.Serve(srv.ln)
// http.ErrServerClosed is excluded here; presumably it is the expected
// result of a normal shutdown rather than a failure.
if err != nil && err != http.ErrServerClosed {
select {
case addr := <-notif:
// The goroutine is running; log which protocol and address it serves.
if srv.proto == "https" {
a.logger.Printf("[INFO] agent: Started HTTPS server on %s (%s)", addr.String(), addr.Network())
} else {
a.logger.Printf("[INFO] agent: Started HTTP server on %s (%s)", addr.String(), addr.Network())
return nil
case <-time.After(time.Second):
// The goroutine did not report in within one second.
return fmt.Errorf("agent: timeout starting HTTP servers")
// NewRaft constructs a new Raft node. It takes a configuration as well
// as implementations of the various interfaces that are required. Any
// old state, such as snapshots, logs, and peers, is restored when the
// Raft node is created.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// Try to restore the current term.
// NOTE(review): the only error tolerated here is one whose text is
// exactly "not found"; any other error aborts construction. Confirm that
// every StableStore implementation reports a missing key with that text.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err != nil && err.Error() != "not found" {
return nil, fmt.Errorf("failed to load current term: %v", err)
// Create Raft struct.
// The config is stored by value (*conf), and several channels are
// created with explicit buffer sizes (fsmMutateCh 128, verifyCh 64,
// configurationsCh 8) while the rest are unbuffered.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
userSnapshotCh: make(chan *userSnapshotFuture),
userRestoreCh: make(chan *userRestoreFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
// Initialize as a follower: a node always starts in the follower role.
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
// Start the background work. The core background routines started here
// are examined in detail later.