etcd Study Notes 2 (Draft)


Author: 酱油王0901 | Published 2020-06-14 18:21

    etcd initialization flow

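    On startup etcd first calls startEtcdOrProxyV2. This function initializes the config and parses the options passed in, then checks whether Dir in the config is empty. If it is, a data dir is derived from the configured Name ("<name>.etcd", as in the listing below).
    On later restarts it inspects the type of the existing data dir, which is one of three kinds: member, proxy, or empty. It then takes the matching branch and calls either startEtcd or startProxy.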

                                                                                 +---->startEtcd ---> configurePeerListeners ---> configureClientListeners ----> etcdserver.NewServer
                                                                                 |
    startEtcdOrProxyV2 ---> newConfig ---> cfg.parse ---> identify data dir ---> |
                                                                                 |
                                                                                 +---->startProxy
    

    The etcd data dir looks like this:

    (ENV) [root@ceph-2 etcd]# ls
    10.255.101.74.etcd  10.255.101.74.proxy.etcd  etcd.conf  etcd-proxy.conf
    (ENV) [root@ceph-2 etcd]# tree -h
    .
    ├── [  20]  10.255.101.74.etcd
    │   └── [  29]  member
    │       ├── [ 246]  snap
    │       │   ├── [366K]  0000000000000002-0000000000d5a021.snap
    │       │   ├── [366K]  0000000000000002-0000000000d726c2.snap
    │       │   ├── [366K]  0000000000000002-0000000000d8ad63.snap
    │       │   ├── [366K]  0000000000000002-0000000000da3404.snap
    │       │   ├── [362K]  0000000000000002-0000000000dbbaa5.snap
    │       │   └── [ 20K]  db
    │       └── [ 244]  wal
    │           ├── [ 61M]  000000000000001e-0000000000c0cf3d.wal
    │           ├── [ 61M]  000000000000001f-0000000000c775f1.wal
    │           ├── [ 61M]  0000000000000020-0000000000ce234b.wal
    │           ├── [ 61M]  0000000000000021-0000000000d4ce84.wal
    │           ├── [ 61M]  0000000000000022-0000000000db76f5.wal
    │           └── [ 61M]  0.tmp
    ├── [  19]  10.255.101.74.proxy.etcd
    │   └── [  21]  proxy
    │       └── [  70]  cluster
    ├── [3.6K]  etcd.conf
    └── [ 558]  etcd-proxy.conf
    
    6 directories, 15 files
    

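    When the etcd server starts, it creates the store. The data dir, WAL dir, and snap dir are created if they do not exist, and snap/db is used as the backend path. If db already exists, the Backend is built from it. Once the Backend is built, a goroutine is started to run backend.run().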

    // file: mvcc/backend/backend.go
    type Backend interface {
        // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
        ReadTx() ReadTx
        BatchTx() BatchTx
        // ConcurrentReadTx returns a non-blocking read transaction.
        ConcurrentReadTx() ReadTx
    
        Snapshot() Snapshot
        Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
        // Size returns the current size of the backend physically allocated.
        // The backend can hold DB space that is not utilized at the moment,
        // since it can conduct pre-allocation or spare unused space for recycling.
        // Use SizeInUse() instead for the actual DB size.
        Size() int64
        // SizeInUse returns the current size of the backend logically in use.
        // Since the backend can manage free space in a non-byte unit such as
        // number of pages, the returned value can be not exactly accurate in bytes.
        SizeInUse() int64
        // OpenReadTxN returns the number of currently open read transactions in the backend.
        OpenReadTxN() int64
        Defrag() error
        ForceCommit()
        Close() error
    }
    

    Next, a new Transport is created:

    // etcdserver/server.go
    prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
    

    WAL

    If the WAL directory exists, all WAL files are opened and the snapshot entries are validated. The WAL files are decoded by a decoder with the following structure:

    // wal/decoder.go
    type decoder struct {
        mu  sync.Mutex
        brs []*bufio.Reader

        // lastValidOff file offset following the last valid decoded record
        lastValidOff int64
        crc          hash.Hash32
    }
    

    brs holds one Reader per WAL file. The decoder walks through the WAL files in turn:

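    1. Read the first 8 bytes of the frame as a little-endian integer. In the WAL file dumped below, for example, the first 8 bytes are 04 00 00 00 00 00 00 84. Because the byte order is little-endian, the low 56 bits hold the record length in bytes (here 4), and the high byte is 0x84. When the top bit of that byte is set, its low 3 bits give the padding length: 0x84 is 10000100 in binary, so there are 4 padding bytes, which round the 4-byte record up to an 8-byte boundary. A single WAL entry is at most 10MB, and each WAL segment file is 64MB by default (64 * 1000 * 1000 bytes, which tree -h reports as 61M).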
      0000000 04 00 00 00 00 00 00 84 08 04 10 00 00 00 00 00
      0000010 20 00 00 00 00 00 00 00 08 01 10 bf ae e5 db 08
      0000020 1a 16 08 e9 e4 bc b2 8f ba fc 88 82 01 10 c4 cf
      
      var (
          // SegmentSizeBytes is the preallocated size of each wal segment file.
          // The actual size might be larger than this. In general, the default
          // value should be used, but this is defined as an exported variable
          // so that tests can set a different segment size.
          SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
      )
      
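    2. Read the record bytes plus the padding bytes.
    3. Unmarshal the bytes into a Record (structure below), which carries a type, a CRC, and the data. During validation a CRC is computed over the data and compared with the CRC stored in the Record; if the two differ, the data is corrupted.
    // wal/walpb/record.pb.go
    type Record struct {
        Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
        Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
        Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
        XXX_unrecognized []byte `json:"-"`
    }
    4. Collect all records of type snapshot whose Index does not exceed the committed index (Commit) of the HardState.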
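    In etcd, the CRC check in step 3 uses a running CRC-32 (Castagnoli polynomial) chained across records rather than an independent checksum per record, so a record only validates if every record before it was also read intact. The sketch below assumes that scheme. record, sealRecords and verifyRecords are illustrative names. The sketch ignores how etcd carries the running CRC from one segment file to the next through crcType records.

```go
package main

import (
	"errors"
	"fmt"
	"hash/crc32"
)

// record is a trimmed stand-in for walpb.Record.
type record struct {
	Type int64
	Crc  uint32
	Data []byte
}

// CRC-32 with the Castagnoli polynomial (CRC-32C).
var crcTable = crc32.MakeTable(crc32.Castagnoli)

var errCRCMismatch = errors.New("wal: crc mismatch")

// sealRecords plays the writer: each Crc covers the Data of this record and
// of every record before it (a running CRC).
func sealRecords(recs []record) {
	var crc uint32
	for i := range recs {
		crc = crc32.Update(crc, crcTable, recs[i].Data)
		recs[i].Crc = crc
	}
}

// verifyRecords plays the decoder: it recomputes the running CRC and returns
// the index of the first record whose stored Crc does not match.
func verifyRecords(recs []record) (int, error) {
	var crc uint32
	for i, r := range recs {
		crc = crc32.Update(crc, crcTable, r.Data)
		if crc != r.Crc {
			return i, errCRCMismatch
		}
	}
	return len(recs), nil
}

func main() {
	recs := []record{
		{Type: 1, Data: []byte("metadata")},
		{Type: 2, Data: []byte("entry-1")},
		{Type: 2, Data: []byte("entry-2")},
	}
	sealRecords(recs)
	fmt.Println(verifyRecords(recs)) // 3 <nil>

	recs[1].Data[0] ^= 0xff // flip the bits of one byte to simulate corruption
	fmt.Println(verifyRecords(recs)) // 1 wal: crc mismatch
}
```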
    

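    As shown below, there are five Record types. A crcType record stores its value in the Crc field. For the other types, Data holds a serialized struct: a raftpb.Entry for entryType, a raftpb.HardState for stateType, a walpb.Snapshot for snapshotType, and an etcdserverpb.Metadata for metadataType.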

    // wal/wal.go
    const (
        metadataType int64 = iota + 1
        entryType
        stateType
        crcType
        snapshotType
    )
    
    // raft/raftpb/raft.pb.go
    type HardState struct {
        Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
        Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
        Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
        XXX_unrecognized []byte `json:"-"`
    }
    // raft/raftpb/raft.pb.go
    type Entry struct {
        Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
        Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
        Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
        Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
        XXX_unrecognized []byte    `json:"-"`
    }
    // wal/walpb/record.pb.go
    type Snapshot struct {
        Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
        Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
        XXX_unrecognized []byte `json:"-"`
    }
    
    // etcdserver/etcdserverpb/etcdserver.pb.go
    type Metadata struct {
        NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
        ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
        XXX_unrecognized []byte `json:"-"`
    }
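    Step 4 of the WAL walk-through amounts to a filter. The sketch below uses trimmed copies of HardState and walpb.Snapshot (the type names hardState and walSnapshot are hypothetical). It keeps only the snapshot records whose Index is at most HardState.Commit, discarding snapshot records newer than the committed HardState. The sample values are made up.

```go
package main

import "fmt"

// hardState and walSnapshot are trimmed copies of raftpb.HardState and
// walpb.Snapshot with only the fields this filter needs.
type hardState struct{ Term, Vote, Commit uint64 }

type walSnapshot struct{ Index, Term uint64 }

// validSnapshots keeps the snapshot records whose Index is at most the
// committed index recorded in the HardState, preserving their order.
func validSnapshots(snaps []walSnapshot, st hardState) []walSnapshot {
	var out []walSnapshot
	for _, s := range snaps {
		if s.Index <= st.Commit {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	snaps := []walSnapshot{{Index: 0, Term: 0}, {Index: 100, Term: 2}, {Index: 200, Term: 2}}
	st := hardState{Term: 2, Vote: 1, Commit: 150}
	fmt.Println(validSnapshots(snaps, st)) // [{0 0} {100 2}]
}
```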
    
    +----------------------------------------------------------------------+
    |  +-------------------------------+---------------------------------+ |
    |  |     record bytes<56bits>      | padding <lower 3 bits of 8bits> | |
    |  |-----------------------------------------------------------------+ |
    |  |                              data                               | |
    |  +-----------------------------------------------------------------+ |
    |                                  ...                                 |
    +----------------------------------------------------------------------+
    

    Snapshot

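    In the listing above, the snap directory contains only files ending in .snap plus the db file. Each .snap file is named %016x-%016x.snap, i.e. term-index.snap (WAL files, by contrast, are named seq-index.wal). Its contents are a serialized snappb.Snapshot: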

    // etcdserver/api/snap/snappb/snap.pb.go
    type Snapshot struct {
        Crc              uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
        Data             []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
        XXX_unrecognized []byte `json:"-"`
    }
    

    The Data field of snappb.Snapshot in turn holds a serialized raftpb.Snapshot:

    // raft/raftpb/raft.pb.go
    type Snapshot struct {
        Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
        Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
        XXX_unrecognized []byte           `json:"-"`
    }
    // raft/raftpb/raft.pb.go
    type SnapshotMetadata struct {
        ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
        Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
        Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
        XXX_unrecognized []byte    `json:"-"`
    }
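    Both halves of a .snap file name are fixed-width, zero-padded hexadecimal. As a result, the names sort lexicographically in (term, index) order, and the newest snapshot is simply the last name after sorting. The small sketch below builds and parses such names. snapName and parseSnapName are hypothetical helpers, not etcd functions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// snapName builds a file name in the "%016x-%016x.snap" (term-index) format.
func snapName(term, index uint64) string {
	return fmt.Sprintf("%016x-%016x.snap", term, index)
}

// parseSnapName extracts term and index from such a file name.
func parseSnapName(name string) (term, index uint64, err error) {
	if !strings.HasSuffix(name, ".snap") {
		return 0, 0, fmt.Errorf("not a snap file: %q", name)
	}
	parts := strings.Split(strings.TrimSuffix(name, ".snap"), "-")
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("bad snap file name: %q", name)
	}
	if term, err = strconv.ParseUint(parts[0], 16, 64); err != nil {
		return 0, 0, err
	}
	if index, err = strconv.ParseUint(parts[1], 16, 64); err != nil {
		return 0, 0, err
	}
	return term, index, nil
}

func main() {
	t, i, err := parseSnapName("0000000000000002-0000000000d5a021.snap")
	if err != nil {
		panic(err)
	}
	fmt.Println(t, i == 0xd5a021)      // 2 true
	fmt.Println(snapName(2, 0xd5a021)) // 0000000000000002-0000000000d5a021.snap

	_, _, err = parseSnapName("db") // the backend file in the same directory
	fmt.Println(err)                // not a snap file: "db"
}
```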
    

    Starting/restarting a node

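    Whether the node is started fresh or restarted depends on whether the WAL directory exists and whether a new cluster is being created. The restart path is described below as an example.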

    1. Read the metadata, the raftpb.HardState, and all raftpb.Entry records from the WAL.
    // etcdserver/etcdserverpb/etcdserver.pb.go
    type Metadata struct {
        NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
        ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
        XXX_unrecognized []byte `json:"-"`
    }
    
    2. Create the RaftCluster object. The NodeID and ClusterID from the metadata become RaftCluster's localID and cid respectively.
    // file: etcdserver/api/membership/cluster.go
    // RaftCluster is a list of Members that belong to the same raft cluster
    type RaftCluster struct {
       lg *zap.Logger
    
       localID types.ID
       cid     types.ID
       token   string
    
       v2store v2store.Store
       be      backend.Backend
    
       sync.Mutex // guards the fields below
       version    *semver.Version
       members    map[types.ID]*Member
       // removed contains the ids of removed members in the cluster.
       // removed id cannot be reused.
       removed map[types.ID]bool
    
       downgradeInfo *DowngradeInfo
    }
    
    3. Create a MemoryStorage, then
      • apply the snapshot,
      • set the HardState to the value read in step 1,
      • append the entries read in step 1 to the MemoryStorage.
    // file: raft/storage.go
    // MemoryStorage implements the Storage interface backed by an
    // in-memory array.
    type MemoryStorage struct {
        // Protects access to all fields. Most methods of MemoryStorage are
        // run on the raft goroutine, but Append() is run on an application
        // goroutine.
        sync.Mutex
    
        hardState pb.HardState
        snapshot  pb.Snapshot
        // ents[i] has raft log position i+snapshot.Metadata.Index
        ents []pb.Entry
    }
    

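    4. Restart the Node according to the raft.Config.
    ElectionTick must be greater than HeartbeatTick. The usual recommendation is ElectionTick = 10 * HeartbeatTick, which avoids unnecessary leader changes.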

    // file: raft/rawnode.go
    // RawNode is a thread-unsafe Node.
    // The methods of this struct correspond to the methods of Node and are described
    // more fully there.
    type RawNode struct {
        raft       *raft
        prevSoftSt *SoftState
        prevHardSt pb.HardState
    }
    
    • Create the raft from the raft.Config (see raft/raft.go for details):

      1. Validate the Config.

      2. Create the raftLog, as shown below. Note that raftLog's committed and applied start at firstIndex - 1, and log.unstable.offset is set to lastIndex + 1.

          // file: raft/log.go (newLogWithSize)
          log := &raftLog{
              storage:         storage,
              logger:          logger,
              maxNextEntsSize: maxNextEntsSize,
          }
          firstIndex, err := storage.FirstIndex()
          if err != nil {
              panic(err) // TODO(bdarnell)
          }
          lastIndex, err := storage.LastIndex()
          if err != nil {
              panic(err) // TODO(bdarnell)
          }
          log.unstable.offset = lastIndex + 1
          log.unstable.logger = logger
          // Initialize our committed and applied pointers to the time of the last compaction.
          log.committed = firstIndex - 1
          log.applied = firstIndex - 1
        
      3. Get the HardState and ConfState from the MemoryStorage. As noted above, the MemoryStorage has applied the snapshot and holds the HardState recorded in the WAL.

      4. Build the raft struct. By default, etcd caps each message at 1MB:

       const (
           // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
           // Assuming the RTT is around 10ms, 1MB max size is large enough.
           maxSizePerMsg = 1 * 1024 * 1024
           // Never overflow the rafthttp buffer, which is 4096.
           // TODO: a better const?
           maxInflightMsgs = 4096 / 8
       )
      
      5. Set raft's Vote and Term, and raftLog's committed, from the HardState obtained in step 3.
      6. Initialize the node as a follower: set its step and tick functions, reset the Term, set Lead to None, and set State to StateFollower.
        // file: raft/raft.go
        func (r *raft) becomeFollower(term uint64, lead uint64) {
            r.step = stepFollower
            r.reset(term)
            r.tick = r.tickElection
            r.lead = lead
            r.state = StateFollower
            r.logger.Infof("%x became follower at term %d", r.id, r.Term)
        }
      
    • Build the RawNode from the raft, saving the raft's SoftState and HardState in the RawNode's prevSoftSt and prevHardSt.

    
    5. Create the raft node. node is the canonical implementation of the Node interface.
    // file: raft/node.go
    // Node represents a node in a raft cluster.
    type Node interface {
        // ... (methods omitted)
    }
    
    // node is the canonical implementation of the Node interface
    type node struct {
        propc      chan msgWithResult
        recvc      chan pb.Message
        confc      chan pb.ConfChangeV2
        confstatec chan pb.ConfState
        readyc     chan Ready
        advancec   chan struct{}
        tickc      chan struct{}
        done       chan struct{}
        stop       chan struct{}
        status     chan chan Status
    
        rn *RawNode
    }
    
    6. Start a goroutine that runs node.run() (see raft/node.go for details). The node's role is one of the following states:
    // file: raft/raft.go
    // StateType represents the role of a node in a cluster.
    type StateType uint64
    
    var stmap = [...]string{
        "StateFollower",
        "StateCandidate",
        "StateLeader",
        "StatePreCandidate",
    }
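    Much of steps 3 to 5 above is index arithmetic. The sketch below models just that arithmetic under simplifying assumptions. memStorage keeps only the dummy-entry layout described in the MemoryStorage comment (ents[0] sits at the snapshot index). initLog copies the committed/applied/unstable.offset initialization from the raftLog excerpt. loadCommit mimics step 5. validateTicks checks only the tick rule from step 4. All type and function names are hypothetical, not the raft package's API, and in this sketch applied is simply left where initLog put it.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a trimmed raftpb.Entry.
type entry struct{ Index, Term uint64 }

// memStorage models only MemoryStorage's index layout: ents[0] is a dummy
// entry at the snapshot index, so ents[i] sits at log position
// ents[0].Index + i.
type memStorage struct{ ents []entry }

func newMemStorage() *memStorage { return &memStorage{ents: make([]entry, 1)} }

// applySnapshot drops the log and moves the dummy entry to the snapshot.
func (s *memStorage) applySnapshot(index, term uint64) {
	s.ents = []entry{{Index: index, Term: term}}
}

// appendEntries assumes es directly follows the last index; the real
// MemoryStorage also truncates compacted or conflicting entries.
func (s *memStorage) appendEntries(es []entry) { s.ents = append(s.ents, es...) }

func (s *memStorage) firstIndex() uint64 { return s.ents[0].Index + 1 }
func (s *memStorage) lastIndex() uint64  { return s.ents[0].Index + uint64(len(s.ents)) - 1 }

// logPointers holds the raftLog fields initialized in the excerpt above.
type logPointers struct{ committed, applied, unstableOffset uint64 }

func initLog(s *memStorage) logPointers {
	first, last := s.firstIndex(), s.lastIndex()
	return logPointers{committed: first - 1, applied: first - 1, unstableOffset: last + 1}
}

// loadCommit mimics step 5: committed moves to HardState.Commit, which must
// lie in [committed, lastIndex].
func (l *logPointers) loadCommit(commit, lastIndex uint64) error {
	if commit < l.committed || commit > lastIndex {
		return fmt.Errorf("commit %d out of range [%d, %d]", commit, l.committed, lastIndex)
	}
	l.committed = commit
	return nil
}

// validateTicks checks only the tick rule: HeartbeatTick > 0 and
// ElectionTick > HeartbeatTick.
func validateTicks(electionTick, heartbeatTick int) error {
	if heartbeatTick <= 0 {
		return errors.New("heartbeat tick must be greater than 0")
	}
	if electionTick <= heartbeatTick {
		return errors.New("election tick must be greater than heartbeat tick")
	}
	return nil
}

func main() {
	s := newMemStorage()
	s.applySnapshot(100, 2)                                 // snapshot at index 100
	s.appendEntries([]entry{{101, 2}, {102, 2}, {103, 3}}) // entries read from the WAL
	lp := initLog(s)
	fmt.Printf("%+v\n", lp) // {committed:100 applied:100 unstableOffset:104}
	if err := lp.loadCommit(102, s.lastIndex()); err != nil { // HardState.Commit = 102
		panic(err)
	}
	fmt.Printf("%+v\n", lp)           // {committed:102 applied:100 unstableOffset:104}
	fmt.Println(validateTicks(10, 1)) // <nil>
}
```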
    

    Transport


    Peer

    A remote raft node is represented by a peer, and the local raft node sends messages to it through that peer. Each peer has two underlying mechanisms for sending messages: stream and pipeline.

    The stream channel keeps a long-lived HTTP connection open and carries small, frequent messages. The pipeline channel closes the connection as soon as a transfer completes and carries large, infrequent messages, such as snapshot data.
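    A peer has to decide, for every message, which channel to use. The sketch below is modelled loosely on peer.pick in etcdserver/api/rafthttp/peer.go. It assumes the following selection order: snapshots always go through the pipeline; MsgApp prefers the dedicated msgAppV2 stream; other messages use the general message stream; and the pipeline is the fallback when no stream is connected. msgType, the channel labels and pick's signature are simplified stand-ins.

```go
package main

import "fmt"

// msgType is a tiny stand-in for raftpb.MessageType.
type msgType int

const (
	msgApp msgType = iota
	msgHeartbeat
	msgSnap
)

// Channel labels used by this sketch.
const (
	pipelineMsg = "pipeline"
	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
)

// pick returns the channel for a message, given which streams are working.
func pick(m msgType, appV2StreamUp, msgStreamUp bool) string {
	switch {
	case m == msgSnap:
		// snapshots can be very large and would block a stream
		return pipelineMsg
	case m == msgApp && appV2StreamUp:
		return streamAppV2
	case msgStreamUp:
		return streamMsg
	default:
		// no stream connected: fall back to the pipeline
		return pipelineMsg
	}
}

func main() {
	fmt.Println(pick(msgSnap, true, true))        // pipeline
	fmt.Println(pick(msgApp, true, true))         // streamMsgAppV2
	fmt.Println(pick(msgHeartbeat, true, true))   // streamMsg
	fmt.Println(pick(msgHeartbeat, false, false)) // pipeline
}
```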


    Handler

    /raft  --> pipelineHandler
    /raft/stream/ --> streamHandler
    /raft/snapshot --> snapshotHandler
    /raft/probing --> httpHealth
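    The route table above can be reproduced with a plain net/http ServeMux. This is a sketch with placeholder handlers (named is a hypothetical helper, and the stream path /raft/stream/message/1 is only an example). It shows the ServeMux rule the table relies on: a pattern without a trailing slash, such as /raft, matches only that exact path, while /raft/stream/ matches every path below it.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named is a placeholder handler that replies with its own name.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newPeerMux registers the four raft routes listed in the table.
func newPeerMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipeline"))       // exact path only
	mux.Handle("/raft/stream/", named("stream")) // trailing slash: whole subtree
	mux.Handle("/raft/snapshot", named("snapshot"))
	mux.Handle("/raft/probing", named("probing"))
	return mux
}

// route reports which handler answers a GET for path.
func route(mux *http.ServeMux, path string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newPeerMux()
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/raft/snapshot", "/raft/probing"} {
		fmt.Println(p, "->", route(mux, p))
	}
}
```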
    

    Message encoder/decoder

    The Message encoder and decoder wrap an io.Writer and an io.Reader respectively, and encode or decode Messages over them.

    +----------------------------------------------------------------------+
    |  +-------------------------------+---------------------------------+ |
    |  |                    message size (8 bytes)                       | |
    |  |-----------------------------------------------------------------+ |
    |  |                              data                               | |
    |  +-----------------------------------------------------------------+ |
    |                                  ...                                 |
    +----------------------------------------------------------------------+
    

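    Encoding writes the message size as an 8-byte big-endian integer, followed by the serialized data.
    Decoding does the reverse. It first reads the 8-byte message size and returns an error immediately if the size exceeds 512MB; otherwise it reads that many bytes and unmarshals them into a Message. A different limit can be passed to decodeLimit; snapshot messages, for example, may be up to 1TB. See etcdserver/api/rafthttp/msg_codec.go for details.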

    // messageEncoder is a encoder that can encode all kinds of messages.
    // It MUST be used with a paired messageDecoder.
    type messageEncoder struct {
        w io.Writer
    }
    
    func (enc *messageEncoder) encode(m *raftpb.Message) error {
        if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
            return err
        }
        _, err := enc.w.Write(pbutil.MustMarshal(m))
        return err
    }
    
    // messageDecoder is a decoder that can decode all kinds of messages.
    type messageDecoder struct {
        r io.Reader
    }
    
    var (
        readBytesLimit     uint64 = 512 * 1024 * 1024 // 512 MB
        ErrExceedSizeLimit        = errors.New("rafthttp: error limit exceeded")
    )
    
    func (dec *messageDecoder) decode() (raftpb.Message, error) {
        return dec.decodeLimit(readBytesLimit)
    }
    
    func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
        var m raftpb.Message
        var l uint64
        if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
            return m, err
        }
        if l > numBytes {
            return m, ErrExceedSizeLimit
        }
        buf := make([]byte, int(l))
        if _, err := io.ReadFull(dec.r, buf); err != nil {
            return m, err
        }
        return m, m.Unmarshal(buf)
    }
    

    This article is based on etcd 3.5.0-pre.



    Article link: https://www.haomeiwen.com/subject/ouidzhtx.html