美文网首页
consul源码笔记

consul源码笔记

作者: leiwingqueen | 来源:发表于2018-11-25 19:07 被阅读0次

一、概要

从本周开始对consul的源码做一个简单的阅读和了解,希望能持续下去吧。
consul是使用go编写的,在阅读过程中可能会涉及到对go语法的相关笔记,这个会分开两个系列文章去更新。

二、分析

1.agent的定义(agent.go)

// The agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
    // config is the agent configuration.
    config *config.RuntimeConfig
    // delegate is either a *consul.Server or *consul.Client
    // depending on the configuration
    delegate delegate

agent是一个常驻进程部署在所有机器上,可以分client和server两种模式运行(client模式只负责转发请求,轻量级)。
config为agent的配置,包括nodeID等核心的配置都在里
delegate为Server或者client的对象,取决于进程启动选择的方式
2.程序入口(agent.go)

func (a *Agent) Start() error {
...
// Setup either the client or the server.
    if c.ServerMode {
        server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
        if err != nil {
            return fmt.Errorf("Failed to start Consul server: %v", err)
        }
        a.delegate = server
    } else {
        client, err := consul.NewClientLogger(consulCfg, a.logger)
        if err != nil {
            return fmt.Errorf("Failed to start Consul client: %v", err)
        }
        a.delegate = client
    }
...

这里主要是构建一个Server或者client的对象,我们接下来看一个server对象是如何构建的

#server.go
// NewServer is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
...
// Initialize the Raft server.
    if err := s.setupRaft(); err != nil {
        s.Shutdown()
        return nil, fmt.Errorf("Failed to start Raft: %v", err)
    }

// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
...
// Setup the Raft store.
    s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
    if err != nil {
        return err
    }
    return nil

我们看下server对象的重要属性,跟raft协议相关的对象封装在这里。

// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
...
// fsm is the state machine used with Raft to provide
    // strong consistency.
    fsm *fsm.FSM
// The raft instance is used among Consul nodes within the DC to protect
    // operations that require strong consistency.
    // the state directly.
    raft          *raft.Raft
    raftLayer     *RaftLayer
    raftStore     *raftboltdb.BoltStore
    raftTransport *raft.NetworkTransport
    raftInmem     *raft.InmemStore
func (a *Agent) serveHTTP(srv *HTTPServer) error {
    // https://github.com/golang/go/issues/20239
    //
    // In go.8.1 there is a race between Serve and Shutdown. If
    // Shutdown is called before the Serve go routine was scheduled then
    // the Serve go routine never returns. This deadlocks the agent
    // shutdown for some tests since it will wait forever.
    notif := make(chan net.Addr)
    a.wgServers.Add(1)
    go func() {
        defer a.wgServers.Done()
        notif <- srv.ln.Addr()
        err := srv.Serve(srv.ln)
        if err != nil && err != http.ErrServerClosed {
            a.logger.Print(err)
        }
    }()

    select {
    case addr := <-notif:
        if srv.proto == "https" {
            a.logger.Printf("[INFO] agent: Started HTTPS server on %s (%s)", addr.String(), addr.Network())
        } else {
            a.logger.Printf("[INFO] agent: Started HTTP server on %s (%s)", addr.String(), addr.Network())
        }
        return nil
    case <-time.After(time.Second):
        return fmt.Errorf("agent: timeout starting HTTP servers")
    }
}

这里做的事情很简单,启动服务器并输出日志。

最后我们看起构建一个raft对象具体做了哪些事情。

#api.go
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
...
// Try to restore the current term.
    currentTerm, err := stable.GetUint64(keyCurrentTerm)
    if err != nil && err.Error() != "not found" {
        return nil, fmt.Errorf("failed to load current term: %v", err)
    }
//这里会尝试获得当前的任期(currentTerm),这里取决于实现的方式,如果是inmem的方式实现则会从内存中获取,BoltStore也是用kv的格式保存,但是会持久化到硬盘。
// Create Raft struct.
    r := &Raft{
        protocolVersion: protocolVersion,
        applyCh:         make(chan *logFuture),
        conf:            *conf,
        fsm:             fsm,
        fsmMutateCh:     make(chan interface{}, 128),
        fsmSnapshotCh:   make(chan *reqSnapshotFuture),
        leaderCh:        make(chan bool),
        localID:         localID,
        localAddr:       localAddr,
        logger:          logger,
        logs:            logs,
        configurationChangeCh: make(chan *configurationChangeFuture),
        configurations:        configurations{},
        rpcCh:                 trans.Consumer(),
        snapshots:             snaps,
        userSnapshotCh:        make(chan *userSnapshotFuture),
        userRestoreCh:         make(chan *userRestoreFuture),
        shutdownCh:            make(chan struct{}),
        stable:                stable,
        trans:                 trans,
        verifyCh:              make(chan *verifyFuture, 64),
        configurationsCh:      make(chan *configurationsFuture, 8),
        bootstrapCh:           make(chan *bootstrapFuture),
        observers:             make(map[uint64]*Observer),
    }
//构建一个raft的结构体
// Initialize as a follower.启动的时候默认是follower的角色
    r.setState(Follower)
// Setup a heartbeat fast-path to avoid head-of-line
    // blocking where possible. It MUST be safe for this
    // to be called concurrently with a blocking RPC.
    trans.SetHeartbeatHandler(r.processHeartbeat)

    // Start the background work.这几个核心的线程我们后面重点来关注
    r.goFunc(r.run)
    r.goFunc(r.runFSM)
    r.goFunc(r.runSnapshots)

相关文章

网友评论

      本文标题:consul源码笔记

      本文链接:https://www.haomeiwen.com/subject/phtmqqtx.html