Project Source
https://github.com/etcd-io/etcd
Source Code Analysis
Entry Point Walkthrough
build.sh
etcd_build() {
out="bin"
if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
toggle_failpoints_default
run rm -f "${out}/etcd"
(
cd ./server
# Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
# shellcheck disable=SC2086
run env "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
-installsuffix=cgo \
"-ldflags=${GO_LDFLAGS[*]}" \
-o="../${out}/etcd" . || return 2
) || return 2
.......
From build.sh we can see that the project's entry point lives in the ./server directory.
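For reference, server/main.go is just a thin wrapper that forwards os.Args to the etcdmain package. This is how it looks in the v3.5 tree (check your checkout for the exact contents):

```go
package main

import (
	"os"

	"go.etcd.io/etcd/server/v3/etcdmain"
)

func main() {
	etcdmain.Main(os.Args)
}
```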
Startup Walkthrough
Trivial call-forwarding is skipped below; if you are interested, download the source and trace it yourself.
func Main(args []string) {
checkSupportArch()
if len(args) > 1 {
cmd := args[1]
switch cmd {
case "gateway", "grpc-proxy":
if err := rootCmd.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
return
}
}
startEtcdOrProxyV2(args)
}
When the first argument is gateway or grpc-proxy, etcd starts in proxy mode; that path is ignored for now.
func startEtcdOrProxyV2(args []string) {
grpc.EnableTracing = false
cfg := newConfig()
// initialize logging, cluster information, etc. from the configuration
.....
var stopped <-chan struct{}
var errc <-chan error
// validate the data directory type: a member dir and a proxy dir must not both exist
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
lg.Info(
"server has been already initialized",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
switch which {
case dirMember:
// start etcd
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
err = startProxy(cfg)
default:
lg.Panic(
"unknown directory type",
zap.String("dir-type", string(which)),
)
}
} else {
shouldProxy := cfg.isProxy()
// only executed when running as a proxy
......
if shouldProxy {
err = startProxy(cfg)
}
}
// handle errors and interrupt signals; omitted here
......
osutil.Exit(0)
}
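The directory check above is worth a closer look. Below is a simplified sketch of what identifyDataDirOrDie does, assuming only the member/proxy subdirectory convention described in the comment (the real function logs and panics instead of returning an error):

```go
package main

import (
	"fmt"
	"os"
)

type dirType string

const (
	dirMember = dirType("member")
	dirProxy  = dirType("proxy")
	dirEmpty  = dirType("empty")
)

// identifyDataDir is a simplified sketch of etcd's identifyDataDirOrDie:
// it looks for the "member" and "proxy" subdirectories under the data dir
// and refuses to continue when both exist.
func identifyDataDir(dir string) (dirType, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		if os.IsNotExist(err) {
			return dirEmpty, nil // fresh start: no data dir yet
		}
		return dirEmpty, err
	}
	var member, proxy bool
	for _, e := range entries {
		switch dirType(e.Name()) {
		case dirMember:
			member = true
		case dirProxy:
			proxy = true
		}
	}
	switch {
	case member && proxy:
		return dirEmpty, fmt.Errorf("both member and proxy directories exist under %q", dir)
	case member:
		return dirMember, nil
	case proxy:
		return dirProxy, nil
	}
	return dirEmpty, nil
}

func main() {
	t, err := identifyDataDir("default.etcd")
	fmt.Println(t, err)
}
```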
With the configuration initialized, the real startup flow begins:
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Close)
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}
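startEtcd is a thin wrapper over the embed package, which you can also use to run etcd inside your own process. A minimal usage example, following the embed package documentation (the data directory name is arbitrary):

```go
package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // example data directory
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	select {
	case <-e.Server.ReadyNotify():
		log.Println("server is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown
		log.Println("server took too long to start")
	}
	log.Fatal(<-e.Err())
}
```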
Starting etcd:
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = inCfg.Validate(); err != nil {
return nil, err
}
serving := false
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
defer func() {
if e == nil || err == nil {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.serversC)
}
}
e.Close()
e = nil
}()
if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
)
// start listening for peer traffic from other members, default port 2380
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err
}
e.cfg.logger.Info(
"configuring client listeners",
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
)
// start listening for client requests, default port 2379
if e.sctxs, err = configureClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
var (
urlsmap types.URLsMap
token string
)
memberInitialized := true
// the existence of WAL files tells us whether this member has been initialized before
if !isMemberInitialized(cfg) {
memberInitialized = false
// get the cluster peer URL map and the cluster token
urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
if err != nil {
return e, fmt.Errorf("error setting up initial cluster: %v", err)
}
}
// AutoCompactionRetention defaults to "0" if not set.
if len(cfg.AutoCompactionRetention) == 0 {
cfg.AutoCompactionRetention = "0"
}
// parse the auto-compaction retention: in Revision mode the value is used as-is, in Periodic mode a bare number means hours (value * 1 hour)
autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
if err != nil {
return e, err
}
// freelist type for the boltdb backend (array/map); this appears to correspond to the boltdb freelist optimization Alibaba shared earlier
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := config.ServerConfig{
.....
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized {
if err = e.Server.CheckInitialHashKV(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
}
e.Server.Start()
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
e.cfg.logger.Info(
"now serving peer/client/metrics",
zap.String("local-member-id", e.Server.ID().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
)
serving = true
return e, nil
}
StartEtcd converts the configuration and opens the peer/client/metrics listeners. Let's first step into etcdserver.NewServer and analyze how the server is created.
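One configuration detail from StartEtcd deserves a concrete example before we move on: the retention string passed to parseCompactionRetention is interpreted differently per mode. A hedged sketch of that rule (parseRetention is an illustrative name, not the actual etcd helper):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseRetention sketches the rule implemented by etcd's parseCompactionRetention:
// in "revision" mode a bare integer is a revision count (a Duration used as a
// counter); in "periodic" mode a bare integer means that many hours; anything
// else is parsed as a Go duration string such as "30m" or "1h".
func parseRetention(mode, value string) (time.Duration, error) {
	if n, err := strconv.Atoi(value); err == nil && n >= 0 {
		switch mode {
		case "revision":
			return time.Duration(int64(n)), nil // a revision count, not wall time
		case "periodic":
			return time.Duration(int64(n)) * time.Hour, nil
		}
	}
	return time.ParseDuration(value)
}

func main() {
	d, _ := parseRetention("periodic", "5") // 5h0m0s
	fmt.Println(d)
	d, _ = parseRetention("periodic", "30m") // 30m0s
	fmt.Println(d)
}
```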
NewServer
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
b, err := bootstrap(cfg)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
b.be.Close()
}
}()
sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss),
id: b.raft.wal.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.raft.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.ci,
firstCommitInTermC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = b.be
srv.beHooks = b.beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
time.Duration(cfg.TokenTTL)*time.Second,
)
if err != nil {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
defer func() {
// closing backend without first closing kv can cause
// resumed compactions to fail with closed tx errors
if err != nil {
newSrv.kv.Close()
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
return nil, err
}
srv.compactor.Run()
}
srv.applyV3Base = srv.newApplierV3Backend()
srv.applyV3Internal = srv.newApplierV3Internal()
if err = srv.restoreAlarms(); err != nil {
return nil, err
}
if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
})
}
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(),
ID: b.raft.wal.id,
URLs: cfg.PeerURLs,
ClusterID: b.raft.cl.ID(),
Raft: srv,
Snapshotter: b.ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
return nil, err
}
// add all remotes into transport
for _, m := range b.remotes {
if m.ID != b.raft.wal.id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range b.raft.cl.Members() {
if m.ID != b.raft.wal.id {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
srv.r.transport = tr
return srv, nil
}
The overall flow of creating the server:
- initialize the data storage (bootstrap)
- populate the server configuration
- create the mvcc store (TODO: detailed analysis)
- start the rafthttp transport
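One detail in NewServer worth a worked example is the minimum lease TTL: it is derived from the election timeout so a lease can never expire faster than the cluster can elect a new leader. Using the default --heartbeat-interval=100ms and --election-timeout=1000ms (i.e. ElectionTicks = 10):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Defaults: TickMs=100 (heartbeat interval) and ElectionTicks=10.
	heartbeat := 100 * time.Millisecond
	electionTicks := 10

	// minTTL = (3*ElectionTicks/2) * heartbeat, as computed in NewServer above.
	minTTL := time.Duration((3*electionTicks)/2) * heartbeat
	fmt.Println(minTTL) // 1.5s

	// MinLeaseTTL is the ceiling in whole seconds, so with the defaults the
	// lessor never grants a lease shorter than 2 seconds.
	fmt.Println(int64(math.Ceil(minTTL.Seconds()))) // 2
}
```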
bootstrap
// create the data directory, snapshot directory, and the backend store
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
cfg.Logger.Warn(
"exceeded recommended request limit",
zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
zap.String("recommended-request-size", recommendedMaxRequestBytesString),
)
}
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
haveWAL := wal.Exist(cfg.WALDir())
ss := bootstrapSnapshot(cfg)
be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
be.Close()
}
}()
// build the round tripper (HTTP client) used for requests to peers
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
if err != nil {
return nil, err
}
switch {
case !haveWAL && !cfg.NewCluster:
/*
1. Check that the configured cluster peer URLs are correct
2. Build the cluster from the configuration, add the configured members, and generate the local member ID
3. Query the already-running etcd members for cluster and member information
4. Compare the cluster information from steps 2 and 3 to verify it is consistent with the configuration
5. Check that the versions of the other members are compatible with the local etcd version
6. Create the raft-backed storage
*/
b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
case !haveWAL && cfg.NewCluster:
/*
1. Check that the configured cluster peer URLs are correct
2. Build the cluster from the configuration, add the configured members, and generate the local member ID
3. Verify that no member with the same configuration is already running in the cluster
4. Create the raft-backed storage
*/
b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
case haveWAL:
/*
1. Validate that the snapshot is usable
2. Read the snapshot and try to recover the etcd state from it
3. Create the raft-backed storage
*/
b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
default:
be.Close()
return nil, fmt.Errorf("unsupported bootstrap config")
}
if err != nil {
return nil, err
}
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr)
}
b.prt = prt
b.ci = ci
b.st = st
b.be = be
b.ss = ss
b.beHooks = beHooks
return b, nil
}
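The three bootstrap paths can be summarized by a small decision function. The sketch below (bootstrapPath is an illustrative name) mirrors the switch above: existing WAL files always win, otherwise --initial-cluster-state decides between creating a new cluster and joining an existing one.

```go
package main

import "fmt"

// bootstrapPath sketches the decision made by the switch in bootstrap():
// the path depends only on whether WAL files already exist and whether the
// member is configured with --initial-cluster-state=new.
func bootstrapPath(haveWAL, newCluster bool) string {
	switch {
	case haveWAL:
		return "bootstrapWithWAL: restart, recover from snapshot and WAL"
	case newCluster:
		return "bootstrapNewClusterNoWAL: first start of a brand-new cluster"
	default:
		return "bootstrapExistingClusterNoWAL: first start, joining an existing cluster"
	}
}

func main() {
	fmt.Println(bootstrapPath(false, true))  // new cluster
	fmt.Println(bootstrapPath(false, false)) // join an existing cluster
	fmt.Println(bootstrapPath(true, true))   // WAL wins: treated as a restart
}
```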