美文网首页区块链教程
兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip

兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip

作者: ab6973df9221 | 来源:发表于2018-10-31 15:17 被阅读0次

  兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

# Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端)

### 5.2、commImpl结构体方法(http://bt.itxdl.cn/)

```go

//conn.serviceConnection(),启动连接服务

func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error

//return &proto.Empty{}

func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)

func (c *commImpl) GetPKIid() common.PKIidType

//向指定节点发送消息

func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)

//探测远程节点是否有响应,_, err = cl.Ping(context.Background(), &proto.Empty{})

func (c *commImpl) Probe(remotePeer *RemotePeer) error

//握手验证远程节点,_, err = cl.Ping(context.Background(), &proto.Empty{})

func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)

func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage

func (c *commImpl) PresumedDead() <-chan common.PKIidType

func (c *commImpl) CloseConn(peer *RemotePeer)

func (c *commImpl) Stop()

//创建并启动gRPC Server,以及注册GossipServer实例

func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,

//将GossipServer实例注册至peerServer

func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,

func extractRemoteAddress(stream stream) string

func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error)

//创建gRPC Server,grpc.NewServer(serverOpts...)

func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)

//创建与服务端连接

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)

//向指定节点发送消息

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)

//return atomic.LoadInt32(&c.stopping) == int32(1)

func (c *commImpl) isStopping() bool

func (c *commImpl) emptySubscriptions()

func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)

func (c *commImpl) disconnect(pkiID common.PKIidType)

func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)

//代码在gossip/comm/comm_impl.go

```

#### 5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)

创建并启动gRPC Server,以及注册GossipServer实例

```go

func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,

    secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {

    var ll net.Listener

    var s *grpc.Server

    var certHash []byte

    if len(dialOpts) == 0 {

        //peer.gossip.dialTimeout,gRPC连接拨号的超时

        dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}

    }

    if port > 0 {

        //创建gRPC Server,grpc.NewServer(serverOpts...)

        s, ll, secureDialOpts, certHash = createGRPCLayer(port)

    }

    commInst := &commImpl{

        selfCertHash:   certHash,

        PKIID:          idMapper.GetPKIidOfCert(peerIdentity),

        idMapper:       idMapper,

        logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),

        peerIdentity:   peerIdentity,

        opts:           dialOpts,

        secureDialOpts: secureDialOpts,

        port:           port,

        lsnr:           ll,

        gSrv:           s,

        msgPublisher:   NewChannelDemultiplexer(),

        lock:           &sync.RWMutex{},

        deadEndpoints:  make(chan common.PKIidType, 100),

        stopping:       int32(0),

        exitChan:       make(chan struct{}, 1),

        subscriptions:  make([]chan proto.ReceivedMessage, 0),

    }

    commInst.connStore = newConnStore(commInst, commInst.logger)

    if port > 0 {

        commInst.stopWG.Add(1)

        go func() {

            defer commInst.stopWG.Done()

            s.Serve(ll) //启动gRPC Server

        }()

        //commInst注册到gRPC Server

        proto.RegisterGossipServer(s, commInst)

    }

    return commInst, nil

}

//代码在gossip/comm/comm_impl.go

```

#### 5.2.2、func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)

将GossipServer实例注册至peerServer

```go

func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,

    peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,

    dialOpts ...grpc.DialOption) (Comm, error) {

    dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))

    //构造commImpl

    commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)

    if cert != nil {

        inst := commInst.(*commImpl)

        inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])

    }

    proto.RegisterGossipServer(s, commInst.(*commImpl))

    return commInst, nil

}

//代码在gossip/comm/comm_impl.go

```

//创建与服务端连接

#### 5.2.3、func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)

```go

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {

    var err error

    var cc *grpc.ClientConn

    var stream proto.Gossip_GossipStreamClient

    var pkiID common.PKIidType

    var connInfo *proto.ConnectionInfo

    var dialOpts []grpc.DialOption

    dialOpts = append(dialOpts, c.secureDialOpts()...)

    dialOpts = append(dialOpts, grpc.WithBlock())

    dialOpts = append(dialOpts, c.opts...)

    cc, err = grpc.Dial(endpoint, dialOpts...)

    cl := proto.NewGossipClient(cc)

    if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {

        cc.Close()

        return nil, err

    }

    ctx, cf := context.WithCancel(context.Background())

    stream, err = cl.GossipStream(ctx)

    connInfo, err = c.authenticateRemotePeer(stream)

    pkiID = connInfo.ID

    conn := newConnection(cl, cc, stream, nil)

    conn.pkiID = pkiID

    conn.info = connInfo

    conn.logger = c.logger

    conn.cancel = cf

    h := func(m *proto.SignedGossipMessage) {

        c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{

            conn:                conn,

            lock:                conn,

            SignedGossipMessage: m,

            connInfo:            connInfo,

        })

    }

    conn.handler = h

    return conn, nil

}

//代码在gossip/comm/comm_impl.go

```

## 6、connectionStore和connection结构体及方法

### 6.1、connection结构体及方法

```go

type connection struct {

    cancel       context.CancelFunc

    info         *proto.ConnectionInfo

    outBuff      chan *msgSending

    logger       *logging.Logger                 // logger

    pkiID        common.PKIidType                // pkiID of the remote endpoint

    handler      handler                         // function to invoke upon a message reception

    conn         *grpc.ClientConn                // gRPC connection to remote endpoint

    cl           proto.GossipClient              // gRPC stub of remote endpoint

    clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint

    serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint

    stopFlag     int32                           // indicates whether this connection is in process of stopping

    stopChan     chan struct{}                   // a method to stop the server-side gRPC call from a different go-routine

    sync.RWMutex                                 // synchronizes access to shared variables

}

//构造connection

func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection

//关闭connection

func (conn *connection) close()

//atomic.LoadInt32(&(conn.stopFlag)) == int32(1)

func (conn *connection) toDie() bool

//conn.outBuff <- m,其中m为msgSending{envelope: msg.Envelope,onErr: onErr,}

func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))

//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream(),同时msg := <-msgChan,conn.handler(msg)

func (conn *connection) serviceConnection() error

//循环不间断从conn.outBuff取数据,然后stream.Send(m.envelope)

func (conn *connection) writeToStream()

//循环不间断envelope, err := stream.Recv()、msg, err := envelope.ToGossipMessage()、msgChan <- msg

func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)

//获取conn.serverStream

func (conn *connection) getStream() stream

//代码在gossip/comm/conn.go

```

### 6.2、connectionStore结构体及方法

```go

type connectionStore struct {

    logger           *logging.Logger          // logger

    isClosing        bool                     // whether this connection store is shutting down

    connFactory      connFactory              // creates a connection to remote peer

    sync.RWMutex                              // synchronize access to shared variables

    pki2Conn         map[string]*connection   //connection map, key为pkiID,value为connection

    destinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,

    // used to prevent concurrent connection establishment to the same remote endpoint

}

//构造connectionStore

func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore

//从connection map中获取连接,如无则创建并启动连接,并写入connection map中

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

//连接数量

func (cs *connectionStore) connNum() int

//关闭指定连接

func (cs *connectionStore) closeConn(peer *RemotePeer)

//关闭所有连接

func (cs *connectionStore) shutdown()

func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection

//注册连接

func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection

//关闭指定连接

func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType)

//代码在gossip/comm/conn.go

```

#### 6.2.1、func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

```go

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {

    cs.RLock()

    isClosing := cs.isClosing

    cs.RUnlock()

    pkiID := peer.PKIID

    endpoint := peer.Endpoint

    cs.Lock()

    destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]

    if !hasConnected {

        destinationLock = &sync.RWMutex{}

        cs.destinationLocks[string(pkiID)] = destinationLock

    }

    cs.Unlock()

    destinationLock.Lock()

    cs.RLock()

    //从connection map中获取

    conn, exists := cs.pki2Conn[string(pkiID)]

    if exists {

        cs.RUnlock()

        destinationLock.Unlock()

        return conn, nil

    }

    cs.RUnlock()

    //创建连接

    createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)

    destinationLock.Unlock()

    conn = createdConnection

    cs.pki2Conn[string(createdConnection.pkiID)] = conn

    go conn.serviceConnection() //启动连接的消息接收处理、以及向对方节点发送消息

    return conn, nil

}

//代码在gossip/comm/conn.go

```

## 7、ChannelDeMultiplexer结构体及方法(多路复用器)

```go

type ChannelDeMultiplexer struct {

    channels []*channel

    lock     *sync.RWMutex

    closed   int32

}

//构造ChannelDeMultiplexer

func NewChannelDemultiplexer() *ChannelDeMultiplexer

//atomic.LoadInt32(&m.closed) == int32(1)

func (m *ChannelDeMultiplexer) isClosed() bool

//关闭

func (m *ChannelDeMultiplexer) Close()

//添加通道

func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{}

//挨个通道发送消息

func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{})

```

欢迎继续关注兄弟连区块链教程分享!

相关文章

网友评论

    本文标题:兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip

    本文链接:https://www.haomeiwen.com/subject/bceatqtx.html