参考:btcd
- connmanamger 负责节点的连接处理,包括监听来自其他节点的连接请求和主动向其他节点发起连接请求,并将获取到的连接对象conn交给回调函数处理,实际上这些回调函数是由server实现的,它们会根据conn生成对应的peer,交由peer处理之后的逻辑
- 默认outbound peer数量为8
- 节点发现
- address database (第一次启动时为空)
- -addnode -connect (手动)
- dns seed (通过dns获取addr)
- hard-coded seeds(代码中的seed)
- getaddr from other peers (通过其他节点同步)
一、创建connmanager对象
- 主要是根据传入的config结构体参数返回一个指针,config结构体主要 包含一些回调方法和[]listener,这样便于灵活实现和调用
// New returns a new connection manager.
// Use Start to start connecting to the network.
func New(cfg *Config) (*ConnManager, error) {
if cfg.Dial == nil {
return nil, ErrDialNil
}
// Default to sane values
if cfg.RetryDuration <= 0 {
cfg.RetryDuration = defaultRetryDuration
}
if cfg.TargetOutbound == 0 {
cfg.TargetOutbound = defaultTargetOutbound
}
cm := ConnManager{
cfg: *cfg, // Copy so caller can't mutate
requests: make(chan interface{}),
quit: make(chan struct{}),
}
return &cm, nil
}
// Config holds the configuration options related to the connection manager.
type Config struct {
// Listeners defines a slice of listeners for which the connection
// manager will take ownership of and accept connections. When a
// connection is accepted, the OnAccept handler will be invoked with the
// connection. Since the connection manager takes ownership of these
// listeners, they will be closed when the connection manager is
// stopped.
//
// This field will not have any effect if the OnAccept field is not
// also specified. It may be nil if the caller does not wish to listen
// for incoming connections.
Listeners []net.Listener
// OnAccept is a callback that is fired when an inbound connection is
// accepted. It is the caller's responsibility to close the connection.
// Failure to close the connection will result in the connection manager
// believing the connection is still active and thus have undesirable
// side effects such as still counting toward maximum connection limits.
//
// This field will not have any effect if the Listeners field is not
// also specified since there couldn't possibly be any accepted
// connections in that case.
OnAccept func(net.Conn)
// TargetOutbound is the number of outbound network connections to
// maintain. Defaults to 8.
TargetOutbound uint32
// RetryDuration is the duration to wait before retrying connection
// requests. Defaults to 5s.
RetryDuration time.Duration
// OnConnection is a callback that is fired when a new outbound
// connection is established.
OnConnection func(*ConnReq, net.Conn)
// OnDisconnection is a callback that is fired when an outbound
// connection is disconnected.
OnDisconnection func(*ConnReq)
// GetNewAddress is a way to get an address to make a network connection
// to. If nil, no new connections will be made automatically.
GetNewAddress func() (net.Addr, error)
// Dial connects to the address on the named network. It cannot be nil.
Dial func(net.Addr) (net.Conn, error)
}
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(targetOutbound),
Dial: hcdDial,
OnConnection: s.outboundPeerConnected,
GetNewAddress: newAddressFunc,
})
二、start
- 启动connhandler 处理connrequest请求结果,即处理主动向其他节点发起请求的结果
- 启动listenhandler 监听来自其他节点的连接
- connrequest 向其他节点发起连接
// Start launches the connection manager and begins connecting to the network.
func (cm *ConnManager) Start() {
// Already started?
if atomic.AddInt32(&cm.start, 1) != 1 {
return
}
log.Trace("Connection manager started")
cm.wg.Add(1)
go cm.connHandler()
// Start all the listeners so long as the caller requested them and
// provided a callback to be invoked when connections are accepted.
if cm.cfg.OnAccept != nil {
for _, listner := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenHandler(listner)
}
}
for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
go cm.NewConnReq()
}
}
三、对监听到的连接的处理
- 当有新的连接请求,会将此连接对象直接交给回调函数onAccept()处理,该函数的具体实现是在server中s.inboundPeerConnected()方法
- s.inboundPeerConnected() 会创建一个inbound peer,并进入peer的处理逻辑
// listenHandler accepts incoming connections on a given listener. It must be
// run as a goroutine.
func (cm *ConnManager) listenHandler(listener net.Listener) {
log.Infof("Server listening on %s", listener.Addr())
for atomic.LoadInt32(&cm.stop) == 0 {
conn, err := listener.Accept()
if err != nil {
// Only log the error if not forcibly shutting down.
if atomic.LoadInt32(&cm.stop) == 0 {
log.Errorf("Can't accept connection: %v", err)
}
continue
}
go cm.cfg.OnAccept(conn)
}
cm.wg.Done()
log.Tracef("Listener handler done for %s", listener.Addr())
}
onaccept的具体实现
// inboundPeerConnected is invoked by the connection manager when a new inbound
// connection is established. It initializes a new inbound server peer
// instance, associates it with the connection, and starts a goroutine to wait
// for disconnection.
func (s *server) inboundPeerConnected(conn net.Conn) {
sp := newServerPeer(s, false)
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
sp.AssociateConnection(conn)
go s.peerDoneHandler(sp)
}
四、对主动发起的连接的处理
- 生成一个新的请求id
- 然后将请求的结果统一发给connhandler处理
- connhandler 根据不同的结果分别做不同的处理
- 如果成功则调用OnConnection回调函数,该函数由server的s.outboundPeerConnected()实现
- 如果请求失败则调用自身的handleFailedConn()函数,尝试再次请求等逻辑
- s.outboundPeerConnected 同样是先创建一个outbound peer,然后进入peer的处理逻辑
// Connect assigns an id and dials a connection to the address of the
// connection request.
func (cm *ConnManager) Connect(c *ConnReq) {
if atomic.LoadInt32(&cm.stop) != 0 {
return
}
if atomic.LoadUint64(&c.id) == 0 {
atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
}
log.Debugf("Attempting to connect to %v", c)
conn, err := cm.cfg.Dial(c.Addr)
if err != nil {
cm.requests <- handleFailed{c, err}
} else {
cm.requests <- handleConnected{c, conn}
}
}
// connHandler handles all connection related requests. It must be run as a
// goroutine.
//
// The connection handler makes sure that we maintain a pool of active outbound
// connections so that we remain connected to the network. Connection requests
// are processed and mapped by their assigned ids.
func (cm *ConnManager) connHandler() {
conns := make(map[uint64]*ConnReq, cm.cfg.TargetOutbound)
out:
for {
select {
case req := <-cm.requests:
switch msg := req.(type) {
case handleConnected:
connReq := msg.c
connReq.updateState(ConnEstablished)
connReq.conn = msg.conn
conns[connReq.id] = connReq
log.Debugf("Connected to %v", connReq)
connReq.retryCount = 0
cm.failedAttempts = 0
if cm.cfg.OnConnection != nil {
go cm.cfg.OnConnection(connReq, msg.conn)
}
case handleDisconnected:
if connReq, ok := conns[msg.id]; ok {
connReq.updateState(ConnDisconnected)
if connReq.conn != nil {
connReq.conn.Close()
}
log.Debugf("Disconnected from %v", connReq)
delete(conns, msg.id)
if cm.cfg.OnDisconnection != nil {
go cm.cfg.OnDisconnection(connReq)
}
if uint32(len(conns)) < cm.cfg.TargetOutbound && msg.retry {
cm.handleFailedConn(connReq)
}
} else {
log.Errorf("Unknown connection: %d", msg.id)
}
case handleFailed:
connReq := msg.c
connReq.updateState(ConnFailed)
log.Debugf("Failed to connect to %v: %v", connReq, msg.err)
cm.handleFailedConn(connReq)
}
case <-cm.quit:
break out
}
}
cm.wg.Done()
log.Trace("Connection handler done")
}
OnConnection的具体实现
// outboundPeerConnected is invoked by the connection manager when a new
// outbound connection is established. It initializes a new outbound server
// peer instance, associates it with the relevant state such as the connection
// request instance and the connection itself, and finally notifies the address
// manager of the attempt.
func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
sp := newServerPeer(s, c.Permanent)
p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
s.connManager.Disconnect(c.ID())
}
sp.Peer = p
sp.connReq = c
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
sp.AssociateConnection(conn)
go s.peerDoneHandler(sp)
s.addrManager.Attempt(sp.NA())
}
a -> b a向b节点发起连接
b:
某场景下触发了ban()和disconnect(),b会在banduration(默认24小时)之内,不允许a连接(onVersion里 最后调用addpeer()时判断,然后断掉连接)
a:在被b ban了之后,peer.start()会失败(在协商version时被b断开),接着调用disconnect()
server作为枢纽,处理connmanager和peer之间的协作
网友评论