etcd Initialization Flow
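When etcd starts, it first calls startEtcdOrProxyV2. This function initializes the config and parses the options passed in, then checks whether Dir in the config is empty; if it is, a data dir is generated from the Name specified in the config (the default result is shown below). On later starts, etcd checks the type of the data dir. There are currently three types: member, proxy, and empty, meaning a cluster member, a proxy, and an empty directory respectively. It then takes the matching branch and calls either startEtcd or startProxy.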
                                                                             +----> startEtcd ---> configurePeerListeners ---> configureClientListeners ---> etcdserver.NewServer
                                                                             |
startEtcdOrProxyV2 ---> newConfig ---> cfg.parse ---> identify data dir ---> |
                                                                             |
                                                                             +----> startProxy
The etcd data dir looks like this:
(ENV) [root@ceph-2 etcd]# ls
10.255.101.74.etcd 10.255.101.74.proxy.etcd etcd.conf etcd-proxy.conf
(ENV) [root@ceph-2 etcd]# tree -h
.
├── [  20]  10.255.101.74.etcd
│   └── [  29]  member
│       ├── [ 246]  snap
│       │   ├── [366K]  0000000000000002-0000000000d5a021.snap
│       │   ├── [366K]  0000000000000002-0000000000d726c2.snap
│       │   ├── [366K]  0000000000000002-0000000000d8ad63.snap
│       │   ├── [366K]  0000000000000002-0000000000da3404.snap
│       │   ├── [362K]  0000000000000002-0000000000dbbaa5.snap
│       │   └── [ 20K]  db
│       └── [ 244]  wal
│           ├── [ 61M]  000000000000001e-0000000000c0cf3d.wal
│           ├── [ 61M]  000000000000001f-0000000000c775f1.wal
│           ├── [ 61M]  0000000000000020-0000000000ce234b.wal
│           ├── [ 61M]  0000000000000021-0000000000d4ce84.wal
│           ├── [ 61M]  0000000000000022-0000000000db76f5.wal
│           └── [ 61M]  0.tmp
├── [  19]  10.255.101.74.proxy.etcd
│   └── [  21]  proxy
│       └── [  70]  cluster
├── [3.6K]  etcd.conf
└── [ 558]  etcd-proxy.conf

6 directories, 15 files
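The data dir check described above can be sketched roughly as follows. This is a minimal illustration, not etcd's own code: the names dirType, defaultDataDir, classify, and identifyDataDir are hypothetical, the "<name>.etcd" pattern is inferred from the listing above, and treating a directory that contains both member and proxy as an error is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// dirType mirrors the three data dir kinds named in the text.
type dirType string

const (
	dirMember dirType = "member"
	dirProxy  dirType = "proxy"
	dirEmpty  dirType = "empty"
)

// defaultDataDir derives a data dir from the member name when Dir is unset.
// The "<name>.etcd" pattern is inferred from the directory listing.
func defaultDataDir(name string) string {
	return name + ".etcd"
}

// classify decides the dir type from the names found directly inside the data dir.
func classify(names []string) (dirType, error) {
	var member, proxy bool
	for _, n := range names {
		switch n {
		case "member":
			member = true
		case "proxy":
			proxy = true
		}
	}
	switch {
	case member && proxy:
		// Assumption of this sketch: both at once is treated as invalid.
		return "", errors.New("data dir contains both member and proxy directories")
	case member:
		return dirMember, nil
	case proxy:
		return dirProxy, nil
	default:
		return dirEmpty, nil
	}
}

// identifyDataDir lists dir and classifies it; a missing dir counts as empty.
func identifyDataDir(dir string) (dirType, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return dirEmpty, nil
		}
		return "", err
	}
	names := make([]string, 0, len(entries))
	for _, e := range entries {
		names = append(names, e.Name())
	}
	return classify(names)
}

func main() {
	dir := defaultDataDir("10.255.101.74")
	t, err := identifyDataDir(dir)
	fmt.Println(dir, t, err)
}
```

Keeping classify separate from the filesystem access makes the decision logic easy to check in isolation.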
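When the etcd server starts, it creates the store. The data dir, WAL dir, and snap dir are created if they do not exist; snap/db is the backend path. If db already exists, the Backend is built from it. Once the Backend is built, a goroutine is started to execute backend.run().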
// file: mvcc/backend/backend.go
type Backend interface {
	// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
	ReadTx() ReadTx
	BatchTx() BatchTx
	// ConcurrentReadTx returns a non-blocking read transaction.
	ConcurrentReadTx() ReadTx

	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	// Size returns the current size of the backend physically allocated.
	// The backend can hold DB space that is not utilized at the moment,
	// since it can conduct pre-allocation or spare unused space for recycling.
	// Use SizeInUse() instead for the actual DB size.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value can be not exactly accurate in bytes.
	SizeInUse() int64
	// OpenReadTxN returns the number of currently open read transactions in the backend.
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error
}
Next, a new Transport is created:
// etcdserver/server.go
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
WAL
If the WAL directory exists, all WAL files are opened and the snapshot entries are validated. The WAL is decoded by a decoder, whose structure is as follows:
// wal/decoder.go
type decoder struct {
	mu  sync.Mutex
	brs []*bufio.Reader

	// lastValidOff file offset following the last valid decoded record
	lastValidOff int64
	crc          hash.Hash32
}
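Here brs holds one Reader per WAL file. Each WAL file is traversed as follows:
- Read the 8-byte length field at the start of the WAL in little-endian order. For example, the WAL file below starts with the 8 bytes 04 00 00 00 00 00 00 84. Because the byte order is little-endian, the low 56 bits give the record size in bytes (4 here), and the low 3 bits of the high 8 bits give the padding size: 0x84 is 10000100 in binary, so its low three bits give 4. A WAL entry is at most 10MB, and each WAL segment file defaults to 64MB.

0000000 04 00 00 00 00 00 00 84 08 04 10 00 00 00 00 00
0000010 20 00 00 00 00 00 00 00 08 01 10 bf ae e5 db 08
0000020 1a 16 08 e9 e4 bc b2 8f ba fc 88 82 01 10 c4 cf

var (
	// SegmentSizeBytes is the preallocated size of each wal segment file.
	// The actual size might be larger than this. In general, the default
	// value should be used, but this is defined as an exported variable
	// so that tests can set a different segment size.
	SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
)

- Read the record bytes plus the padding bytes.
- Deserialize them into a Record, whose structure is shown below. It contains the type, the CRC, and the data. During validation, the CRC is computed from the data and compared with the CRC stored in the Record; if they differ, the data is corrupted.
- Collect all Records of type snapshot whose Index does not exceed the Commit of the HardState.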
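The length-field decoding above can be sketched as follows. This is an illustration under stated assumptions rather than etcd's decoder: the function names are hypothetical, and treating the highest bit of the top byte as a "padding present" flag is an assumption (it is consistent with the 0x84 example, where that bit is set).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeFrameSize splits the 8-byte length field into the record size
// (low 56 bits) and the padding size (low 3 bits of the top byte).
func decodeFrameSize(lenField uint64) (recBytes, padBytes uint64) {
	recBytes = lenField &^ (uint64(0xff) << 56)
	top := lenField >> 56
	// Assumption: the highest bit marks that padding is present.
	if top&0x80 != 0 {
		padBytes = top & 0x7
	}
	return recBytes, padBytes
}

// parseFrameHeader reads the length field from 8 little-endian bytes.
func parseFrameHeader(hdr []byte) (recBytes, padBytes uint64) {
	return decodeFrameSize(binary.LittleEndian.Uint64(hdr))
}

func main() {
	// First 8 bytes of the WAL file shown in the hex dump.
	hdr := []byte{0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x84}
	rec, pad := parseFrameHeader(hdr)
	fmt.Printf("record=%d padding=%d frame=%d\n", rec, pad, rec+pad)
	// prints: record=4 padding=4 frame=8
}
```

Applied to the hex dump, the first frame is 4 record bytes (08 04 10 00) plus 4 padding bytes, so the next length field starts at offset 0x10; its bytes 20 00 00 00 00 00 00 00 decode to a 32-byte record with no padding.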
// wal/walpb/record.pb.go
type Record struct {
	Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
	Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
	Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
	XXX_unrecognized []byte `json:"-"`
}
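The CRC comparison on a Record can be sketched as below. The record type here is a hypothetical stand-in for walpb.Record, and two simplifications are assumed: the checksum uses the Castagnoli polynomial, and it is computed per record on its own, whereas a real WAL may carry checksum state from one record to the next.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// record is a hypothetical, trimmed-down stand-in for walpb.Record.
type record struct {
	Type int64
	Crc  uint32
	Data []byte
}

// crcTable uses the Castagnoli polynomial (an assumption of this sketch).
var crcTable = crc32.MakeTable(crc32.Castagnoli)

// newRecord stores the checksum of data alongside the data itself.
func newRecord(t int64, data []byte) record {
	return record{Type: t, Crc: crc32.Checksum(data, crcTable), Data: data}
}

// validate recomputes the checksum from Data and compares it with Crc.
func (r record) validate() error {
	if got := crc32.Checksum(r.Data, crcTable); got != r.Crc {
		return fmt.Errorf("crc mismatch: record has %08x, data hashes to %08x", r.Crc, got)
	}
	return nil
}

func main() {
	r := newRecord(2, []byte("hello"))
	fmt.Println("intact:", r.validate())

	// Simulate corruption by changing the payload after the CRC was stored.
	r.Data = []byte("hellp")
	fmt.Println("corrupted:", r.validate())
}
```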
As shown below, there are five Record types:
// wal/wal.go
const (
	metadataType int64 = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)
// raft/raftpb/raft.pb.go
type HardState struct {
	Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
	Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
	Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
	XXX_unrecognized []byte `json:"-"`
}

// raft/raftpb/raft.pb.go
type Entry struct {
	Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
	Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
	Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
	Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
	XXX_unrecognized []byte    `json:"-"`
}

// wal/walpb/record.pb.go
type Snapshot struct {
	Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
	Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
	XXX_unrecognized []byte `json:"-"`
}

// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
	NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
	ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
	XXX_unrecognized []byte `json:"-"`
}
+----------------------------------------------------------------------+
| +-------------------------------+---------------------------------+ |
| | record bytes<56bits> | padding <lower 3 bits of 8bits> | |
| |-----------------------------------------------------------------+ |
| | data | |
| +-----------------------------------------------------------------+ |
| ... |
+----------------------------------------------------------------------+
Snapshot
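The snap directory contains only files ending in .snap plus the db file. Each .snap file is named in the format %016x-%016x.snap, that is, term-index.snap, and its content corresponds to the snappb.Snapshot structure: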
// etcdserver/api/snap/snappb/snap.pb.go
type Snapshot struct {
	Crc              uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
	Data             []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
	XXX_unrecognized []byte `json:"-"`
}
The Data field of snappb.Snapshot in turn holds a raftpb.Snapshot:
// raft/raftpb/raft.pb.go
type Snapshot struct {
	Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
	Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
	XXX_unrecognized []byte           `json:"-"`
}

// raft/raftpb/raft.pb.go
type SnapshotMetadata struct {
	ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
	Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
	Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
	XXX_unrecognized []byte    `json:"-"`
}
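The file names in the snap directory listed earlier, such as 0000000000000002-0000000000d5a021.snap, consist of two zero-padded 16-digit hexadecimal numbers. A minimal sketch of building and parsing such names follows; snapName and parseSnapName are hypothetical helpers, and reading the two numbers as the Term and Index of the snapshot metadata is an assumption of the sketch.

```go
package main

import "fmt"

// snapName formats a snapshot file name from two 64-bit values,
// taken here to be the snapshot's term and index (an assumption).
func snapName(term, index uint64) string {
	return fmt.Sprintf("%016x-%016x.snap", term, index)
}

// parseSnapName is the inverse of snapName; it fails on names such as "db".
func parseSnapName(name string) (term, index uint64, err error) {
	_, err = fmt.Sscanf(name, "%016x-%016x.snap", &term, &index)
	return term, index, err
}

func main() {
	name := snapName(2, 0xd5a021)
	fmt.Println(name)
	// prints: 0000000000000002-0000000000d5a021.snap

	term, index, err := parseSnapName(name)
	fmt.Println(term, index, err)

	_, _, err = parseSnapName("db")
	fmt.Println(err != nil)
}
```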
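Starting/Restarting a Node
Whether etcd starts a fresh node or restarts an existing one depends on whether the WAL directory exists and whether this is a new cluster. The restart path is described below.
- Read the metadata, the raftpb.HardState, and all raftpb.Entry records from the WAL.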
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
	NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
	ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
	XXX_unrecognized []byte `json:"-"`
}
- Create the RaftCluster object. The NodeID and ClusterID in the metadata correspond to the localID and cid of the RaftCluster, respectively.
// file: etcdserver/api/membership/cluster.go
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
	lg *zap.Logger

	localID types.ID
	cid     types.ID
	token   string

	v2store v2store.Store
	be      backend.Backend

	sync.Mutex // guards the fields below
	version    *semver.Version
	members    map[types.ID]*Member
	// removed contains the ids of removed members in the cluster.
	// removed id cannot be reused.
	removed map[types.ID]bool

	downgradeInfo *DowngradeInfo
}
- Create the MemoryStorage.
  - Apply the snapshot.
  - Set the HardState to the value read from the WAL in the first step.
  - Append the entries read from the WAL in the first step to the MemoryStorage.
// file: raft/storage.go
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex

	hardState pb.HardState
	snapshot  pb.Snapshot
	// ents[i] has raft log position i+snapshot.Metadata.Index
	ents []pb.Entry
}
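The three MemoryStorage steps can be sketched with a simplified in-memory type. Everything here is hypothetical (memStorage, entry, hardState and their methods are not the raft package's API): locking is left out, the append only accepts strictly contiguous entries, and the snapshot is reduced to its index and term. It follows the layout from the comment above, where ents[0] sits at the snapshot index.

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

type hardState struct{ Term, Vote, Commit uint64 }

// memStorage is a simplified stand-in for an in-memory raft storage.
type memStorage struct {
	hardState hardState
	// ents[0] is a placeholder at the snapshot index, so ents[i] has
	// log position i + snapshot index.
	ents []entry
}

func newMemStorage() *memStorage { return &memStorage{ents: make([]entry, 1)} }

// applySnapshot discards the log and restarts it at the snapshot position.
func (ms *memStorage) applySnapshot(index, term uint64) {
	ms.ents = []entry{{Index: index, Term: term}}
}

func (ms *memStorage) setHardState(hs hardState) { ms.hardState = hs }

// appendEntries adds entries that directly follow the current last index.
func (ms *memStorage) appendEntries(es []entry) error {
	for _, e := range es {
		if want := ms.lastIndex() + 1; e.Index != want {
			return fmt.Errorf("non-contiguous entry: got index %d, want %d", e.Index, want)
		}
		ms.ents = append(ms.ents, e)
	}
	return nil
}

func (ms *memStorage) firstIndex() uint64 { return ms.ents[0].Index + 1 }

func (ms *memStorage) lastIndex() uint64 {
	return ms.ents[0].Index + uint64(len(ms.ents)) - 1
}

func main() {
	ms := newMemStorage()
	ms.applySnapshot(100, 2)                                   // apply the snapshot
	ms.setHardState(hardState{Term: 2, Vote: 1, Commit: 103}) // HardState from the WAL
	err := ms.appendEntries([]entry{                           // entries from the WAL
		{Index: 101, Term: 2}, {Index: 102, Term: 2}, {Index: 103, Term: 2},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("first=%d last=%d commit=%d\n", ms.firstIndex(), ms.lastIndex(), ms.hardState.Commit)
	// prints: first=101 last=103 commit=103
}
```

With a snapshot at index 100 and three entries after it, the first available log index is 101 and the last is 103.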
- Restart the Node according to the raft.Config. The usual recommendation is ElectionTick = 10 * HeartbeatTick, which avoids unnecessary leader changes.
// file: raft/rawnode.go
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
	raft       *raft
	prevSoftSt *SoftState
	prevHardSt pb.HardState
}
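The tick recommendation above can be illustrated with a small, hypothetical config type. This is not raft.Config: only three fields are modelled, the helper names are made up, and the checks (a non-zero ID, a positive heartbeat tick, and an election tick strictly greater than the heartbeat tick) are a simplified sketch of the kind of validation a raft config goes through.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical, trimmed-down stand-in for a raft configuration.
type config struct {
	ID            uint64
	ElectionTick  int
	HeartbeatTick int
}

// validate rejects configurations that cannot elect a stable leader.
func (c config) validate() error {
	if c.ID == 0 {
		return errors.New("id must not be zero")
	}
	if c.HeartbeatTick <= 0 {
		return errors.New("heartbeat tick must be greater than 0")
	}
	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("election tick must be greater than heartbeat tick")
	}
	return nil
}

// recommended applies the ElectionTick = 10 * HeartbeatTick rule of thumb.
func recommended(id uint64, heartbeatTick int) config {
	return config{ID: id, HeartbeatTick: heartbeatTick, ElectionTick: 10 * heartbeatTick}
}

func main() {
	good := recommended(1, 1)
	fmt.Println(good.ElectionTick, good.validate())

	// Equal ticks would let followers time out between heartbeats.
	bad := config{ID: 1, ElectionTick: 1, HeartbeatTick: 1}
	fmt.Println(bad.validate())
}
```

A follower that receives a heartbeat every tick but only starts an election after ten silent ticks tolerates several delayed or lost heartbeats before challenging the leader.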
- Create the Raft from the raft.Config. See raft/raft.go for details.
  - Validate the Config.
  - Create the raftLog. As the code below shows, committed and applied in the raftLog are initialized to firstIndex - 1, and log.unstable.offset is set to lastIndex + 1.
log := &raftLog{
	storage:         storage,
	logger:          logger,
	maxNextEntsSize: maxNextEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
	panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
	panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
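  - Fetch the HardState and ConfState from the MemoryStorage. As mentioned earlier, the MemoryStorage has applied the snapshot and holds the HardState recorded in the WAL.
  - Build the raft struct. By default, the maximum size of each message is 1MB.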
const (
	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, 1MB max size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)
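  - Use the HardState fetched from the MemoryStorage above to set the raft's Vote and Term and the raftLog's committed.
  - Initialize the node as a follower. This assigns its step and tick functions, resets the Term, sets Lead to None, and sets State to Follower.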
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
- Build the RawNode from the raft. The raft's SoftState and HardState are saved in the RawNode's prevSoftSt and prevHardSt.
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
	raft       *raft
	prevSoftSt *SoftState
	prevHardSt pb.HardState
}
- Create the raft node, where node is the canonical implementation of the Node interface.
// file: raft/node.go
// Node represents a node in a raft cluster.
type Node interface{ /* methods omitted */ }

// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChangeV2
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	rn *RawNode
}
- Start a goroutine that runs the node.run() method. See raft/node.go for details.
// file: raft/raft.go
// StateType represents the role of a node in a cluster.
type StateType uint64
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}
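The node struct shown earlier is mostly a set of channels, and a goroutine running node.run() serves them. The following sketch shows that general pattern only; it is heavily simplified and hypothetical: just three of the channels are modelled, proposals are plain strings, and all sends block until the loop receives them.

```go
package main

import "fmt"

// node is a hypothetical, minimal event-loop owner; the real node has many
// more channels and drives a RawNode.
type node struct {
	propc chan string   // proposals from clients
	tickc chan struct{} // logical clock ticks
	stop  chan struct{} // request to stop
	done  chan struct{} // closed when run returns

	// State below is touched only by the run goroutine while it is alive.
	proposals []string
	ticks     int
}

func newNode() *node {
	return &node{
		propc: make(chan string),
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run serializes all events through a single select loop.
func (n *node) run() {
	defer close(n.done)
	for {
		select {
		case p := <-n.propc:
			n.proposals = append(n.proposals, p)
		case <-n.tickc:
			n.ticks++
		case <-n.stop:
			return
		}
	}
}

func (n *node) Propose(p string) { n.propc <- p }
func (n *node) Tick()            { n.tickc <- struct{}{} }

// Stop asks the loop to exit and waits until it has.
func (n *node) Stop() {
	close(n.stop)
	<-n.done
}

func main() {
	n := newNode()
	go n.run()
	n.Propose("put foo=bar")
	n.Tick()
	n.Tick()
	n.Stop()
	fmt.Println(len(n.proposals), n.ticks)
	// prints: 1 2
}
```

Because a single goroutine owns the state, callers never touch it directly and no lock is needed around proposals or ticks.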
Transport
Peer
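A remote raft node is represented by a peer, and the local raft node sends messages to the remote node through that peer. Each peer has two underlying mechanisms for sending messages: stream and pipeline.
etcd mainly uses the stream channel and the pipeline channel. The stream channel maintains a long-lived HTTP connection and carries messages that are small and sent frequently. The pipeline channel closes its connection as soon as a transfer completes and carries messages that are large and sent infrequently, such as snapshot data.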
Handler
/raft --> pipelineHandler
/raft/stream/ --> streamHandler
/raft/snapshot --> snapshotHandler
/raft/probing --> httpHealth
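A routing table like the one above maps naturally onto an http.ServeMux. In the sketch below the handlers are placeholders that only write their own name, and named, newPeerMux, and matchedPattern are hypothetical helpers; the snapshot route is spelled /raft/snapshot. The point is only how the four prefixes would be registered and matched.

```go
package main

import (
	"fmt"
	"net/http"
)

// named returns a placeholder handler that writes its own name.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, name)
	})
}

// newPeerMux registers the four routes from the table.
func newPeerMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipelineHandler"))
	// The trailing slash makes this a subtree match, e.g. /raft/stream/message/1.
	mux.Handle("/raft/stream/", named("streamHandler"))
	mux.Handle("/raft/snapshot", named("snapshotHandler"))
	mux.Handle("/raft/probing", named("httpHealth"))
	return mux
}

// matchedPattern reports which registered pattern would serve path.
func matchedPattern(mux *http.ServeMux, path string) string {
	req, err := http.NewRequest(http.MethodGet, "http://peer.invalid"+path, nil)
	if err != nil {
		return ""
	}
	_, pattern := mux.Handler(req)
	return pattern
}

func main() {
	mux := newPeerMux()
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/raft/snapshot", "/raft/probing"} {
		fmt.Printf("%s -> %s\n", p, matchedPattern(mux, p))
	}
}
```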
Message encoder/decoder
The Message encoder and decoder wrap an io.Writer and an io.Reader to encode and decode Messages, respectively.
+----------------------------------------------------------------------+
| +-------------------------------+---------------------------------+ |
| | message size (8 bytes) | |
| |-----------------------------------------------------------------+ |
| | data | |
| +-----------------------------------------------------------------+ |
| ... |
+----------------------------------------------------------------------+
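Encoding writes the 8-byte message size first, followed by the serialized data.
Decoding is the exact reverse: it first reads the 8-byte message size and checks whether it exceeds 512MB. If it does, an error is returned immediately; otherwise the bytes are deserialized into a Message. A different read limit can also be specified; for example, snapshot messages may be up to 1TB. See etcdserver/api/rafthttp/msg_codec.go for details.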
// messageEncoder is a encoder that can encode all kinds of messages.
// It MUST be used with a paired messageDecoder.
type messageEncoder struct {
	w io.Writer
}

func (enc *messageEncoder) encode(m *raftpb.Message) error {
	if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
		return err
	}
	_, err := enc.w.Write(pbutil.MustMarshal(m))
	return err
}

// messageDecoder is a decoder that can decode all kinds of messages.
type messageDecoder struct {
	r io.Reader
}

var (
	readBytesLimit     uint64 = 512 * 1024 * 1024 // 512 MB
	ErrExceedSizeLimit        = errors.New("rafthttp: error limit exceeded")
)

func (dec *messageDecoder) decode() (raftpb.Message, error) {
	return dec.decodeLimit(readBytesLimit)
}

func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
	var m raftpb.Message
	var l uint64
	if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
		return m, err
	}
	if l > numBytes {
		return m, ErrExceedSizeLimit
	}
	buf := make([]byte, int(l))
	if _, err := io.ReadFull(dec.r, buf); err != nil {
		return m, err
	}
	return m, m.Unmarshal(buf)
}
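The codec above depends on raftpb, so it cannot be run on its own. The same length-prefixed framing can be tried out with plain byte payloads, as in this stand-alone sketch; encodeFrame, decodeFrame, and roundTrip are hypothetical names, and the payload is an opaque byte slice rather than a serialized Message.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

var errExceedSizeLimit = errors.New("size limit exceeded")

// encodeFrame writes an 8-byte big-endian length followed by the payload.
func encodeFrame(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.BigEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// decodeFrame reads the length, enforces limit, then reads the payload.
func decodeFrame(r io.Reader, limit uint64) ([]byte, error) {
	var l uint64
	if err := binary.Read(r, binary.BigEndian, &l); err != nil {
		return nil, err
	}
	// Check the declared size before allocating anything.
	if l > limit {
		return nil, errExceedSizeLimit
	}
	buf := make([]byte, l)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// roundTrip encodes payload into a buffer and decodes it back under limit.
func roundTrip(payload []byte, limit uint64) ([]byte, error) {
	var buf bytes.Buffer
	if err := encodeFrame(&buf, payload); err != nil {
		return nil, err
	}
	return decodeFrame(&buf, limit)
}

func main() {
	out, err := roundTrip([]byte("raft"), 512*1024*1024)
	fmt.Println(string(out), err)

	_, err = roundTrip(make([]byte, 10), 4)
	fmt.Println(err)
}
```

Checking the declared length against the limit before allocating the buffer is what keeps a corrupt or hostile length field from triggering a huge allocation.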
This article is based on etcd 3.5.0-pre.