美文网首页
3.etcd源码笔记 - example

3.etcd源码笔记 - example

作者: 兰CC | 来源:发表于2019-08-08 14:49 被阅读0次

    etcd 基于 raft library 写了一个demo,提供了简单的 put、get、节点变更,外界通过 http 访问

    代码都在 etcd/contrib/raftexample 包下,可以先读 README.md 文档看下

    一、整体结构

    3.1 design.png
    • demo 支持3个功能 put、get、节点变更
    • put 请求不是直接调用应用层的接口,而是通过 propose chan,应用层再读取消息异步处理
    • config change 请求同上,通过 config change chan,应用层再读取消息异步处理
    • get 请求是直接通过接口调用,同步返回
    • 支持wal,后续介绍

    二、基础元素

    1. KV Storage

    demo 很暴力的 用一个 map 实现了 kv storage

    代码在 etcd/contrib/raftexample/kvstore.go

    type kvstore struct {
        //只写chan,接收put请求
        proposeC    chan<- string
        mu          sync.RWMutex
        //简单的map实现kv存储
        kvStore     map[string]string
        //操作snapshot的类,这个类不关于数据格式,接收参数是 bytes
        snapshotter *snap.Snapshotter
    }
    
    //这是http put请求调用的方法
    //没有直接更改 kvStore, 而是写入 proposeC
    //再传入 raft 库,然后协商一致后,再写回 kvStore(后续详细介绍)
    func (s *kvstore) Propose(k string, v string) {
        var buf bytes.Buffer
        if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
            log.Fatal(err)
        }
        s.proposeC <- buf.String()
    }
    

    构造时会传入 *commitC <-chan string,只读取 chan,

    应用层接收 raft-libraryready 消息后,会把协商一致的提案写入 commitCkvStore 读取后进行处理

    func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
        //监听commitC
        for data := range commitC {
            //初始启动时,应用层会写入nil
            //触发 kvstore 加载磁盘文件
            if data == nil {
                snapshot, err := s.snapshotter.Load()
                ...
                continue
            }
            
            //如果是正常数据,就尝试用官方库序列化成二进制数据, 
            //成功的话就把原始值塞进map
            //之所以尝试序列化,是为了刷成snapshot做准备
            var dataKv kv
            dec := gob.NewDecoder(bytes.NewBufferString(*data))
            if err := dec.Decode(&dataKv); err != nil {
                log.Fatalf("raftexample: could not decode message (%v)", err)
            }
            s.mu.Lock()
            s.kvStore[dataKv.Key] = dataKv.Val
            s.mu.Unlock()
        }
        if err, ok := <-errorC; ok {
            log.Fatal(err)
        }
    }
    

    2. snapshotter

    上述 kvstore 运行期间,数据是放在map,即内存,需要定期持久化,不然重启后神马数据都没有了。

    持久化的对象是 raftpb.Snapshot,这个类来自etcd的公共包 etcd/raft/raftpb

    type Snapshot struct {
        //数据格式由调用方决定,持久化之后就是二进制数组
        Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
        Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
        XXX_unrecognized []byte           `json:"-"`
    }
    
    type SnapshotMetadata struct {
        ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
        Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
        Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
        XXX_unrecognized []byte    `json:"-"`
    }
    
    //在这个demo里,数据格式就是把map刷成bytes
    func (s *kvstore) getSnapshot() ([]byte, error) {
        s.mu.Lock()
        defer s.mu.Unlock()
        return json.Marshal(s.kvStore)
    }
    

    在公共包内 etcd/snap 有一个类 snap.Snapshotter ,作用是操作 raftpb.Snapshot,包括以下功能

    • 写磁盘文件
    //raftpb.Snapshot.Metadata 包含 Term、Index 信息
    //文件命名规则是 “Term-Index.snap”,其中 Term、Index 不足16位,在前面补0
    //这个Index表示 snapshot的最后一条记录的 Index
    //不关心格式,参数丢进来就是raftpb.Snapshot
    func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error
    
    • 加载磁盘文件
    //加载的是最新的文件
    func (s *Snapshotter) Load() (*raftpb.Snapshot, error)
    

    3. WAL

    持久化光有 snapshot 不够,因为持久化不是实时的,一般是定时定量。

    一旦重启或宕机,最近一次持久化到服务挂掉这期间的数据还是有可能会丢失, WAL 正是解决这个问题的机制。

    WALwrite ahead log 的缩写,即在执行真正的写操作之前先写一个操作日志,这些日志都会严格保证持久化,即实时持久化,以保证整个操作的一致性和可恢复性。

    服务重启时,kvstoremap 根据 snapshot + WAL (基量 + 增量) 可以得到完整的恢复。

    关于 WAL 有太多的文章介绍,这边就记录阅读源码时遇到的几个问题。

    • 文件命名规则
    //“seq-index.wal”,其中 seq、index 不足16位,在前面补0
    //seq是递增,实际创建文件时基于wal目录下的 “最新文件的文件名里的 seq” + 1
    func walName(seq, index uint64) string {
        return fmt.Sprintf("%016x-%016x.wal", seq, index)
    }
    
    • 日志格式

    文件的读写分别交给 wal/encoder.go:decoderwal/encoder.go:encoder 处理

    写日志时,需要8字节对齐,这是为了性能考虑,cpu缓冲命中率有关系

    具体可以参考 https://stackoverflow.com/questions/21219130/is-8-byte-alignment-for-double-type-necessary

    3.2 wal.png
    • 文件大小 默认是64MB

    三、流程 - 启动

    1. 入口

    func main() {
        //解析参数
        cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
        id := flag.Int("id", 1, "node ID")
        kvport := flag.Int("port", 9121, "key-value server port")
        join := flag.Bool("join", false, "join an existing cluster")
        flag.Parse()
    
        //创建put请求写入的 propose chan
        proposeC := make(chan string)
        defer close(proposeC)
        //创建节点变更请求写入的 config change chan
        confChangeC := make(chan raftpb.ConfChange)
        defer close(confChangeC)
    
        var kvs *kvstore
        //应用层打包snapshot时,使用的是 kvs.getSnapshot(),就是直接序列化map
        getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
        //应用层基于 raft library 创建一个 raftNode
        //返回 commitC 上面有介绍
        //返回 snapshotterReady,这个是为了保证下面的 newKVStore 在raftNode执行完再执行
        commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
    
        //创建kvstore
        kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
    
        //启动http服务
        serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
    }
    

    2. newRaftNode

    func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
        confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
        ...
        go rc.startRaft()
        return commitC, errorC, rc.snapshotterReady
    }
    
    func (rc *raftNode) startRaft() {
        ...
        rc.snapshotter = snap.New(rc.snapdir)
        rc.snapshotterReady <- rc.snapshotter
    
        oldwal := wal.Exist(rc.waldir)
        rc.wal = rc.replayWAL()
        
        //调用 raft library 的 接口,启动一个 node
        if oldwal {
            rc.node = raft.RestartNode(c)
        } else {
            startPeers := rpeers
            if rc.join {
                startPeers = nil
            }
            rc.node = raft.StartNode(c, startPeers)
        }
        //创建传输层
        rc.transport = &rafthttp.Transport{
            ID:          types.ID(rc.id),
            ClusterID:   0x1000,
            Raft:        rc,
            ServerStats: stats.NewServerStats("", ""),
            LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
            ErrorC:      make(chan error),
        }
    
        rc.transport.Start()
        for i := range rc.peers {
            if i+1 != rc.id {
                rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
            }
        }
        
        go rc.serveRaft()
        //监听raft library 的 ready chan
        go rc.serveChannels()
    }
    
    func (rc *raftNode) serveChannels() {
        ...
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        //接收http层的请求
        go func() {
            ..
            for rc.proposeC != nil && rc.confChangeC != nil {
                select {
                case prop, ok := <-rc.proposeC:
                    if !ok {
                        rc.proposeC = nil
                    } else {
                        //put请求最终会在这边进行处理
                        rc.node.Propose(context.TODO(), []byte(prop))
                    }
                case cc, ok := <-rc.confChangeC:
                    if !ok {
                        rc.confChangeC = nil
                    } else {
                        confChangeCount += 1
                        cc.ID = confChangeCount
                        //节点变更请求最终会在这边进行处理
                        rc.node.ProposeConfChange(context.TODO(), cc)
                    }
                }
            }
            // client closed channel; shutdown raft if not already
            close(rc.stopc)
        }()
    
        //监听 raft library 抛出来的事件
        for {
            select {
            //周期性的执行任务,心跳、选举
            case <-ticker.C:
                rc.node.Tick()
            //raft library提交的事件
            case rd := <-rc.node.Ready():
                //先存wal
                rc.wal.Save(rd.HardState, rd.Entries)
                //如果有同步snapshot,则将snapshot存下来
                if !raft.IsEmptySnap(rd.Snapshot) {
                    rc.saveSnap(rd.Snapshot)
                    rc.raftStorage.ApplySnapshot(rd.Snapshot)
                    rc.publishSnapshot(rd.Snapshot)
                }
                rc.raftStorage.Append(rd.Entries)
                //如果有消息需要发给其他节点,发之
                rc.transport.Send(rd.Messages)
                //应用层处理已提交的提案,具体就是写到 commit chan,然后 kvstore 消费存储到其 map 中
                if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                    rc.stop()
                    return
                }
                //如果有需要对数据进行打包,打包之
                rc.maybeTriggerSnapshot()
                //告诉 raft 状态机 可以继续下一步的处理了
                rc.node.Advance()
            ...
            }
        }
    }
    

    3. newKVStore

    可以参考上述的基础元素说明

    4. serveHttpKVAPI

    func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
        srv := http.Server{
            Addr: ":" + strconv.Itoa(port),
            //所有请求的接收都在httpKVAPI
            Handler: &httpKVAPI{
                store:       kv,
                confChangeC: confChangeC,
            },
        }
        ...
    }
    
    func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        key := r.RequestURI
        switch {
        case r.Method == "PUT":
            ...
            //调用的是 kvstore.Propose
            //具体操作就是写到 Propose Chan,应用层再消费之,最终调用 raftLibrary.Propose
            h.store.Propose(key, string(v))
            ...
        case r.Method == "GET":
            ...
            //直接就是访问kvStore的map
            if v, ok := h.store.Lookup(key); ok {
                w.Write([]byte(v))
            } else {
                http.Error(w, "Failed to GET", http.StatusNotFound)
            }
        case r.Method == "POST":
            ...
            //万万没想到 POST 方法是用来进行节点变更的
            cc := raftpb.ConfChange{
                Type:    raftpb.ConfChangeAddNode,
                NodeID:  nodeId,
                Context: url,
            }
            //写入 config Change Chan,应用层再消费之,最终调用 raftLibrary.ProposeConfChange
            h.confChangeC <- cc
            ...
        case r.Method == "DELETE":
            ...
            //删除节点,同POST
            cc := raftpb.ConfChange{
                Type:   raftpb.ConfChangeRemoveNode,
                NodeID: nodeId,
            }
            h.confChangeC <- cc
            ...
        default:
            ...
        }
    }
    

    相关文章

      网友评论

          本文标题:3.etcd源码笔记 - example

          本文链接:https://www.haomeiwen.com/subject/mjotjctx.html