美文网首页
etcd-raft源码分析2-server间通信机制

etcd-raft源码分析2-server间通信机制

作者: 沧行 | 来源:发表于2017-05-10 09:53 被阅读0次

    在etcd的raft实现中,server之间的消息传递并不是简单的request-response模型,而是读写分离模型,即每两个server之间会建立两条链路,对于每一个server来说,一条链路专门用来发送数据,另一条链路专门用来接收数据。在代码实现中,通过streamWriter发送数据,通过streamReader接收数据。即通过streamReader接收到数据后不会在同一条链路上直接响应,而是在处理完数据后通过streamWriter将响应发送到对端。

    对于每个server来说,不管是leader、candidate还是follower,都会维持一个peers数组,每个peer对应集群中的一个server,负责处理server之间的一些数据交互。

    server间数据交互的框图如下:

    server间数据交互.png

    当server需要向其他server发送数据时,只需要找到其他server对应的peer,然后向peer的streamWriter的msgc通道发送数据即可,streamWriter会监听msgc通道的数据并发送到对端server;而streamReader会在一个goroutine中循环读取对端发送来的数据,一旦接收到数据,就发送到peer的p.propc或p.recvc通道,而peer会监听这两个通道的事件,写入到node的n.propc或n.recvc通道,node只需要监听这两个通道的数据并处理即可。这就是在etcd的raft实现中server间数据交互的流程。

    对于每个server,都会创建一个raftNode,并且启动一个goroutine,执行raftNode的serveRaft方法,这个方法的代码如下:

    // serveRaft serves the rafthttp transport handler over HTTP on the host
    // taken from this node's own entry in rc.peers (index rc.id-1). It blocks
    // in Serve and closes rc.httpdonec once serving has ended.
    func (rc *raftNode) serveRaft() {
    peerURL, err := url.Parse(rc.peers[rc.id-1])
    if err != nil {
        log.Fatalf("raftexample: Failed parsing URL (%v)", err)
    }
    
    listener, err := newStoppableListener(peerURL.Host, rc.httpstopc)
    if err != nil {
        log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
    }
    
    srv := &http.Server{Handler: rc.transport.Handler()}
    err = srv.Serve(listener)
    select {
    case <-rc.httpstopc:
        // A receive on httpstopc succeeded, so the Serve error is not
        // treated as fatal (presumably a deliberate shutdown).
    default:
        log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
    }
    close(rc.httpdonec)
    }
    

    这个方法主要是建立一个httpserver,监听其他server的连接,处理函数为rc.transport.Handler(),下面看下该处代码:

    // Handler returns an http.Handler that routes the pipeline, stream,
    // snapshot and probing URL prefixes to their respective handlers.
    func (t *Transport) Handler() http.Handler {
    mux := http.NewServeMux()
    mux.Handle(RaftPrefix, newPipelineHandler(t, t.Raft, t.ClusterID))
    // The stream prefix is registered with a trailing slash so that every
    // path underneath it is routed to the stream handler.
    mux.Handle(RaftStreamPrefix+"/", newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID))
    mux.Handle(RaftSnapshotPrefix, newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID))
    mux.Handle(ProbingPrefix, probing.NewHandler())
    return mux
    }
    

    下面重点看下streamHandler,这个handler用于处理server之间的心跳、投票、附加日志等请求的发送,该handler的ServeHTTP代码为:

    // ServeHTTP handles a streaming request opened by a remote peer. It
    // validates the HTTP method, the cluster compatibility headers, the stream
    // type encoded in the URL path, the sender ID and the X-Raft-To header.
    // On success it replies 200, wraps the response writer in an outgoingConn,
    // attaches it to the local peer object that represents the sender, and
    // then blocks until the close notifier fires.
    func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Only GET is accepted for streaming requests.
    if r.Method != "GET" {
        w.Header().Set("Allow", "GET")
        http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
        return
    }
    
    w.Header().Set("X-Server-Version", version.Version)
    w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
    
    if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
        http.Error(w, err.Error(), http.StatusPreconditionFailed)
        return
    }
    
    // The directory part of the URL path selects the stream type.
    var t streamType
    switch path.Dir(r.URL.Path) {
    case streamTypeMsgAppV2.endpoint():
        t = streamTypeMsgAppV2
    case streamTypeMessage.endpoint():
        t = streamTypeMessage
    default:
        plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
        http.Error(w, "invalid path", http.StatusNotFound)
        return
    }
    
    // The last path element carries the ID of the sending member.
    fromStr := path.Base(r.URL.Path)
    from, err := types.IDFromString(fromStr)
    if err != nil {
        plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
        http.Error(w, "invalid from", http.StatusNotFound)
        return
    }
    if h.r.IsIDRemoved(uint64(from)) {
        plog.Warningf("rejected the stream from peer %s since it was removed", from)
        http.Error(w, "removed member", http.StatusGone)
        return
    }
    p := h.peerGetter.Get(from)
    if p == nil {
        // This may happen in following cases:
        // 1. user starts a remote peer that belongs to a different cluster
        // with the same cluster ID.
        // 2. local etcd falls behind of the cluster, and cannot recognize
        // the members that joined after its current progress.
        if urls := r.Header.Get("X-PeerURLs"); urls != "" {
            h.tr.AddRemote(from, strings.Split(urls, ","))
        }
        plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
        http.Error(w, "error sender not found", http.StatusNotFound)
        return
    }
    
    // The request must be addressed to this member.
    wto := h.id.String()
    if gto := r.Header.Get("X-Raft-To"); gto != wto {
        plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
        http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
        return
    }
    
    // Send the 200 status line and headers right away; the body stays open
    // and is written to later through the attached connection.
    w.WriteHeader(http.StatusOK)
    w.(http.Flusher).Flush()
    
    c := newCloseNotifier()
    conn := &outgoingConn{
        t:       t,
        Writer:  w,
        Flusher: w.(http.Flusher),
        Closer:  c,
    }
    // Once a connection from the remote peer has been accepted, attach it to
    // the writer behind the local encoder so that the local encoder and the
    // remote decoder work as a pair.
    // Every node actively dials the other nodes; after a successful dial it
    // loops reading from that connection with its own decoder, i.e. the
    // decoder is how a node receives data sent by other nodes.
    // When a node accepts an incoming connection from another node, it
    // attaches that connection to its encoder, i.e. the encoder is how a node
    // sends data to other nodes.
    p.attachOutgoingConn(conn)
    // Keep the handler (and therefore the HTTP response) alive until the
    // close notifier fires.
    <-c.closeNotify()
    }
    

    当监听到其他server的连接建立请求并建立连接成功后,其核心处理逻辑是这一行代码:

    p.attachOutgoingConn(conn)
    

    下面看下其函数实现:

    // attachOutgoingConn hands conn to the stream writer that matches the
    // connection's stream type. If the writer does not accept the connection,
    // conn is closed.
    func (p *peer) attachOutgoingConn(conn *outgoingConn) {
    switch conn.t {
    case streamTypeMsgAppV2:
        if p.msgAppV2Writer.attach(conn) {
            return
        }
    case streamTypeMessage:
        if p.writer.attach(conn) {
            return
        }
    default:
        plog.Panicf("unhandled stream type %s", conn.t)
    }
    // Reached only when the connection was not attached.
    conn.Close()
    }
    

    其中调用了streamWriter的attach方法,如下:

    // attach offers conn to the writer through cw.connc. It reports true when
    // the connection was sent on cw.connc, and false when cw.done became ready
    // first.
    func (cw *streamWriter) attach(conn *outgoingConn) bool {
    attached := false
    select {
    case <-cw.done:
    case cw.connc <- conn:
        attached = true
    }
    return attached
    }
    

    最终将该连接写入到cw.connc通道,下面看下streamWriter监听该通道的goroutine:

    // A new outgoing connection was delivered through cw.connc.
    case conn := <-cw.connc:
            cw.mu.Lock()
            // closeUnlocked's result is used below to report that a previous
            // connection was closed in favour of this one.
            closed := cw.closeUnlocked()
            t = conn.t
            // Pick the encoder that matches the stream type; it writes to the
            // connection's Writer.
            switch conn.t {
            case streamTypeMsgAppV2:
                enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
            case streamTypeMessage:
                enc = &messageEncoder{w: conn.Writer}
            default:
                plog.Panicf("unhandled stream type %s", conn.t)
            }
            flusher = conn.Flusher
            unflushed = 0
            cw.status.activate()
            cw.closer = conn.Closer
            cw.working = true
            cw.mu.Unlock()
    
            if closed {
                plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
            }
            plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
            // Point the loop's heartbeat and message channels at the ticker
            // and cw.msgc now that a connection is attached.
            heartbeatc, msgc = tickc.C, cw.msgc
    

    当监听到cw.connc通道有数据时,获取该数据,即与其他某个server的连接,然后获取conn.Writer封装成一个encoder,用来将要发送的数据发送出去。

    上面说了server的连接监听,下面看下server与其他server的连接建立。
    在startRaft这个goroutine中,有如下代码段:

    // Build the rafthttp transport for this node and start it.
    rc.transport = &rafthttp.Transport{
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: ss,
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }
    
    rc.transport.Start()
    // Register every other member as a peer; the member ID is the 1-based
    // position of its URL in rc.peers.
    for i, peerURL := range rc.peers {
        nodeID := i + 1
        if nodeID == rc.id {
            continue
        }
        rc.transport.AddPeer(types.ID(nodeID), []string{peerURL})
    }
    

    在rc.transport.AddPeer方法中调用了startPeer方法,里面创建了streamReader,并开启了一个goroutine:

    // run is the stream reader's main loop. It repeatedly dials the remote
    // peer and, on success, decodes messages from the connection until
    // decodeLoop returns. After each attempt it waits 100ms before retrying,
    // and exits (closing cr.done) once cr.stopc is ready.
    func (cr *streamReader) run() {
    t := cr.typ
    plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
    for {
        // Dial the remote peer to establish the connection.
        rc, err := cr.dial(t)
        if err != nil {
            // An unsupported stream type is not recorded as a dial failure.
            if err != errUnsupportedStreamType {
                cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
            }
        } else {
            cr.status.activate()
            plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
            // Loop reading and handling the data sent by the remote peer.
            err := cr.decodeLoop(rc, t)
            plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
            // io.EOF and closed-connection errors are not recorded as read
            // failures; any other error deactivates the status.
            switch {
            // all data is read out
            case err == io.EOF:
            // connection is closed by the remote
            case transport.IsClosedConnError(err):
            default:
                cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
            }
        }
        select {
        // Wait 100ms to create a new stream, so it doesn't bring too much
        // overhead when retry.
        case <-time.After(100 * time.Millisecond):
        case <-cr.stopc:
            plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
            close(cr.done)
            return
        }
    }
    }
    

    通过rc, err := cr.dial(t)与对端建立连接,在err := cr.decodeLoop(rc, t)中循环读取该连接的数据:

    // decodeLoop reads messages from rc with the decoder that matches stream
    // type t and forwards each one to cr.propc (for raftpb.MsgProp) or
    // cr.recvc (everything else). It returns the first decode error; if
    // cr.stopc is already ready on entry it closes rc and returns io.EOF (or
    // the error from Close).
    func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
    var dec decoder
    cr.mu.Lock()
    switch t {
    case streamTypeMsgAppV2:
        dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
    case streamTypeMessage:
        dec = &messageDecoder{r: rc}
    default:
        plog.Panicf("unhandled stream type %s", t)
    }
    // While still holding the lock, either bail out because the reader has
    // been stopped, or record rc as the current closer.
    select {
    case <-cr.stopc:
        cr.mu.Unlock()
        if err := rc.Close(); err != nil {
            return err
        }
        return io.EOF
    default:
        cr.closer = rc
    }
    cr.mu.Unlock()
    for {
        m, err := dec.decode()
        if err != nil {
            cr.mu.Lock()
            cr.close()
            cr.mu.Unlock()
            return err
        }
        receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
    
        // Read the paused flag under the lock; while paused, decoded
        // messages are discarded.
        cr.mu.Lock()
        paused := cr.paused
        cr.mu.Unlock()
    
        if paused {
            continue
        }
    
        if isLinkHeartbeatMessage(&m) {
            // raft is not interested in link layer
            // heartbeat message, so we should ignore
            // it.
            continue
        }
    
        // Proposals are delivered on propc; all other messages on recvc.
        recvc := cr.recvc
        if m.Type == raftpb.MsgProp {
            recvc = cr.propc
        }
        // Non-blocking send: if the target channel cannot accept the
        // message right now, the message is dropped and counted.
        select {
        case recvc <- m:
        default:
            if cr.status.isActive() {
                plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
            }
            plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
            recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
        }
    }
    }
    

    这里创建了decoder,并在一个for循环中循环执行m, err := dec.decode(),读取对端发送过来的数据,写入cr.recvc或cr.propc通道。

    相关文章

      网友评论

          本文标题:etcd-raft源码分析2-server间通信机制

          本文链接:https://www.haomeiwen.com/subject/hqtstxtx.html