fabric gossip 源码解析

作者: 糖果果老师 | 来源:发表于2018-06-11 21:22 被阅读31次

    fabric 中的 gossip 接口,最底层通信接口,实际只有两个操作,所有的 Gossip相关操作都是在这两个接口上堆砌出来的,这两个接口定义在
    fabric/protos/gossip/message.proto

    // Gossip
    service Gossip {
    
        // GossipStream is the gRPC stream used for sending and receiving messages
        rpc GossipStream (stream Envelope) returns (stream Envelope) {}
    
        // Ping is used to probe a remote peer's aliveness
        rpc Ping (Empty) returns (Empty) {}
    }
    
    
    1. GossipStream 用来通信
    2. Ping用来判断节点状态

    可以看出来 fabric 通信方式使用了 grpc 的 stream,没有采用udp方式,在特定情况下可能会对性能产生一定的影响

    fabric/gossip/comm/comm_impl.go 实现了最底层接口,我们来分析一下主要实现,只关注核心收发消息,不关注其他细节

    消息的接受

    commImpl作为Gossip模块的基本收发实现,我们只分析如何收发的,其实已经给出了 proto 文件我们应该很容易猜到,接受消息只需要实现 proto 两个接口的 server 端, 发送消息 只需要实现 proto 的client 就可以了。分别看一下收发细节

    收消息 comm_impl.go GossipStream 函数,在这个函数中,前面都是做了一下准备工作,收发消息关键代码有两处

        // fabric/gossip/comm/comm_impl.go GossipStream
    
        h := func(m *proto.SignedGossipMessage) {
            c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
                conn:                conn,
                lock:                conn,
                SignedGossipMessage: m,
                connInfo:            connInfo,
            })
        }
        conn.handler = h
        return conn.serviceConnection()
    
    

    对消息进行分发的 handler ,会在接下来的serviceConnection函数里面调用

    // fabric/gossip/comm/conn.go
    
    func (conn *connection) serviceConnection() error {
        errChan := make(chan error, 1)
        msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
        defer close(msgChan)
    
        // Call stream.Recv() asynchronously in readFromStream(),
        // and wait for either the Recv() call to end,
        // or a signal to close the connection, which exits
        // the method and makes the Recv() call to fail in the
        // readFromStream() method
        go conn.readFromStream(errChan, msgChan)
    
        go conn.writeToStream()
    
        for !conn.toDie() {
            select {
            case stop := <-conn.stopChan:
                conn.logger.Debug("Closing reading from stream")
                conn.stopChan <- stop
                return nil
            case err := <-errChan:
                return err
            case msg := <-msgChan:
                conn.handler(msg)
            }
        }
        return nil
    }
    
    

    开启了两个goroutine, 分别进行收发, go conn.readFromStream(errChan, msgChan),这个函数相当于在 stream 上不停的接收消息,收到了 就给 msgChan, 在下面这个for循环中会调用 conn.handler(msg) 来处理收到的消息, go conn.writeToStream() 是写消息,后续再讲

    conn.handler(msg) 实际就是之前说过的 c.msgPublisher.DeMultiplex 函数,那么我们接下来分析一下 这个函数对消息做了什么

    // fabric/gossip/comm/demux.go
    
    // DeMultiplex broadcasts the message to all channels that were returned
    // by AddChannel calls and that hold the respected predicates.
    func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
        defer func() {
            recover()
        }() // recover from sending on a closed channel
    
        if m.isClosed() {
            return
        }
    
        m.lock.RLock()
        channels := m.channels
        m.lock.RUnlock()
    
        for _, ch := range channels {
            if ch.pred(msg) {
                ch.ch <- msg
            }
        }
    }
    
    

    根据代码可以看出, 其实就是 将所有的 channel 拿出来,每个 调用一下 ch.pred(msg) 如果返回值为真,就将消息 发送给channel,其实到这里已经比较清楚了,相当于 哪个channel 想接受消息,就调用 AddChannel ,然后就可以接收到消息了,我们先看一下 AddChannel 函数

    // fabric/gossip/comm/demux.go
    
    // AddChannel registers a channel with a certain predicate
    func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
        m.lock.Lock()
        defer m.lock.Unlock()
        ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
        m.channels = append(m.channels, ch)
        return ch.ch
    }
    
    

    根据代码可以很清晰的看出来,这个函数值需要一个 common.MessageAcceptor 函数类型的参数,返回 接收到的消息 chan (上面已经分析过了返回的这个 chan 就是接收到的消息的 chan), 对于 common.MessageAcceptor 也很容易看出来就是一个消息过滤器,可以自定义规则想接收哪些消息, 全文搜索一下 AddChannel 一下,可以很容易发现 就是实现 gossip service 的 Accept

    到这里已经很清晰了,接收消息总过就进行如下几个过程,

    1. 实现message.proto 的 GossipStream 接口,启动一个goroutine 不停的在 grpc stream 上面接收消息(go conn.readFromStream(errChan, msgChan))
    2. 接收到消息以后,使用 DeMultiplex 函数 像 注册过的Channel中分发(也就是调用了 AddChannel) 的分发
    3. 在 AddChannel的时候给消息添加了一个过滤器

    消息的发送

    发送代码很明显在下面这个函数

    // fabric/gossip/comm/comm_impl.go
    
    func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
        if c.isStopping() || len(peers) == 0 {
            return
        }
    
        c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
    
        for _, peer := range peers {
            go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
                c.sendToEndpoint(peer, msg)
            }(peer, msg)
        }
    }
    
    

    这个函数很简单就是 msg, peers 两个参数,将 msg 发给 所有的 peer,有一点需要注意下 在发送使用了 go ,这样可以提高发送效率,相当于同时给 所有 peer 发送。接下来 看看 sendToEndpoint 函数

    // fabric/gossip/comm/comm_impl.go
    
    func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
        if c.isStopping() {
            return
        }
        c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
        defer c.logger.Debug("Exiting")
        var err error
    
        conn, err := c.connStore.getConnection(peer)
        if err == nil {
            disConnectOnErr := func(err error) {
                c.logger.Warning(peer, "isn't responsive:", err)
                c.disconnect(peer.PKIID)
            }
            conn.send(msg, disConnectOnErr)
            return
        }
        c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
        c.disconnect(peer.PKIID)
    }
    
    

    这个函数我们只分析两句代码 getConnection, conn.send(), 一个是获取conn, 一个是发送消息

    getConnection

    // fabric/gossip/comm/conn.go
    
    func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
        cs.RLock()
        isClosing := cs.isClosing
        cs.RUnlock()
    
        if isClosing {
            return nil, errors.New("Shutting down")
        }
    
        pkiID := peer.PKIID
        endpoint := peer.Endpoint
    
        cs.Lock()
        destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
        if !hasConnected {
            destinationLock = &sync.RWMutex{}
            cs.destinationLocks[string(pkiID)] = destinationLock
        }
        cs.Unlock()
    
        destinationLock.Lock()
    
        cs.RLock()
        conn, exists := cs.pki2Conn[string(pkiID)]
        if exists {
            cs.RUnlock()
            destinationLock.Unlock()
            return conn, nil
        }
        cs.RUnlock()
    
        createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
    
        destinationLock.Unlock()
    
        cs.RLock()
        isClosing = cs.isClosing
        cs.RUnlock()
        if isClosing {
            return nil, errors.New("ConnStore is closing")
        }
    
        cs.Lock()
        delete(cs.destinationLocks, string(pkiID))
        defer cs.Unlock()
    
        // check again, maybe someone connected to us during the connection creation?
        conn, exists = cs.pki2Conn[string(pkiID)]
    
        if exists {
            if createdConnection != nil {
                createdConnection.close()
            }
            return conn, nil
        }
    
        // no one connected to us AND we failed connecting!
        if err != nil {
            return nil, err
        }
    
        // at this point in the code, we created a connection to a remote peer
        conn = createdConnection
        cs.pki2Conn[string(createdConnection.pkiID)] = conn
    
        go conn.serviceConnection()
    
        return conn, nil
    }
    
    

    这个函数代码很多,对于主逻辑只有两句,一个是 createConnection,一个是 conn.serviceConnection(前文已经分析过,就是打开两个goroutine,分别监听接收,发送) ,其余的都是优化,逻辑完成性的代码,无需关心, 我们看看 createConnection

    // fabric/gossip/comm/comm_impl.go
    
    func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
        var err error
        var cc *grpc.ClientConn
        var stream proto.Gossip_GossipStreamClient
        var pkiID common.PKIidType
        var connInfo *proto.ConnectionInfo
    
        c.logger.Debug("Entering", endpoint, expectedPKIID)
        defer c.logger.Debug("Exiting")
    
        if c.isStopping() {
            return nil, errors.New("Stopping")
        }
        cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
        if err != nil {
            return nil, err
        }
    
        cl := proto.NewGossipClient(cc)
    
        if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
            cc.Close()
            return nil, err
        }
    
        if stream, err = cl.GossipStream(context.Background()); err == nil {
            connInfo, err = c.authenticateRemotePeer(stream)
            if err == nil {
                pkiID = connInfo.ID
                if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
                    // PKIID is nil when we don't know the remote PKI id's
                    c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
                    return nil, errors.New("Authentication failure")
                }
                conn := newConnection(cl, cc, stream, nil)
                conn.pkiID = pkiID
                conn.info = connInfo
                conn.logger = c.logger
    
                h := func(m *proto.SignedGossipMessage) {
                    c.logger.Debug("Got message:", m)
                    c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
                        conn:                conn,
                        lock:                conn,
                        SignedGossipMessage: m,
                        connInfo:            connInfo,
                    })
                }
                conn.handler = h
                return conn, nil
            }
        }
        cc.Close()
        return nil, err
    }
    
    

    代码很长,不过我们只需要简单看一下,可以很明显的看出来就是我们文章开头猜测的,发送消息就是 实现 message.proto 里面的client,其余的都是对这个client的封装,对于理解代码而言不需要太关注

    接下来我们只剩下一个 conn.send()

    // fabric/gossip/comm/conn.go
    
    func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
        conn.Lock()
        defer conn.Unlock()
    
        if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
            go onErr(errSendOverflow)
            return
        }
    
        m := &msgSending{
            envelope: msg.Envelope,
            onErr:    onErr,
        }
    
        conn.outBuff <- m
    }
    
    

    代码异常简单就是 将msg 传送给 conn.outBuff, 其实我们看到这里已经可以确定肯定有另外一个地方在 等着 conn.outBuff 发送过来的消息,然后写进去,聪明的读者已经想到了,没错,就是前面 serviceConnection 中的 go writeToStream()

    func (conn *connection) writeToStream() {
        for !conn.toDie() {
            stream := conn.getStream()
            if stream == nil {
                conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
                return
            }
            select {
            case m := <-conn.outBuff:
                err := stream.Send(m.envelope)
                if err != nil {
                    go m.onErr(err)
                    return
                }
                break
            case stop := <-conn.stopChan:
                conn.logger.Debug("Closing writing to stream")
                conn.stopChan <- stop
                return
            }
        }
    }
    
    

    代码清晰明了, 利用 grpc stream 进行写消息,至此 我们已经把 fabric 中 gossip 模块的最底层收发消息分析清楚了,gossip 模块所有的功能都是在这个代码基础上进行的

    总结

    读代码我一直没有找到特别好的办法,我现在的方法就是 大致浏览一下代码,知道代码结构,然后从底层一层一层的读,读代码的时候一定不能纠结细节,先读懂主线,对于其他的关心的在一点点看。

    相关文章

      网友评论

        本文标题:fabric gossip 源码解析

        本文链接:https://www.haomeiwen.com/subject/xyhkeftx.html