美文网首页
从 dgraph-io/dgraph 了解 etcd/raft

从 dgraph-io/dgraph 了解 etcd/raft

作者: 逆麟囧 | 来源:发表于2018-01-19 20:51 被阅读339次

    本次代码阅读基于commit 189fdd3

    1. raftwal

    godoc

    之前提到, etcd/raft提供了 MemoryStorage + wal 的方式 来对 raft 中的 HardState, Snapshot 和 Entry 进行持久化. wal 将数据直接写入文件.

    而对于 dgraph 来说, 它的一个物理节点上有多个 raft group, 且 raft group 会自动新建. 此时, 所有 raft group 使用同一套底层存储会相对简单一些.

    本包中, dgraph 使用 badger 这个同属 dgraph-io 出品的 kv 数据库来保存所有 raft group 的日志.

    1.1 Keys

    既然不同 raft 的日志都存在同一个 kv 数据库中, 那么就需要对存储的 key 进行有效地区分.

    对于一个 raft node 来说, 它通过节点 id RaftId(uint64) 和 组 id gid(uint32) 两层 来标识自己

    相应地, raftwal 中的三类 key 都包含这两个 id

    1. snapshotKey:

      func (w *Wal) snapshotKey(gid uint32) []byte {
       b := make([]byte, 14)
       binary.BigEndian.PutUint64(b[0:8], w.id)
       copy(b[8:10], []byte("ss"))
       binary.BigEndian.PutUint32(b[10:14], gid)
       return b
      }
      

    2. hardStateKey:

      func (w *Wal) hardStateKey(gid uint32) []byte {
       b := make([]byte, 14)
       binary.BigEndian.PutUint64(b[0:8], w.id)
       copy(b[8:10], []byte("hs"))
       binary.BigEndian.PutUint32(b[10:14], gid)
       return b
      }
      
    3. entryKey:

      func (w *Wal) entryKey(gid uint32, term, idx uint64) []byte {
       b := make([]byte, 28)
       binary.BigEndian.PutUint64(b[0:8], w.id)
       binary.BigEndian.PutUint32(b[8:12], gid)
       binary.BigEndian.PutUint64(b[12:20], term)
       binary.BigEndian.PutUint64(b[20:28], idx)
       return b
      }
      

    1.2 Wal

    Wal 提供 raft 数据的读写.

    对于 raft 数据的持久化, 最重要的是保证数据的一致性.

    StoreSnapshot
    func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
        txn := w.wals.NewTransactionAt(1, true)
        defer txn.Discard()
        
        // ...
        
        if err := txn.Set(w.snapshotKey(gid), data); err != nil {
            return err
        }
        
        // ...
        
        // 清除 snapshot 数据之前的所有 entry
        // Delete all entries before this snapshot to save disk space.
        start := w.entryKey(gid, 0, 0)
        last := w.entryKey(gid, s.Metadata.Term, s.Metadata.Index)
        
        // 这里利用了 badger 的特性, 在遍历的时候仅读取 key 数据, 减少了读取 value 带来的开销
        opt := badger.DefaultIteratorOptions
        opt.PrefetchValues = false
        itr := txn.NewIterator(opt)
        defer itr.Close()
    
        // 逐一删除不再需要的 entry
        for itr.Seek(start); itr.Valid(); itr.Next() {
            // ...
        }
    
        // Failure to delete entries is not a fatal error, so should be
        // ok to ignore
        if err := txn.CommitAt(1, nil); err != nil {
            x.Printf("Error while storing snapshot %v\n", err)
            return err
        }
        return nil
    }
    
    Store
    // Store stores the hardstate and entries for a given RAFT group.
    func (w *Wal) Store(gid uint32, h raftpb.HardState, es []raftpb.Entry) error {
        txn := w.wals.NewTransactionAt(1, true)
    
        var t, i uint64
        // 逐一保存 entry
        for _, e := range es {
            t, i = e.Term, e.Index
            
            // ...
        }
    
        // 如果有必要, 保存 HardState
        if !raft.IsEmptyHardState(h) {
            // ...
        }
    
        // If we get no entries, then the default value of t and i would be zero. That would
        // end up deleting all the previous valid raft entry logs. This check avoids that.
        if t > 0 || i > 0 {
            // When writing an Entry with Index i, any previously-persisted entries
            // with Index >= i must be discarded.
            // Ideally we should be deleting entries from previous term with index >= i,
            // but to avoid complexity we remove them during reading from wal.
            // 有可能出现某个时间点之后, 由于网络原因, 数据分叉的情形.
            // 为了在网络恢复之后保证数据一致性, 对于每一批 entry, 需要清除逻辑上排在这批数据之后的 entry.
            start := w.entryKey(gid, t, i+1)
            prefix := w.prefix(gid)
            // ...
            
            // 逐一清除
            for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
                // ...
            }
        }
        if err := txn.CommitAt(1, nil); err != nil {
            return err
        }
        return nil
    }
    
    读取
    func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
        // ...
    }
    
    func (w *Wal) HardState(gid uint32) (hd raftpb.HardState, rerr error) {
        // ...
    }
    
    func (w *Wal) Entries(gid uint32, fromTerm, fromIndex uint64) (es []raftpb.Entry, rerr error) {
        // ...
    }
    

    1.3 关于badger

    badger 来源于这篇论文 WiscKey: Separating Keys from Values in SSD-conscious Storage. .

    知乎上仅有的评论里, 对它的评价不甚高 如何评价 Badger (fast key-value storage) 😂.

    但不论怎样, 它在一些情况下确实比较 , 也可能非常适合 dgraph 的使用场景.

    2. conn

    godoc

    conn 充当了 etcd/raft 的网络传输层, 基于 gRPC 在 raft 节点之间同步信息.

    2.1 Pool

    看名字是个连接池, 实际上其中的 *grpc.ClienctConn 是复用的.

    一旦创建, 会每隔 10 秒尝试 ping 一下, 根据结果判断当前连接是否可用.

    // "Pool" is used to manage the grpc client connection(s) for communicating with other
    // worker instances.  Right now it just holds one of them.
    type Pool struct {
        sync.RWMutex
        
        // 这段注释说明了 *grpc.ClientConn 可以服用的原因
        // A "pool" now consists of one connection.  gRPC uses HTTP2 transport to combine
        // messages in the same TCP stream.
        conn *grpc.ClientConn
    
        // 上一次 ping 请求成功的时间
        lastEcho time.Time
        
        // 目标节点的地址
        Addr     string
        
        // 发起 ping 请求的 ticker
        ticker   *time.Ticker
    }
    

    2.2 Pools

    Pools 维护了不同地址的 Pool.

    这里是一个单例.

    type Pools struct {
        sync.RWMutex
        all map[string]*Pool
    }
    
    var pi *Pools
    
    func init() {
        pi = new(Pools)
        pi.all = make(map[string]*Pool)
    }
    

    2.3 Node

    Node 的用于维护 raft 成员节点, 以及在各节点之间传输信息.职责包括:

    初始化 / 读取 当前的 raft.Node
    // SetRaft would set the provided raft.Node to this node.
    // It would check fail if the node is already set.
    func (n *Node) SetRaft(r raft.Node) {
        // ...
    }
    
    // Raft would return back the raft.Node stored in the node.
    func (n *Node) Raft() raft.Node {
        // ...
    }
    
    维护 ConfState 即节点 id 列表
    // SetConfState would store the latest ConfState generated by ApplyConfChange.
    func (n *Node) SetConfState(cs *raftpb.ConfState) {
        // ...
    }
    
    // ConfState would return the latest ConfState stored in node.
    func (n *Node) ConfState() *raftpb.ConfState {
        // ...
    }
    
    维护 节点 id - 地址 的对应关系
    func (n *Node) Peer(pid uint64) (string, bool) {
        // ...
    }
    
    // addr must not be empty.
    func (n *Node) SetPeer(pid uint64, addr string) {
        // ...
    }
    
    func (n *Node) DeletePeer(pid uint64) {
        // ...
    }
    
    // Connects the node and makes its peerPool refer to the constructed pool and address
    // (possibly updating ourselves from the old address.)  (Unless pid is ourselves, in which
    // case this does nothing.)
    func (n *Node) Connect(pid uint64, addr string) {
        // ..
    }
    
    加入和移除节点
    • 加入节点

      // 这个函数是可以认为是 AddCluster 的回调
      // 由使用者在收到 ConfChange 成功 apply 时主动调用
      // dgraph 中调用这个方法的地方 err 都传入了 nil
      func (n *Node) DoneConfChange(id uint64, err error) {
        n.Lock()
        defer n.Unlock()
        ch, has := n.confChanges[id]
        if !has {
            return
        }
        delete(n.confChanges, id)
        ch <- err
      }
      
      func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
        addr, ok := n.Peer(pid)
        // ...
        rcBytes, err := rc.Marshal()
        // ...
      
        ch := make(chan error, 1)
        // 这个函数中, 将 channel 和一个随机生成的 id 映射起来
        // 并在向其他节点同步的信息中带上这个 id
        id := n.storeConfChange(ch)
        err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
            ID:      id,
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  pid,
            Context: rcBytes,
        })
        if err != nil {
            return err
        }
        
        // 等待 ConfChange apply 成功的回调
        err = <-ch
        return err
      }
      
      
    • 移除节点

      func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
        // ...
        
        // 和 AddToCluster 类似, 这里需要等待 ConfChange 完成
        ch := make(chan error, 1)
        pid := n.storeConfChange(ch)
        err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
            ID:     pid,
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: id,
        })
        
        // ...
      
        return err
      }
      

    节点间同步信息
    func (n *Node) Send(m raftpb.Message) {
        // ...
        select {
        case n.messages <- sendmsg{to: m.To, data: data}:
            // pass
            
        // - -0 为什么这边会有 ignore... 仅仅是为了不阻塞调用者么?
        default:
            // ignore
        }
    }
    
    // 所有通过 n.messages 传递的信息都会积累到一定程度后一起发送
    // 发往同一个节点的信息也会整合
    func (n *Node) BatchAndSendMessages() {
        // 对同一个目标 id, 始终复用一个 *bytes.Buffer
        batches := make(map[uint64]*bytes.Buffer)
        for {
            totalSize := 0
            sm := <-n.messages
        slurp_loop:
            for {
                // 如有必要, 初始化 *bytes.Buffer
                var buf *bytes.Buffer
                
                // ...
                
                // 先将当前 data 的长度写入, 用做 message 之间的分隔.
                // 再写入 data 本体
                // 因此每条 message 占用 4 + len(sm.data)
                totalSize += 4 + len(sm.data)
                x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
                x.Check2(buf.Write(sm.data))
    
                // 如果累积的数据量足够大, 中断此次汇集, 执行发送
                if totalSize > messageBatchSoftLimit {
                    // We limit the batch size, but we aren't pushing back on
                    // n.messages, because the loop below spawns a goroutine
                    // to do its dirty work.  This is good because right now
                    // (*node).send fails(!) if the channel is full.
                    break
                }
    
                // 如果没有新的 message 传入, 同样中断汇集执行发送
                select {
                case sm = <-n.messages:
                default:
                    break slurp_loop
                }
            }
    
            // 执行发送
            for to, buf := range batches {
                if buf.Len() == 0 {
                    continue
                }
                data := make([]byte, buf.Len())
                copy(data, buf.Bytes())
                go n.doSendMessage(to, data)
                
                // 重置 buf 供下一轮 message 汇集循环使用
                buf.Reset()
            }
        }
    }
    
    func (n *Node) doSendMessage(to uint64, data []byte) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
    
        // 获取到指定节点的连接
        addr, has := n.Peer(to)
        pool, err := Get().Get(addr)
        if !has || err != nil {
            x.Printf("No healthy connection found to node Id: %d, err: %v\n", to, err)
            // No such peer exists or we got handed a bogus config (bad addr), so we
            // can't send messages to this peer.
            return
        }
        client := pool.Get()
    
        // ...
    
        ch := make(chan error, 1)
        go func() {
            _, err = c.RaftMessage(ctx, p)
            if err != nil {
                x.Printf("Error while sending message to node Id: %d, err: %v\n", to, err)
            }
            ch <- err
        }()
    
        // 超时或发送完成
        select {
        case <-ctx.Done():
            return
        case <-ch:
            // We don't need to do anything if we receive any error while sending message.
            // RAFT would automatically retry.
            return
        }
    }
    
    保存/恢复 raft 数据
    func (n *Node) SaveSnapshot(s raftpb.Snapshot) {
        // ...
    }
    
    func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry) {
        // ...
    }
    
    func (n *Node) InitFromWal(wal *raftwal.Wal) (idx uint64, restart bool, rerr error) {
        // ...
    }
    
    
    WaitForMinProposal

    这里应该是 LinearRead 相关, 用来确认 Read 对应的 message 已经 apply

    func (n *Node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
        if read == nil || read.Ids == nil {
            return nil
        }
        gid := n.RaftContext.Group
        min := read.Ids[gid]
        return n.Applied.WaitForMark(ctx, min)
    }
    

    2.4 RaftServer

    RaftServer 是 gRPC service Raft 的实现, 内部是对 Node 的操作.

    相关文章

      网友评论

          本文标题:从 dgraph-io/dgraph 了解 etcd/raft

          本文链接:https://www.haomeiwen.com/subject/nbpaoxtx.html