美文网首页区块链研习社区块链研究
btcd 源码分析系列:3 - connmanager

btcd 源码分析系列:3 - connmanager

作者: tpkeeper | 来源:发表于2019-09-28 08:44 被阅读0次

    参考:btcd

    • connmanamger 负责节点的连接处理,包括监听来自其他节点的连接请求和主动向其他节点发起连接请求,并将获取到的连接对象conn交给回调函数处理,实际上这些回调函数是由server实现的,它们会根据conn生成对应的peer,交由peer处理之后的逻辑
    • 默认outbound peer数量为8
    • 节点发现
      • address database (第一次启动时为空)
      • -addnode -connect (手动)
      • dns seed (通过dns获取addr)
      • hard-coded seeds(代码中的seed)
      • getaddr from other peers (通过其他节点同步)

    一、创建connmanager对象

    • 主要是根据传入的config结构体参数返回一个指针,config结构体主要 包含一些回调方法和[]listener,这样便于灵活实现和调用
    // New returns a new connection manager.
    // Use Start to start connecting to the network.
    func New(cfg *Config) (*ConnManager, error) {
        if cfg.Dial == nil {
            return nil, ErrDialNil
        }
        // Default to sane values
        if cfg.RetryDuration <= 0 {
            cfg.RetryDuration = defaultRetryDuration
        }
        if cfg.TargetOutbound == 0 {
            cfg.TargetOutbound = defaultTargetOutbound
        }
        cm := ConnManager{
            cfg:      *cfg, // Copy so caller can't mutate
            requests: make(chan interface{}),
            quit:     make(chan struct{}),
        }
        return &cm, nil
    }
    
    
    // Config holds the configuration options related to the connection manager.
    type Config struct {
        // Listeners defines a slice of listeners for which the connection
        // manager will take ownership of and accept connections.  When a
        // connection is accepted, the OnAccept handler will be invoked with the
        // connection.  Since the connection manager takes ownership of these
        // listeners, they will be closed when the connection manager is
        // stopped.
        //
        // This field will not have any effect if the OnAccept field is not
        // also specified.  It may be nil if the caller does not wish to listen
        // for incoming connections.
        Listeners []net.Listener
    
        // OnAccept is a callback that is fired when an inbound connection is
        // accepted.  It is the caller's responsibility to close the connection.
        // Failure to close the connection will result in the connection manager
        // believing the connection is still active and thus have undesirable
        // side effects such as still counting toward maximum connection limits.
        //
        // This field will not have any effect if the Listeners field is not
        // also specified since there couldn't possibly be any accepted
        // connections in that case.
        OnAccept func(net.Conn)
    
        // TargetOutbound is the number of outbound network connections to
        // maintain. Defaults to 8.
        TargetOutbound uint32
    
        // RetryDuration is the duration to wait before retrying connection
        // requests. Defaults to 5s.
        RetryDuration time.Duration
    
        // OnConnection is a callback that is fired when a new outbound
        // connection is established.
        OnConnection func(*ConnReq, net.Conn)
    
        // OnDisconnection is a callback that is fired when an outbound
        // connection is disconnected.
        OnDisconnection func(*ConnReq)
    
        // GetNewAddress is a way to get an address to make a network connection
        // to.  If nil, no new connections will be made automatically.
        GetNewAddress func() (net.Addr, error)
    
        // Dial connects to the address on the named network. It cannot be nil.
        Dial func(net.Addr) (net.Conn, error)
    }
    
    cmgr, err := connmgr.New(&connmgr.Config{
            Listeners:      listeners,
            OnAccept:       s.inboundPeerConnected,
            RetryDuration:  connectionRetryInterval,
            TargetOutbound: uint32(targetOutbound),
            Dial:           hcdDial,
            OnConnection:   s.outboundPeerConnected,
            GetNewAddress:  newAddressFunc,
        })
    

    二、start

    • 启动connhandler 处理connrequest请求结果,即处理主动向其他节点发起请求的结果
    • 启动listenhandler 监听来自其他节点的连接
    • connrequest 向其他节点发起连接
    // Start launches the connection manager and begins connecting to the network.
    func (cm *ConnManager) Start() {
        // Already started?
        if atomic.AddInt32(&cm.start, 1) != 1 {
            return
        }
    
        log.Trace("Connection manager started")
        cm.wg.Add(1)
        go cm.connHandler()
    
        // Start all the listeners so long as the caller requested them and
        // provided a callback to be invoked when connections are accepted.
        if cm.cfg.OnAccept != nil {
            for _, listner := range cm.cfg.Listeners {
                cm.wg.Add(1)
                go cm.listenHandler(listner)
            }
        }
    
        for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
            go cm.NewConnReq()
        }
    }
    

    三、对监听到的连接的处理

    • 当有新的连接请求,会将此连接对象直接交给回调函数onAccept()处理,该函数的具体实现是在server中s.inboundPeerConnected()方法
    • s.inboundPeerConnected() 会创建一个inbound peer,并进入peer的处理逻辑
    // listenHandler accepts incoming connections on a given listener.  It must be
    // run as a goroutine.
    func (cm *ConnManager) listenHandler(listener net.Listener) {
        log.Infof("Server listening on %s", listener.Addr())
        for atomic.LoadInt32(&cm.stop) == 0 {
            conn, err := listener.Accept()
            if err != nil {
                // Only log the error if not forcibly shutting down.
                if atomic.LoadInt32(&cm.stop) == 0 {
                    log.Errorf("Can't accept connection: %v", err)
                }
                continue
            }
            go cm.cfg.OnAccept(conn)
        }
    
        cm.wg.Done()
        log.Tracef("Listener handler done for %s", listener.Addr())
    }
    

    onaccept的具体实现

    // inboundPeerConnected is invoked by the connection manager when a new inbound
    // connection is established.  It initializes a new inbound server peer
    // instance, associates it with the connection, and starts a goroutine to wait
    // for disconnection.
    func (s *server) inboundPeerConnected(conn net.Conn) {
        sp := newServerPeer(s, false)
        sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
        sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
        sp.AssociateConnection(conn)
        go s.peerDoneHandler(sp)
    }
    

    四、对主动发起的连接的处理

    • 生成一个新的请求id
    • 然后将请求的结果统一发给connhandler处理
    • connhandler 根据不同的结果分别做不同的处理
      • 如果成功则调用OnConnection回调函数,该函数由server的s.outboundPeerConnected()实现
      • 如果请求失败则调用自身的handleFailedConn()函数,尝试再次请求等逻辑
    • s.outboundPeerConnected 同样是先创建一个outbound peer,然后进入peer的处理逻辑
    // Connect assigns an id and dials a connection to the address of the
    // connection request.
    func (cm *ConnManager) Connect(c *ConnReq) {
        if atomic.LoadInt32(&cm.stop) != 0 {
            return
        }
        if atomic.LoadUint64(&c.id) == 0 {
            atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
        }
        log.Debugf("Attempting to connect to %v", c)
        conn, err := cm.cfg.Dial(c.Addr)
        if err != nil {
            cm.requests <- handleFailed{c, err}
        } else {
            cm.requests <- handleConnected{c, conn}
        }
    }
    
    // connHandler handles all connection related requests.  It must be run as a
    // goroutine.
    //
    // The connection handler makes sure that we maintain a pool of active outbound
    // connections so that we remain connected to the network.  Connection requests
    // are processed and mapped by their assigned ids.
    func (cm *ConnManager) connHandler() {
        conns := make(map[uint64]*ConnReq, cm.cfg.TargetOutbound)
    out:
        for {
            select {
            case req := <-cm.requests:
                switch msg := req.(type) {
    
                case handleConnected:
                    connReq := msg.c
                    connReq.updateState(ConnEstablished)
                    connReq.conn = msg.conn
                    conns[connReq.id] = connReq
                    log.Debugf("Connected to %v", connReq)
                    connReq.retryCount = 0
                    cm.failedAttempts = 0
    
                    if cm.cfg.OnConnection != nil {
                        go cm.cfg.OnConnection(connReq, msg.conn)
                    }
    
                case handleDisconnected:
                    if connReq, ok := conns[msg.id]; ok {
                        connReq.updateState(ConnDisconnected)
                        if connReq.conn != nil {
                            connReq.conn.Close()
                        }
                        log.Debugf("Disconnected from %v", connReq)
                        delete(conns, msg.id)
    
                        if cm.cfg.OnDisconnection != nil {
                            go cm.cfg.OnDisconnection(connReq)
                        }
    
                        if uint32(len(conns)) < cm.cfg.TargetOutbound && msg.retry {
                            cm.handleFailedConn(connReq)
                        }
                    } else {
                        log.Errorf("Unknown connection: %d", msg.id)
                    }
    
                case handleFailed:
                    connReq := msg.c
                    connReq.updateState(ConnFailed)
                    log.Debugf("Failed to connect to %v: %v", connReq, msg.err)
                    cm.handleFailedConn(connReq)
                }
    
            case <-cm.quit:
                break out
            }
        }
    
        cm.wg.Done()
        log.Trace("Connection handler done")
    }
    
    

    OnConnection的具体实现

    // outboundPeerConnected is invoked by the connection manager when a new
    // outbound connection is established.  It initializes a new outbound server
    // peer instance, associates it with the relevant state such as the connection
    // request instance and the connection itself, and finally notifies the address
    // manager of the attempt.
    func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
        sp := newServerPeer(s, c.Permanent)
        p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
        if err != nil {
            srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
            s.connManager.Disconnect(c.ID())
        }
        sp.Peer = p
        sp.connReq = c
        sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
        sp.AssociateConnection(conn)
        go s.peerDoneHandler(sp)
        s.addrManager.Attempt(sp.NA())
    }
    

    a -> b a向b节点发起连接

    b:
    某场景下触发了ban()和disconnect(),b会在banduration(默认24小时)之内,不允许a连接(onVersion里 最后调用addpeer()时判断,然后断掉连接)

    a:在被b ban了之后,peer.start()会失败(在协商version时被b断开),接着调用disconnect()

    server作为枢纽,处理connmanager和peer之间的协作

    相关文章

      网友评论

        本文标题:btcd 源码分析系列:3 - connmanager

        本文链接:https://www.haomeiwen.com/subject/jquhyctx.html