美文网首页
consul源码笔记

consul源码笔记

作者: leiwingqueen | 来源:发表于2018-11-25 19:07 被阅读0次

    一、概要

    从本周开始对consul的源码做一个简单的阅读和了解,希望能持续下去吧。
    consul是使用go编写的,在阅读过程中可能会涉及到对go语法的相关笔记,这个会分开两个系列文章去更新。

    二、分析

    1.agent的定义(agent.go)

    // The agent is the long running process that is run on every machine.
    // It exposes an RPC interface that is used by the CLI to control the
    // agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
    // However, it can run in either a client, or server mode. In server
    // mode, it runs a full Consul server. In client-only mode, it only forwards
    // requests to other Consul servers.
    type Agent struct {
        // config is the agent configuration.
        config *config.RuntimeConfig
        // delegate is either a *consul.Server or *consul.Client
        // depending on the configuration
        delegate delegate
    

    agent是一个常驻进程部署在所有机器上,可以分client和server两种模式运行(client模式只负责转发请求,轻量级)。
    config为agent的配置,包括nodeID等核心的配置都在里
    delegate为Server或者client的对象,取决于进程启动选择的方式
    2.程序入口(agent.go)

    func (a *Agent) Start() error {
    ...
    // Setup either the client or the server.
        if c.ServerMode {
            server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
            if err != nil {
                return fmt.Errorf("Failed to start Consul server: %v", err)
            }
            a.delegate = server
        } else {
            client, err := consul.NewClientLogger(consulCfg, a.logger)
            if err != nil {
                return fmt.Errorf("Failed to start Consul client: %v", err)
            }
            a.delegate = client
        }
    ...
    

    这里主要是构建一个Server或者client的对象,我们接下来看一个server对象是如何构建的

    #server.go
    // NewServer is used to construct a new Consul server from the
    // configuration, potentially returning an error
    func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
    ...
    // Initialize the Raft server.
        if err := s.setupRaft(); err != nil {
            s.Shutdown()
            return nil, fmt.Errorf("Failed to start Raft: %v", err)
        }
    
    // setupRaft is used to setup and initialize Raft
    func (s *Server) setupRaft() error {
    ...
    // Setup the Raft store.
        s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
        if err != nil {
            return err
        }
        return nil
    

    我们看下server对象的重要属性,跟raft协议相关的对象封装在这里。

    // Server is Consul server which manages the service discovery,
    // health checking, DC forwarding, Raft, and multiple Serf pools.
    type Server struct {
    ...
    // fsm is the state machine used with Raft to provide
        // strong consistency.
        fsm *fsm.FSM
    // The raft instance is used among Consul nodes within the DC to protect
        // operations that require strong consistency.
        // the state directly.
        raft          *raft.Raft
        raftLayer     *RaftLayer
        raftStore     *raftboltdb.BoltStore
        raftTransport *raft.NetworkTransport
        raftInmem     *raft.InmemStore
    
    func (a *Agent) serveHTTP(srv *HTTPServer) error {
        // https://github.com/golang/go/issues/20239
        //
        // In go.8.1 there is a race between Serve and Shutdown. If
        // Shutdown is called before the Serve go routine was scheduled then
        // the Serve go routine never returns. This deadlocks the agent
        // shutdown for some tests since it will wait forever.
        notif := make(chan net.Addr)
        a.wgServers.Add(1)
        go func() {
            defer a.wgServers.Done()
            notif <- srv.ln.Addr()
            err := srv.Serve(srv.ln)
            if err != nil && err != http.ErrServerClosed {
                a.logger.Print(err)
            }
        }()
    
        select {
        case addr := <-notif:
            if srv.proto == "https" {
                a.logger.Printf("[INFO] agent: Started HTTPS server on %s (%s)", addr.String(), addr.Network())
            } else {
                a.logger.Printf("[INFO] agent: Started HTTP server on %s (%s)", addr.String(), addr.Network())
            }
            return nil
        case <-time.After(time.Second):
            return fmt.Errorf("agent: timeout starting HTTP servers")
        }
    }
    

    这里做的事情很简单,启动服务器并输出日志。

    最后我们看起构建一个raft对象具体做了哪些事情。

    #api.go
    // NewRaft is used to construct a new Raft node. It takes a configuration, as well
    // as implementations of various interfaces that are required. If we have any
    // old state, such as snapshots, logs, peers, etc, all those will be restored
    // when creating the Raft node.
    func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
    ...
    // Try to restore the current term.
        currentTerm, err := stable.GetUint64(keyCurrentTerm)
        if err != nil && err.Error() != "not found" {
            return nil, fmt.Errorf("failed to load current term: %v", err)
        }
    //这里会尝试获得当前的任期(currentTerm),这里取决于实现的方式,如果是inmem的方式实现则会从内存中获取,BoltStore也是用kv的格式保存,但是会持久化到硬盘。
    // Create Raft struct.
        r := &Raft{
            protocolVersion: protocolVersion,
            applyCh:         make(chan *logFuture),
            conf:            *conf,
            fsm:             fsm,
            fsmMutateCh:     make(chan interface{}, 128),
            fsmSnapshotCh:   make(chan *reqSnapshotFuture),
            leaderCh:        make(chan bool),
            localID:         localID,
            localAddr:       localAddr,
            logger:          logger,
            logs:            logs,
            configurationChangeCh: make(chan *configurationChangeFuture),
            configurations:        configurations{},
            rpcCh:                 trans.Consumer(),
            snapshots:             snaps,
            userSnapshotCh:        make(chan *userSnapshotFuture),
            userRestoreCh:         make(chan *userRestoreFuture),
            shutdownCh:            make(chan struct{}),
            stable:                stable,
            trans:                 trans,
            verifyCh:              make(chan *verifyFuture, 64),
            configurationsCh:      make(chan *configurationsFuture, 8),
            bootstrapCh:           make(chan *bootstrapFuture),
            observers:             make(map[uint64]*Observer),
        }
    //构建一个raft的结构体
    // Initialize as a follower.启动的时候默认是follower的角色
        r.setState(Follower)
    // Setup a heartbeat fast-path to avoid head-of-line
        // blocking where possible. It MUST be safe for this
        // to be called concurrently with a blocking RPC.
        trans.SetHeartbeatHandler(r.processHeartbeat)
    
        // Start the background work.这几个核心的线程我们后面重点来关注
        r.goFunc(r.run)
        r.goFunc(r.runFSM)
        r.goFunc(r.runSnapshots)
    

    相关文章

      网友评论

          本文标题:consul源码笔记

          本文链接:https://www.haomeiwen.com/subject/phtmqqtx.html