Block Propagation over the P2P Network in Btcd: ConnMgr

Author: oceanken | Published: 2018-03-15 18:06

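    The previous article described how a Peer sends and receives messages, which presupposes an established TCP connection between Peers. This article describes how Peers establish and maintain those TCP connections. Nodes can connect to each other directly or through a proxy. In particular, they can also connect through an onion proxy, and a node can hide itself in the "darknet" and accept connections from other nodes at an onion address (.onion address). Below, we walk through the code to see how each of these connection modes is implemented.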

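    The files in the btcd/connmgr package are:

    • connmanager.go: the main logic for establishing new connections, reporting connection state, reconnecting, and disconnecting;
    • dynamicbanscore.go: implements a dynamic score keeper that tracks how frequently messages are exchanged between Peers. When the score exceeds a configured threshold, the connection is actively dropped, which guards against DDoS-like attacks;
    • seed.go: resolves the addresses of the seed nodes built into the full-node client into network addresses as defined by the Bitcoin protocol;
    • tor.go: a node that connects through an onion proxy has to perform DNS resolution at the last hop on the Tor network, the exit node, which returns the result to the node through the onion proxy. tor.go implements the SOCKS message exchange for DNS resolution through an onion proxy. Note that this DNS resolution does not resolve onion addresses; it resolves domain names or hostnames on the public Internet. Resolving an onion address cannot succeed and would be meaningless;
    • log.go: provides methods for initializing and setting the logger;
    • doc.go: the doc file for package btcd/connmgr;
    • connmanager_test.go, dynamicbanscore_test.go: the corresponding tests;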

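    The code for making TCP connections through a proxy or an onion proxy lives in btcsuite/go-socks (vendored in the btcd project under btcsuite/btcd/vendor/github.com/btcsuite/go-socks). It implements the client side of the SOCKS 5 protocol and contains the following files:

    • addr.go: defines ProxiedAddr, which describes the proxy's external address: the network type (e.g. tcp), the host name or address, and the port;
    • conn.go: defines proxiedConn, which describes a proxied connection and provides methods for reading from and writing to it;
    • dial.go: implements the logic for establishing a proxied connection;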

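    Although ConnMgr can connect through an onion proxy to nodes on either the "clearnet" or the "darknet", this article does not go into the Tor network itself; a later article, "A Discussion of Anonymity in the Bitcoin and Tor Networks", covers it in detail. Next, we first analyze btcd/connmgr to understand how connections are established and managed, and then analyze btcsuite/go-socks to understand how a connection is made through a proxy. The main types in btcd/connmgr are ConnManager, Config, and ConnReq. They are defined as follows: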

    //btcd/connmgr/connmanager.go
    
    // ConnManager provides a manager to handle network connections.
    type ConnManager struct {
        // The following variables must only be used atomically.
        connReqCount uint64
        start        int32
        stop         int32
    
        cfg            Config
        wg             sync.WaitGroup
        failedAttempts uint64
        requests       chan interface{}
        quit           chan struct{}
    }
    

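    The fields have the following meanings:

    • connReqCount: counts the outbound connection requests created so far; each new ConnReq takes its id from this counter;
    • start: flags that connmgr has been started;
    • stop: flags that connmgr has been stopped;
    • cfg: the configuration, described with the definition of Config below;
    • wg: synchronizes connmgr's shutdown; the caller can block until connmgr's worker goroutines have exited;
    • failedAttempts: the total number of times ConnMgr has tried a newly selected Peer address after a connection failed; it is reset to zero when a connection succeeds;
    • requests: the channel used to communicate with connmgr's worker goroutine;
    • quit: used to tell the worker goroutine to exit;

    ConnManager depends on Config: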

    //btcd/connmgr/connmanager.go
    
    // Config holds the configuration options related to the connection manager.
    type Config struct {
        // Listeners defines a slice of listeners for which the connection
        // manager will take ownership of and accept connections.  When a
        // connection is accepted, the OnAccept handler will be invoked with the
        // connection.  Since the connection manager takes ownership of these
        // listeners, they will be closed when the connection manager is
        // stopped.
        //
        // This field will not have any effect if the OnAccept field is not
        // also specified.  It may be nil if the caller does not wish to listen
        // for incoming connections.
        Listeners []net.Listener
    
        // OnAccept is a callback that is fired when an inbound connection is
        // accepted.  It is the caller's responsibility to close the connection.
        // Failure to close the connection will result in the connection manager
        // believing the connection is still active and thus have undesirable
        // side effects such as still counting toward maximum connection limits.
        //
        // This field will not have any effect if the Listeners field is not
        // also specified since there couldn't possibly be any accepted
        // connections in that case.
        OnAccept func(net.Conn)
    
        // TargetOutbound is the number of outbound network connections to
        // maintain. Defaults to 8.
        TargetOutbound uint32
    
        // RetryDuration is the duration to wait before retrying connection
        // requests. Defaults to 5s.
        RetryDuration time.Duration
    
        // OnConnection is a callback that is fired when a new outbound
        // connection is established.
        OnConnection func(*ConnReq, net.Conn)
    
        // OnDisconnection is a callback that is fired when an outbound
        // connection is disconnected.
        OnDisconnection func(*ConnReq)
    
        // GetNewAddress is a way to get an address to make a network connection
        // to.  If nil, no new connections will be made automatically.
        GetNewAddress func() (net.Addr, error)
    
        // Dial connects to the address on the named network. It cannot be nil.
        Dial func(net.Addr) (net.Conn, error)
    }
    

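    The fields have the following meanings:

    • Listeners: all the listeners on which the node waits for inbound connections;
    • OnAccept: the callback invoked after the node accepts an inbound connection;
    • TargetOutbound: the number of outbound Peer connections the node tries to maintain (8 by default);
    • RetryDuration: the time to wait before retrying after a connection fails. It defaults to 5s, and the default maximum retry wait is 5min;
    • OnConnection: the callback invoked after an outbound connection is established;
    • OnDisconnection: the callback invoked after an outbound connection is closed;
    • GetNewAddress: after a connection fails, ConnMgr may pick a new Peer to connect to. GetNewAddress supplies the new Peer address; it ultimately calls addrManager's GetAddress() to hand out an address, which we will cover in detail when we introduce addrmgr;
    • Dial: defines how the TCP connection is made: directly or through a proxy;

    ConnReq describes a connection. It is defined as follows: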

    //btcd/connmgr/connmanager.go
    
    // ConnReq is the connection request to a network address. If permanent, the
    // connection will be retried on disconnection.
    type ConnReq struct {
        // The following variables must only be used atomically.
        id uint64
    
        Addr      net.Addr
        Permanent bool
    
        conn       net.Conn
        state      ConnState
        stateMtx   sync.RWMutex
        retryCount uint32
    }
    
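    • id: the connection's sequence number, used as its index;
    • Addr: the destination address of the connection;
    • Permanent: whether to keep a permanent connection to the Peer. If true, after the connection fails ConnMgr keeps retrying the same Peer instead of choosing a new Peer address;
    • conn: the actual net.Conn object once the connection succeeds;
    • state: the connection state, one of ConnPending, ConnEstablished, ConnDisconnected, or ConnFailed;
    • stateMtx: the read-write lock that protects state;
    • retryCount: if Permanent is true, the number of times this connection has been retried;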
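
    The role of stateMtx can be illustrated with a minimal sketch. The connReq and connState types below are hypothetical stand-ins, not the connmgr types, and getState is an illustrative name; the sketch only shows the usual pattern of taking the write lock to change state and the read lock to inspect it.

```go
package main

import (
	"fmt"
	"sync"
)

// connState is a stand-in for connmgr's ConnState. The names mirror the
// states listed above; the numeric values here are arbitrary.
type connState uint8

const (
	connPending connState = iota
	connEstablished
	connDisconnected
	connFailed
)

// connReq is a hypothetical, stripped-down request holding only the state.
type connReq struct {
	stateMtx sync.RWMutex
	state    connState
}

// updateState takes the write lock because it mutates state.
func (c *connReq) updateState(s connState) {
	c.stateMtx.Lock()
	c.state = s
	c.stateMtx.Unlock()
}

// getState takes only the read lock, so concurrent readers do not block
// one another.
func (c *connReq) getState() connState {
	c.stateMtx.RLock()
	defer c.stateMtx.RUnlock()
	return c.state
}

func main() {
	c := &connReq{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.getState() // readers may run concurrently with the writer
		}()
	}
	c.updateState(connEstablished)
	wg.Wait()
	fmt.Println(c.getState() == connEstablished) // true
}
```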

    We start with ConnManager's Start() method to see how it works:

    //btcd/connmgr/connmanager.go
    
    // Start launches the connection manager and begins connecting to the network.
    func (cm *ConnManager) Start() {
        // Already started?
        if atomic.AddInt32(&cm.start, 1) != 1 {
            return
        }
    
        log.Trace("Connection manager started")
        cm.wg.Add(1)
        go cm.connHandler() // (1)
    
        // Start all the listeners so long as the caller requested them and
        // provided a callback to be invoked when connections are accepted.
        if cm.cfg.OnAccept != nil {
            for _, listner := range cm.cfg.Listeners {
                cm.wg.Add(1)
                go cm.listenHandler(listner) // (2)
            }
        }
    
        for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
            go cm.NewConnReq() // (3)
        }
    }
    

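    As the code shows, ConnMgr does the following at startup:

    1. starts the worker goroutine connHandler;
    2. starts the listening goroutines (listenHandler), which wait for other nodes to connect;
    3. starts the goroutines that establish connections, each of which picks a Peer address and connects to it;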
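
    The "Already started?" check at the top of Start() relies on atomic.AddInt32 returning the incremented value, which equals 1 for exactly one caller. A minimal sketch of that guard, using a hypothetical manager type that keeps only the flag and a counter added here for illustration:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// manager is a hypothetical stand-in that keeps only the start flag.
type manager struct {
	start   int32
	started int // how many times the start-up work actually ran
}

// Start runs the start-up work only for the first caller: AddInt32 returns
// the incremented value, which is 1 exactly once.
func (m *manager) Start() bool {
	if atomic.AddInt32(&m.start, 1) != 1 {
		return false
	}
	m.started++
	return true
}

func main() {
	m := &manager{}
	first := m.Start()
	second := m.Start()
	fmt.Println(first, second, m.started) // true false 1
}
```

    The same idiom makes a second call to Start() a no-op without needing a mutex.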

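    [Figure: the goroutines in ConnMgr and the channels they communicate over]

    In the figure, caller is the calling goroutine. OnConnection, OnDisconnection, and OnAccept are each invoked in a new goroutine so that they do not block ConnMgr's worker and listening goroutines. Before analyzing the three kinds of goroutine above, let us look at Connect() and Disconnect() to see how connections are established and torn down: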

    //btcd/connmgr/connmanager.go
    
    // Connect assigns an id and dials a connection to the address of the
    // connection request.
    func (cm *ConnManager) Connect(c *ConnReq) {
    
        ......
    
        conn, err := cm.cfg.Dial(c.Addr)
        if err != nil {
            cm.requests <- handleFailed{c, err}
        } else {
            cm.requests <- handleConnected{c, conn}
        }
    }
    
    // Disconnect disconnects the connection corresponding to the given connection
    // id. If permanent, the connection will be retried with an increasing backoff
    // duration.
    func (cm *ConnManager) Disconnect(id uint64) {
        if atomic.LoadInt32(&cm.stop) != 0 {
            return
        }
        cm.requests <- handleDisconnected{id, true}
    }
    

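    As the code shows, establishing a connection means calling the configured Dial() method to perform the TCP handshake. For a direct connection to the Peer (one that does not go through a proxy), Dial is the standard library dialer (net.DialTimeout in btcd's default configuration, as we will see later); for a connection through a proxy, it is the SOCKS Proxy's dial method, which we will see when we analyze go-socks. Then, depending on whether the connection succeeded, Connect() sends a success or failure message to connHandler for further processing. Calling Disconnect() sends a handleDisconnected message to connHandler for further processing. So the main logic for connecting and disconnecting lives in connHandler. Here is its implementation: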

    //btcd/connmgr/connmanager.go
    
    // connHandler handles all connection related requests.  It must be run as a
    // goroutine.
    //
    // The connection handler makes sure that we maintain a pool of active outbound
    // connections so that we remain connected to the network.  Connection requests
    // are processed and mapped by their assigned ids.
    func (cm *ConnManager) connHandler() {
        conns := make(map[uint64]*ConnReq, cm.cfg.TargetOutbound)
    out:
        for {
            select {
            case req := <-cm.requests:
                switch msg := req.(type) {
    
                case handleConnected:
                    connReq := msg.c
                    connReq.updateState(ConnEstablished)
                    connReq.conn = msg.conn
                    conns[connReq.id] = connReq
                    log.Debugf("Connected to %v", connReq)
                    connReq.retryCount = 0
                    cm.failedAttempts = 0
    
                    if cm.cfg.OnConnection != nil {
                        go cm.cfg.OnConnection(connReq, msg.conn)
                    }
    
                case handleDisconnected:
                    if connReq, ok := conns[msg.id]; ok {
                        connReq.updateState(ConnDisconnected)
                        if connReq.conn != nil {
                            connReq.conn.Close()
                        }
                        log.Debugf("Disconnected from %v", connReq)
                        delete(conns, msg.id)
    
                        if cm.cfg.OnDisconnection != nil {
                            go cm.cfg.OnDisconnection(connReq)
                        }
    
                        if uint32(len(conns)) < cm.cfg.TargetOutbound && msg.retry {
                            cm.handleFailedConn(connReq)
                        }
                    } else {
                        log.Errorf("Unknown connection: %d", msg.id)
                    }
    
                case handleFailed:
                    connReq := msg.c
                    connReq.updateState(ConnFailed)
                    log.Debugf("Failed to connect to %v: %v", connReq, msg.err)
                    cm.handleFailedConn(connReq)
                }
    
            case <-cm.quit:
                break out
            }
        }
    
        cm.wg.Done()
        log.Trace("Connection handler done")
    }
    

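    connHandler handles three cases: connection established, connection failed, and disconnect:

    1. On success, it first updates the connection state to ConnEstablished, adds the connection to conns so that it can be tracked from then on, resets retryCount and failedAttempts, and then invokes OnConnection in a new goroutine;
    2. On a disconnect request, it looks up the connReq in conns, updates its state to ConnDisconnected, calls Close() on the net.Conn to close the TCP connection, and then invokes OnDisconnection in a new goroutine. Finally, if the number of active connections is below the configured target and retry is true, it calls handleFailedConn to reconnect or to connect to a newly chosen Peer;
    3. On failure, it updates the connection state to ConnFailed and calls handleFailedConn to reconnect or to connect to a newly chosen Peer;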
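
    The three cases above follow a common Go pattern: one goroutine owns the state and every change arrives as a typed message on a channel. The sketch below is a simplified illustration of that pattern, not the connmgr code. The message types reuse the names from connmanager.go but carry only an id, and run and replay are hypothetical helpers.

```go
package main

import "fmt"

// Message types, named after the ones in connmanager.go but reduced to the
// single field this sketch needs.
type handleConnected struct{ id uint64 }
type handleDisconnected struct{ id uint64 }
type handleFailed struct{ id uint64 }

// run owns the conns map exclusively, so no lock is needed: every state
// change arrives as a message on requests. When quit is closed it returns
// the ids still connected and the number of failures seen.
func run(requests <-chan interface{}, quit <-chan struct{}) (map[uint64]bool, int) {
	conns := make(map[uint64]bool)
	failed := 0
	for {
		select {
		case req := <-requests:
			switch msg := req.(type) {
			case handleConnected:
				conns[msg.id] = true
			case handleDisconnected:
				delete(conns, msg.id)
			case handleFailed:
				failed++
			}
		case <-quit:
			return conns, failed
		}
	}
}

// replay feeds msgs to run over an unbuffered channel and then stops it.
// Each send completes only once run has received the message, so all
// messages are handled before quit is closed.
func replay(msgs []interface{}) (map[uint64]bool, int) {
	requests := make(chan interface{})
	quit := make(chan struct{})
	type result struct {
		conns  map[uint64]bool
		failed int
	}
	done := make(chan result)
	go func() {
		conns, failed := run(requests, quit)
		done <- result{conns, failed}
	}()
	for _, m := range msgs {
		requests <- m
	}
	close(quit)
	r := <-done
	return r.conns, r.failed
}

func main() {
	conns, failed := replay([]interface{}{
		handleConnected{1},
		handleConnected{2},
		handleFailed{3},
		handleDisconnected{1},
	})
	fmt.Println(len(conns), conns[2], failed) // 1 true 1
}
```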

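    Note that ConnMgr only handles a connection attempt succeeding or failing; it has no dedicated handling for a connection that breaks some time after it was established. The reason is that although a TCP socket has a keepalive option that enables heartbeats, there is no callback for a heartbeat timeout, and a broken connection is detected only when a write() call returns an error. The application-layer protocol therefore generally has to detect network interruptions with its own heartbeat. As described in "Block Propagation over the P2P Network in Btcd: Peer", Peers exchange Ping/Pong heartbeats to maintain and probe the connection. If a Pong times out, or outHandler gets an error writing to net.Conn, the Peer's Disconnect() method is called to actively drop the connection and exit the Peer's worker goroutines. When a Peer connection is established and OnConnection is invoked, server starts a new goroutine to watch the state of the Peer connection; when the Peer disconnects and exits, server immediately calls ConnMgr's Disconnect() to remove the connection.

    Next, let us look at the implementation of handleFailedConn: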

    //btcd/connmgr/connmanager.go
    
    // handleFailedConn handles a connection failed due to a disconnect or any
    // other failure. If permanent, it retries the connection after the configured
    // retry duration. Otherwise, if required, it makes a new connection request.
    // After maxFailedConnectionAttempts new connections will be retried after the
    // configured retry duration.
    func (cm *ConnManager) handleFailedConn(c *ConnReq) {
        if atomic.LoadInt32(&cm.stop) != 0 {
            return
        }
        if c.Permanent {
            c.retryCount++
            d := time.Duration(c.retryCount) * cm.cfg.RetryDuration
            if d > maxRetryDuration {
                d = maxRetryDuration
            }
            log.Debugf("Retrying connection to %v in %v", c, d)
            time.AfterFunc(d, func() {
                cm.Connect(c)
            })
        } else if cm.cfg.GetNewAddress != nil {
            cm.failedAttempts++
            if cm.failedAttempts >= maxFailedAttempts {
                ......
                time.AfterFunc(cm.cfg.RetryDuration, func() {
                    cm.NewConnReq()
                })
            } else {
                go cm.NewConnReq()
            }
        }
    }
    

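    handleFailedConn mainly implements the reconnection logic. Its main idea is:

    1. If the connection's Permanent is true, i.e. it is a "persistent" connection, it must be retried after a failure. Note that the wait before a retry is proportional to the number of retries: the 1st retry waits 5s, the 2nd waits 10s, and so on, up to a maximum wait of 5min;
    2. If the connection is not "persistent", a new Peer is chosen for the connection. If the number of failed attempts reaches the limit (25 by default), the node's outbound network has probably gone down, so the next attempt is delayed, by 5s by default;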
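
    The linear backoff for persistent connections can be worked through in a few lines. This is a sketch that assumes the default values quoted above (5s step, 5min cap); retryDelay is a hypothetical helper name, not a function in connmgr.

```go
package main

import (
	"fmt"
	"time"
)

const (
	retryDuration    = 5 * time.Second // default RetryDuration quoted in the text
	maxRetryDuration = 5 * time.Minute // maximum wait quoted in the text
)

// retryDelay returns the wait before the retryCount-th retry of a
// persistent connection: retryCount steps of retryDuration, capped at
// maxRetryDuration.
func retryDelay(retryCount uint32) time.Duration {
	d := time.Duration(retryCount) * retryDuration
	if d > maxRetryDuration {
		d = maxRetryDuration
	}
	return d
}

func main() {
	for _, n := range []uint32{1, 2, 59, 60, 61, 1000} {
		fmt.Printf("retry %d: wait %v\n", n, retryDelay(n))
	}
}
```

    With these values the cap is reached at the 60th retry (60 × 5s = 5min); every later retry waits the same 5min.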

    Dynamically selecting a Peer and initiating the connection is implemented in NewConnReq():

    //btcd/connmgr/connmanager.go
    
    // NewConnReq creates a new connection request and connects to the
    // corresponding address.
    func (cm *ConnManager) NewConnReq() {
    
        ......
    
        c := &ConnReq{}
        atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
    
        addr, err := cm.cfg.GetNewAddress()
        if err != nil {
            cm.requests <- handleFailed{c, err}
            return
        }
    
        c.Addr = addr
    
        cm.Connect(c)
    }
    

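    Its main steps are:

    1. create a new ConnReq object and assign it an id;
    2. call GetNewAddress() to pick a random reachable Peer address from the address store maintained by addrmgr. If address selection fails, a handleFailed message is sent so that connHandler can initiate a new connection attempt;
    3. call Connect() to start connecting to the Peer;

    The methods above show how ConnMgr actively connects to Peers and how it reconnects or selects a new address after a failure. Next, we look at listenHandler to see how it passively waits for connections: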

    //btcd/connmgr/connmanager.go
    
    // listenHandler accepts incoming connections on a given listener.  It must be
    // run as a goroutine.
    func (cm *ConnManager) listenHandler(listener net.Listener) {
        log.Infof("Server listening on %s", listener.Addr())
        for atomic.LoadInt32(&cm.stop) == 0 {
            conn, err := listener.Accept()
            if err != nil {
                // Only log the error if not forcibly shutting down.
                if atomic.LoadInt32(&cm.stop) == 0 {
                    log.Errorf("Can't accept connection: %v", err)
                }
                continue
            }
            go cm.cfg.OnAccept(conn)
        }
    
        cm.wg.Done()
        log.Tracef("Listener handler done for %s", listener.Addr())
    }
    

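    As the code shows, listenHandler mainly waits for connections and, once one is accepted, invokes OnAccept in a new goroutine. In fact, the OnConnection and OnAccept callbacks are implemented in server; they are the entry points that create a Peer and call its AssociateConnection() method, which we will cover in detail when we analyze server.go.

    That is the main process by which ConnMgr establishes and maintains connections. Next, we analyze how the dynamic score keeper used to defend against DDoS attacks is implemented, starting with the definition of DynamicBanScore: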

    //btcd/connmgr/dynamicbanscore.go
    
    // DynamicBanScore provides dynamic ban scores consisting of a persistent and a
    // decaying component. The persistent score could be utilized to create simple
    // additive banning policies similar to those found in other bitcoin node
    // implementations.
    //
    // The decaying score enables the creation of evasive logic which handles
    // misbehaving peers (especially application layer DoS attacks) gracefully
    // by disconnecting and banning peers attempting various kinds of flooding.
    // DynamicBanScore allows these two approaches to be used in tandem.
    //
    // Zero value: Values of type DynamicBanScore are immediately ready for use upon
    // declaration.
    type DynamicBanScore struct {
        lastUnix   int64
        transient  float64
        persistent uint32
        mtx        sync.Mutex
    }
    

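    Its fields have the following meanings:

    • lastUnix: the Unix time at which the score was last adjusted;
    • transient: the floating, decaying part of the score;
    • persistent: the part of the score that does not decay automatically;
    • mtx: the mutex that protects transient and persistent;

    From this definition, the score that DynamicBanScore provides is made up of a constant part and a transient part. How do these two parts actually work? Let us look at its int() method: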

    //btcd/connmgr/dynamicbanscore.go
    
    // int returns the ban score, the sum of the persistent and decaying scores at a
    // given point in time.
    //
    // This function is not safe for concurrent access. It is intended to be used
    // internally and during testing.
    func (s *DynamicBanScore) int(t time.Time) uint32 {
        dt := t.Unix() - s.lastUnix
        if s.transient < 1 || dt < 0 || Lifetime < dt {
            return s.persistent
        }
        return s.persistent + uint32(s.transient*decayFactor(dt))
    }
    
    

    As the code shows, the final score is persistent plus transient multiplied by a decay factor. The decay factor changes over time and is determined by decayFactor():

    //btcd/connmgr/dynamicbanscore.go
    
    // decayFactor returns the decay factor at t seconds, using precalculated values
    // if available, or calculating the factor if needed.
    func decayFactor(t int64) float64 {
        if t < precomputedLen {
            return precomputedFactor[t]
        }
        return math.Exp(-1.0 * float64(t) * lambda)
    }
    

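    As the code shows, the decay factor falls off exponentially with the time interval, with lambda = ln2/60, so the transient part halves every 60 seconds.

    [Figure: the dynamic score as a function of the time interval]

    The time interval here is the difference between the moment the score is read and the last time the transient value was actively adjusted; increase() updates lastUnix only when it is passed a non-zero transient value: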

    //btcd/connmgr/dynamicbanscore.go
    
    // increase increases the persistent, the decaying or both scores by the values
    // passed as parameters. The resulting score is calculated as if the action was
    // carried out at the point time represented by the third parameter. The
    // resulting score is returned.
    //
    // This function is not safe for concurrent access.
    func (s *DynamicBanScore) increase(persistent, transient uint32, t time.Time) uint32 {
        s.persistent += persistent
        tu := t.Unix()
        dt := tu - s.lastUnix
    
        if transient > 0 {
            if Lifetime < dt {
                s.transient = 0
            } else if s.transient > 1 && dt > 0 {
                s.transient *= decayFactor(dt)
            }
            s.transient += float64(transient)
            s.lastUnix = tu
        }
        return s.persistent + uint32(s.transient)
    }
    

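    As the code shows, when the score is actively adjusted, the persistent value is added directly. The transient value decayed to the given time t is then computed and added to the passed-in transient value to give the new transient value, and the new persistent and transient values are summed to give the new score. In effect, the new score is the score at time t plus the passed-in persistent and transient values.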

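    When Peers exchange messages, each Peer connection has a dynamic score keeper that monitors how frequently messages are sent and received. If messages arrive from a Peer too frequently, the node suspects a DDoS attack and actively disconnects from that Peer. We will see this when we analyze how protocol messages are sent and received.

    From the analysis above, we know that ConnMgr picks Peer addresses through GetNewAddress(). But when a new node joins the network, it has not yet exchanged address information with any Peer, so its address store is empty. Which nodes should it connect to first? In fact, the node has the addresses of some seed nodes built in: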

    //btcd/chaincfg/params.go
    
    // MainNetParams defines the network parameters for the main Bitcoin network.
    var MainNetParams = Params{
        Name:        "mainnet",
        Net:         wire.MainNet,
        DefaultPort: "8333",
        DNSSeeds: []DNSSeed{
            {"seed.bitcoin.sipa.be", true},
            {"dnsseed.bluematt.me", true},
            {"dnsseed.bitcoin.dashjr.org", false},
            {"seed.bitcoinstats.com", true},
            {"seed.bitnodes.io", false},
            {"seed.bitcoin.jonasschnelli.ch", true},
        },
    
        ......
    }
    

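    Btcd has the domain names of the 6 seed nodes above built in. However, before ConnMgr can connect to the seed nodes, it must do a DNS lookup to find their IP addresses. This is done in SeedFromDNS():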

    //btcd/connmgr/seed.go
    
    // SeedFromDNS uses DNS seeding to populate the address manager with peers.
    func SeedFromDNS(chainParams *chaincfg.Params, reqServices wire.ServiceFlag,
        lookupFn LookupFunc, seedFn OnSeed) {
    
        for _, dnsseed := range chainParams.DNSSeeds {
            var host string
            if !dnsseed.HasFiltering || reqServices == wire.SFNodeNetwork {
                host = dnsseed.Host
            } else {
                host = fmt.Sprintf("x%x.%s", uint64(reqServices), dnsseed.Host)
            }
    
            go func(host string) {
                randSource := mrand.New(mrand.NewSource(time.Now().UnixNano()))
    
                seedpeers, err := lookupFn(host) // (1)
                if err != nil {
                    log.Infof("DNS discovery failed on seed %s: %v", host, err)
                    return
                }
                numPeers := len(seedpeers)
    
                log.Infof("%d addresses found from DNS seed %s", numPeers, host)
    
                if numPeers == 0 {
                    return
                }
                addresses := make([]*wire.NetAddress, len(seedpeers))
                // if this errors then we have *real* problems
                intPort, _ := strconv.Atoi(chainParams.DefaultPort)
                for i, peer := range seedpeers {
                    addresses[i] = wire.NewNetAddressTimestamp( // (2)
                        // bitcoind seeds with addresses from
                        // a time randomly selected between 3
                        // and 7 days ago.
                        time.Now().Add(-1*time.Second*time.Duration(secondsIn3Days+
                            randSource.Int31n(secondsIn4Days))),
                        0, peer, uint16(intPort))
                }
    
                seedFn(addresses)
            }(host)
        }
    }
    

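    Its main steps are:

    1. call lookupFn() to do the DNS resolve, turning the seed node's domain name into IP addresses;
    2. wrap each IP address in the protocol address type wire.NetAddress. The main addition is a timestamp that indicates how fresh the address is; here it is set to a random time between 3 and 7 days ago.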

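    Depending on the configuration, the lookupFn() passed in here may resolve names by querying a DNS server from the node itself, or it may resolve them through an onion proxy: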

    //btcd/config.go
    
    func loadConfig() (*config, []string, error) {
    
        ......
    
        // Setup dial and DNS resolution (lookup) functions depending on the
        // specified options.  The default is to use the standard
        // net.DialTimeout function as well as the system DNS resolver.  When a
        // proxy is specified, the dial function is set to the proxy specific
        // dial function and the lookup is set to use tor (unless --noonion is
        // specified in which case the system DNS resolver is used).
        cfg.dial = net.DialTimeout
        cfg.lookup = net.LookupIP
        if cfg.Proxy != "" {
            _, _, err := net.SplitHostPort(cfg.Proxy)
    
            ......
    
            // Tor isolation flag means proxy credentials will be overridden
            // unless there is also an onion proxy configured in which case
            // that one will be overridden.
            torIsolation := false
            if cfg.TorIsolation && cfg.OnionProxy == "" &&
                (cfg.ProxyUser != "" || cfg.ProxyPass != "") {
    
                torIsolation = true
                fmt.Fprintln(os.Stderr, "Tor isolation set -- "+
                    "overriding specified proxy user credentials")
            }
    
            proxy := &socks.Proxy{
                Addr:         cfg.Proxy,
                Username:     cfg.ProxyUser,
                Password:     cfg.ProxyPass,
                TorIsolation: torIsolation,
            }
            cfg.dial = proxy.DialTimeout
    
            // Treat the proxy as tor and perform DNS resolution through it
            // unless the --noonion flag is set or there is an
            // onion-specific proxy configured.
            if !cfg.NoOnion && cfg.OnionProxy == "" {
                cfg.lookup = func(host string) ([]net.IP, error) {
                    return connmgr.TorLookupIP(host, cfg.Proxy)
                }
            }
        }
    
        // Setup onion address dial function depending on the specified options.
        // The default is to use the same dial function selected above.  However,
        // when an onion-specific proxy is specified, the onion address dial
        // function is set to use the onion-specific proxy while leaving the
        // normal dial function as selected above.  This allows .onion address
        // traffic to be routed through a different proxy than normal traffic.
        if cfg.OnionProxy != "" {
            _, _, err := net.SplitHostPort(cfg.OnionProxy)
    
            ......
    
            cfg.oniondial = func(network, addr string, timeout time.Duration) (net.Conn, error) {
                proxy := &socks.Proxy{
                    Addr:         cfg.OnionProxy,
                    Username:     cfg.OnionProxyUser,
                    Password:     cfg.OnionProxyPass,
                    TorIsolation: cfg.TorIsolation,
                }
                return proxy.DialTimeout(network, addr, timeout)
            }
    
            // When configured in bridge mode (both --onion and --proxy are
            // configured), it means that the proxy configured by --proxy is
            // not a tor proxy, so override the DNS resolution to use the
            // onion-specific proxy.
            if cfg.Proxy != "" {
                cfg.lookup = func(host string) ([]net.IP, error) {
                    return connmgr.TorLookupIP(host, cfg.OnionProxy)
                }
            }
        } else {
            cfg.oniondial = cfg.dial
        }
    
        // Specifying --noonion means the onion address dial function results in
        // an error.
        if cfg.NoOnion {
            cfg.oniondial = func(a, b string, t time.Duration) (net.Conn, error) {
                return nil, errors.New("tor has been disabled")
            }
        }
    
        ......
    }
    
    

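    From the code above we can see:

    1. The default DNS lookup and dial methods are the standard net.LookupIP and net.DialTimeout;
    2. If a proxy is configured, Dial uses the SOCKS Proxy's DialTimeout(). Unless onion support is disabled (--noonion) or a separate onion proxy is configured, the proxy is treated as an onion proxy and DNS queries go through connmgr's TorLookupIP();
    3. If an onion proxy is configured specifically, connections to "darknet" services (hidden services) use the SOCKS Proxy's DialTimeout() against that proxy, and, when a regular proxy is also configured, DNS lookup uses connmgr's TorLookupIP() through the onion proxy. Note that even with an onion proxy configured, connections to "clearnet" addresses still use either the standard net.DialTimeout or the Proxy's DialTimeout, depending on whether a regular (non-Tor) SOCKS proxy is configured;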
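
    The selection rules above can be condensed into a small decision function. This is only a sketch of the branching in loadConfig(): the choice type, the choose function, and the string labels are all hypothetical, and error handling and credentials are left out.

```go
package main

import "fmt"

// choice names which mechanism is selected for each role. The strings are
// labels for this sketch only.
type choice struct {
	dial, onionDial, lookup string
}

// choose follows the same order of checks as the loadConfig() excerpt.
func choose(proxy, onionProxy string, noOnion bool) choice {
	c := choice{dial: "direct", lookup: "system"}
	if proxy != "" {
		c.dial = "proxy"
		// The proxy is treated as Tor unless told otherwise.
		if !noOnion && onionProxy == "" {
			c.lookup = "tor via proxy"
		}
	}
	if onionProxy != "" {
		c.onionDial = "onion proxy"
		// Bridge mode: the regular proxy is not Tor, so resolve through
		// the onion proxy instead.
		if proxy != "" {
			c.lookup = "tor via onion proxy"
		}
	} else {
		c.onionDial = c.dial
	}
	if noOnion {
		c.onionDial = "disabled"
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", choose("", "", false))
	fmt.Printf("%+v\n", choose("127.0.0.1:9050", "", false))
	fmt.Printf("%+v\n", choose("127.0.0.1:1080", "127.0.0.1:9050", false))
	fmt.Printf("%+v\n", choose("", "127.0.0.1:9050", false))
	fmt.Printf("%+v\n", choose("127.0.0.1:1080", "", true))
}
```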

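    Whether a Peer is reached through a regular proxy or an onion proxy, from the node's point of view both are SOCKS proxy servers, and the node talks to them using the SOCKS protocol. Compared with a regular proxy, an onion proxy extends the SOCKS protocol with support for name lookup, stream isolation, and so on. The SOCKS protocol sits at the session layer, between the transport layer and the application layer, so it can proxy not only HTTP traffic but also other application traffic such as FTP and XMPP. SOCKS is fairly simple, so we do not describe it further here; readers can consult RFC 1928 and RFC 1929 for its message formats. To see how Btcd establishes a connection through a SOCKS proxy, let us look at Proxy's dial() method: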

    //btcd/vendor/github.com/btcsuite/go-socks/dial.go
    
    func (p *Proxy) dial(network, addr string, timeout time.Duration) (net.Conn, error) {
        host, strPort, err := net.SplitHostPort(addr)
        if err != nil {
            return nil, err
        }
        port, err := strconv.Atoi(strPort)
        if err != nil {
            return nil, err
        }
    
        conn, err := net.DialTimeout("tcp", p.Addr, timeout) // (1)
        if err != nil {
            return nil, err
        }
    
        var user, pass string
        if p.TorIsolation { // (2)
            var b [16]byte
            _, err := io.ReadFull(rand.Reader, b[:])
            if err != nil {
                conn.Close()
                return nil, err
            }
            user = hex.EncodeToString(b[0:8])
            pass = hex.EncodeToString(b[8:16])
        } else {
            user = p.Username
            pass = p.Password
        }
        buf := make([]byte, 32+len(host)+len(user)+len(pass))
    
        // Initial greeting
        buf[0] = protocolVersion // (3)
        if user != "" {
            buf = buf[:4]
            buf[1] = 2 // num auth methods
            buf[2] = authNone
            buf[3] = authUsernamePassword
        } else {
            buf = buf[:3]
            buf[1] = 1 // num auth methods
            buf[2] = authNone
        }
    
        _, err = conn.Write(buf)
        if err != nil {
            conn.Close()
            return nil, err
        }
    
        // Server's auth choice
    
        if _, err := io.ReadFull(conn, buf[:2]); err != nil {
            conn.Close()
            return nil, err
        }
        if buf[0] != protocolVersion {
            conn.Close()
            return nil, ErrInvalidProxyResponse
        }
        err = nil
        switch buf[1] {
        default:
            err = ErrInvalidProxyResponse
        case authUnavailable:
            err = ErrNoAcceptableAuthMethod
        case authGssApi:
            err = ErrNoAcceptableAuthMethod
        case authUsernamePassword:
            buf = buf[:3+len(user)+len(pass)] // (4)
            buf[0] = 1 // version
            buf[1] = byte(len(user))
            copy(buf[2:], user)
            buf[2+len(user)] = byte(len(pass))
            copy(buf[3+len(user):], pass)
            if _, err = conn.Write(buf); err != nil {
                conn.Close()
                return nil, err
            }
            if _, err = io.ReadFull(conn, buf[:2]); err != nil {
                conn.Close()
                return nil, err
            }
            if buf[0] != 1 { // version
                err = ErrInvalidProxyResponse
            } else if buf[1] != 0 { // 0 = success, else auth failed
                err = ErrAuthFailed
            }
        case authNone:
            // Do nothing
        }
        if err != nil {
            conn.Close()
            return nil, err
        }
    
        // Command / connection request
    
        buf = buf[:7+len(host)] // (5)
        buf[0] = protocolVersion
        buf[1] = commandTcpConnect
        buf[2] = 0 // reserved
        buf[3] = addressTypeDomain
        buf[4] = byte(len(host))
        copy(buf[5:], host)
        buf[5+len(host)] = byte(port >> 8)
        buf[6+len(host)] = byte(port & 0xff)
        if _, err := conn.Write(buf); err != nil {
            conn.Close()
            return nil, err
        }
    
        // Server response
    
        if _, err := io.ReadFull(conn, buf[:4]); err != nil {
            conn.Close()
            return nil, err
        }
    
        if buf[0] != protocolVersion {
            conn.Close()
            return nil, ErrInvalidProxyResponse
        }
    
        if buf[1] != statusRequestGranted {
            conn.Close()
            err := statusErrors[buf[1]]
            if err == nil {
                err = ErrInvalidProxyResponse
            }
            return nil, err
        }
    
        paddr := &ProxiedAddr{Net: network}
    
        switch buf[3] { // (6)
        default:
            conn.Close()
            return nil, ErrInvalidProxyResponse
        case addressTypeIPv4:
            if _, err := io.ReadFull(conn, buf[:4]); err != nil {
                conn.Close()
                return nil, err
            }
            paddr.Host = net.IP(buf).String()
        case addressTypeIPv6:
            if _, err := io.ReadFull(conn, buf[:16]); err != nil {
                conn.Close()
                return nil, err
            }
            paddr.Host = net.IP(buf).String()
        case addressTypeDomain:
            if _, err := io.ReadFull(conn, buf[:1]); err != nil {
                conn.Close()
                return nil, err
            }
            domainLen := buf[0]
            if _, err := io.ReadFull(conn, buf[:domainLen]); err != nil {
                conn.Close()
                return nil, err
            }
            paddr.Host = string(buf[:domainLen])
        }
    
        if _, err := io.ReadFull(conn, buf[:2]); err != nil {
            conn.Close()
            return nil, err
        }
        paddr.Port = int(buf[0])<<8 | int(buf[1])
    
        return &proxiedConn{ // (7)
            conn:       conn,
            boundAddr:  paddr,
            remoteAddr: &ProxiedAddr{network, host, port},
        }, nil
    }
    

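    Because Btcd nodes always talk to each other over TCP, the code here implements only the case where SOCKS proxies a TCP connection. The main steps in establishing a proxied connection are:

    1. Open a TCP connection to the SOCKS proxy server, as shown at (1);
    2. The client sends the proxy server a negotiation request carrying the protocol version and the set of METHODs, as shown at (3). The client selects version 5 and offers either both "no authentication" and "username/password", or "no authentication" only;
    3. The client then waits for the SOCKS server's response. If the server does not support SOCKS 5, negotiation fails. If it supports SOCKS 5 and agrees to no authentication, the client can send its next request immediately. If the server selects username/password authentication, the client submits the username and password, and the server verifies them and returns the result, as shown at (4);
    4. Once no authentication is required, or the username/password check has passed, the client sends the SOCKS server a CONNECT request naming the destination host and port, as shown at (5);
    5. The SOCKS server answers the CONNECT request. If the proxied connection succeeds, it returns the proxy's external (bound) address and port. Depending on the address type given in the response, that address may be IPv4, IPv6 or a domain name, as shown at (6);
    6. Create and return a proxiedConn object, as shown at (7). Its conn field is the TCP connection between the client and the SOCKS server, over which TCP traffic is forwarded by the proxy to the destination; boundAddr is the proxy's external address and port, and remoteAddr is the destination address and port.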
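
    To make the byte layout behind steps 2, 4 and 5 concrete, here is a minimal standalone sketch based on RFC 1928. It is not the go-socks code: the helper names (buildGreeting, buildConnect, readBoundAddr, boundAddrFromBytes) and the constant names are hypothetical and chosen only for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
)

// Protocol values from RFC 1928; the Go names are chosen for this sketch.
const (
	version5     = 0x05
	authNone     = 0x00
	authUserPass = 0x02
	cmdConnect   = 0x01
	atypIPv4     = 0x01
	atypDomain   = 0x03
	atypIPv6     = 0x04
)

// buildGreeting encodes the version/method negotiation message (step 2).
func buildGreeting(withCredentials bool) []byte {
	if withCredentials {
		return []byte{version5, 2, authNone, authUserPass}
	}
	return []byte{version5, 1, authNone}
}

// buildConnect encodes a CONNECT request that names the target by host (step 4).
func buildConnect(host string, port int) ([]byte, error) {
	if len(host) == 0 || len(host) > 255 || port < 0 || port > 65535 {
		return nil, errors.New("host must be 1..255 bytes and port 0..65535")
	}
	req := []byte{version5, cmdConnect, 0x00, atypDomain, byte(len(host))}
	req = append(req, host...)
	return append(req, byte(port>>8), byte(port)), nil
}

// readBoundAddr parses the tail of the server reply (step 5), starting at the
// address-type byte: ATYP, address, then a big-endian port.
func readBoundAddr(r io.Reader) (string, int, error) {
	var atyp [1]byte
	if _, err := io.ReadFull(r, atyp[:]); err != nil {
		return "", 0, err
	}
	var addr []byte
	switch atyp[0] {
	case atypIPv4:
		addr = make([]byte, net.IPv4len)
	case atypIPv6:
		addr = make([]byte, net.IPv6len)
	case atypDomain:
		var n [1]byte
		if _, err := io.ReadFull(r, n[:]); err != nil {
			return "", 0, err
		}
		addr = make([]byte, n[0])
	default:
		return "", 0, errors.New("unknown address type")
	}
	if _, err := io.ReadFull(r, addr); err != nil {
		return "", 0, err
	}
	host := string(addr)
	if atyp[0] != atypDomain {
		host = net.IP(addr).String()
	}
	var p [2]byte
	if _, err := io.ReadFull(r, p[:]); err != nil {
		return "", 0, err
	}
	return host, int(p[0])<<8 | int(p[1]), nil
}

// boundAddrFromBytes is a convenience wrapper for in-memory replies.
func boundAddrFromBytes(raw []byte) (string, int, error) {
	return readBoundAddr(bytes.NewReader(raw))
}

func main() {
	req, _ := buildConnect("example.com", 8333)
	fmt.Printf("% x\n", req)
	fmt.Println(boundAddrFromBytes([]byte{atypIPv4, 127, 0, 0, 1, 0x20, 0x8d}))
}
```

    Sizing each read from the address type, rather than reusing one shared buffer, keeps the slice handed to net.IP at exactly 4 or 16 bytes.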

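    In particular, if the client connects to a Tor proxy and wants the Stream Isolation feature, it generates a random username and password and sends them to the Tor proxy. Stream Isolation prevents the Tor network from relaying different TCP streams over the same "virtual circuit". The Tor proxy can tell TCP streams apart by IsolateClientAddr, IsolateSOCKSAuth, IsolateClientProtocol, IsolateDestPort and IsolateDestAddr. Btcd relies on IsolateSOCKSAuth for Stream Isolation, so the TCP streams a node opens to different Peers, or reopens to the same Peer, are each isolated inside the Tor network. Readers may wonder how the Tor proxy can verify a randomly generated username and password. The credentials are not meant to authenticate anyone: Btcd relies on the Tor proxy requiring no real authentication ("NO AUTHENTICATION REQUIRED") and using the supplied SOCKS credentials only as a label that tells different proxy requests apart.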
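
    As a rough illustration of how such throwaway credentials could be produced and encoded, the sketch below draws random bytes from crypto/rand and packs them into the username/password message defined by RFC 1929. The function names and the choice of 8 random bytes per credential are assumptions made for this example, not a description of Btcd's code.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
)

// isolationCredentials returns a fresh random username/password pair.
// Each value is 8 random bytes rendered as 16 hex characters (an arbitrary
// size picked for this sketch).
func isolationCredentials() (user, pass string, err error) {
	var b [16]byte
	if _, err = rand.Read(b[:]); err != nil {
		return "", "", err
	}
	return hex.EncodeToString(b[:8]), hex.EncodeToString(b[8:]), nil
}

// authRequest encodes the RFC 1929 sub-negotiation message:
// VER(0x01) | ULEN | UNAME | PLEN | PASSWD.
func authRequest(user, pass string) ([]byte, error) {
	if len(user) == 0 || len(user) > 255 || len(pass) == 0 || len(pass) > 255 {
		return nil, errors.New("username and password must be 1..255 bytes")
	}
	msg := []byte{0x01, byte(len(user))}
	msg = append(msg, user...)
	msg = append(msg, byte(len(pass)))
	return append(msg, pass...), nil
}

func main() {
	user, pass, err := isolationCredentials()
	if err != nil {
		fmt.Println("random source failed:", err)
		return
	}
	msg, _ := authRequest(user, pass)
	fmt.Printf("user=%s pass=%s message is %d bytes\n", user, pass, len(msg))
}
```

    Calling isolationCredentials once per outbound connection gives every connection its own label, which is what lets the proxy keep the streams apart.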

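    Now that we understand how a TCP connection to a Peer is established through a SOCKS or Tor proxy, we can look at how DNS queries are made through a Tor proxy. To stress the point again: a DNS query through Tor does not resolve onion addresses; it resolves domain names on the "clearnet". For example, when a user visits www.google.com through a Tor proxy, the user can first look up the IP address via DNS and then connect to that IP through Tor, or can pass the domain name to the Tor proxy as the destination address and let the exit node of the Tor network perform the DNS query and open the connection to the destination. A client that does not want to reveal the domain it is visiting to a DNS server, but still needs the name resolved, can therefore resolve it through the Tor proxy.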

    //btcd/connmgr/tor.go
    
    // TorLookupIP uses Tor to resolve DNS via the SOCKS extension they provide for
    // resolution over the Tor network. Tor itself doesn't support ipv6 so this
    // doesn't either.
    func TorLookupIP(host, proxy string) ([]net.IP, error) {
        conn, err := net.Dial("tcp", proxy)
        if err != nil {
            return nil, err
        }
        defer conn.Close()
    
        buf := []byte{'\x05', '\x01', '\x00'} // (1)
        _, err = conn.Write(buf)
        if err != nil {
            return nil, err
        }
    
        buf = make([]byte, 2)
        _, err = conn.Read(buf)
        if err != nil {
            return nil, err
        }
        if buf[0] != '\x05' {
            return nil, ErrTorInvalidProxyResponse
        }
        if buf[1] != '\x00' {
            return nil, ErrTorUnrecognizedAuthMethod
        }
    
        buf = make([]byte, 7+len(host))
        buf[0] = 5      // protocol version
        buf[1] = '\xF0' // Tor RESOLVE command                     (2)
        buf[2] = 0      // reserved
        buf[3] = 3      // address type: DOMAINNAME
        buf[4] = byte(len(host))
        copy(buf[5:], host)
        buf[5+len(host)] = 0 // Port 0
    
        _, err = conn.Write(buf)
        if err != nil {
            return nil, err
        }
    
        buf = make([]byte, 4)
        _, err = conn.Read(buf)
        if err != nil {
            return nil, err
        }
        if buf[0] != 5 {
            return nil, ErrTorInvalidProxyResponse
        }
        if buf[1] != 0 {
            if int(buf[1]) >= len(torStatusErrors) {
                return nil, ErrTorInvalidProxyResponse
            } else if err := torStatusErrors[buf[1]]; err != nil {
                return nil, err
            }
            return nil, ErrTorInvalidProxyResponse
        }
        if buf[3] != 1 { // (3)
            err := torStatusErrors[torGeneralError]
            return nil, err
        }
    
        buf = make([]byte, 4)
        bytes, err := conn.Read(buf)
        if err != nil {
            return nil, err
        }
        if bytes != 4 {
            return nil, ErrTorInvalidAddressResponse
        }
    
        r := binary.BigEndian.Uint32(buf)
    
        addr := make([]net.IP, 1)
        addr[0] = net.IPv4(byte(r>>24), byte(r>>16), byte(r>>8), byte(r))
    
        return addr, nil
    }
    

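    The flow is similar to establishing a proxied connection: first negotiate the version and authentication method, then send the request and wait for the response. The differences are:

    1. Only the "no authentication" method is offered, as shown at (1);
    2. The request command is 0xF0, a Tor extension to SOCKS used for name lookup (RESOLVE), and the destination address type is set to DOMAINNAME, as shown at (2);
    3. After the Tor exit node performs the DNS query, the Tor proxy returns the result. Only IPv4 addresses are accepted here, as shown at (3).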
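
    The request and reply framing described in these points can be sketched in isolation as follows. This is an illustrative example rather than Btcd's implementation: buildResolveRequest, readResolveReply and resolveReplyFromBytes are hypothetical helpers, and the sketch reads with io.ReadFull so that a short read surfaces as an error instead of a partially filled buffer.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
)

// buildResolveRequest encodes a Tor RESOLVE request:
// VER(5) | CMD(0xF0) | RSV(0) | ATYP(3, DOMAINNAME) | LEN | HOST | PORT(0).
func buildResolveRequest(host string) ([]byte, error) {
	if len(host) == 0 || len(host) > 255 {
		return nil, errors.New("host must be 1..255 bytes")
	}
	req := []byte{0x05, 0xF0, 0x00, 0x03, byte(len(host))}
	req = append(req, host...)
	return append(req, 0x00, 0x00), nil // the port is unused for RESOLVE
}

// readResolveReply parses VER | REP | RSV | ATYP followed by an IPv4 address.
// The trailing two port bytes of the reply are deliberately left unread.
func readResolveReply(r io.Reader) (net.IP, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	if hdr[0] != 0x05 {
		return nil, errors.New("unexpected SOCKS version in reply")
	}
	if hdr[1] != 0x00 {
		return nil, fmt.Errorf("resolve failed with status %d", hdr[1])
	}
	if hdr[3] != 0x01 {
		return nil, errors.New("only IPv4 answers are handled")
	}
	var ip [4]byte
	if _, err := io.ReadFull(r, ip[:]); err != nil {
		return nil, err
	}
	return net.IPv4(ip[0], ip[1], ip[2], ip[3]), nil
}

// resolveReplyFromBytes is a convenience wrapper for in-memory replies.
func resolveReplyFromBytes(raw []byte) (net.IP, error) {
	return readResolveReply(bytes.NewReader(raw))
}

func main() {
	req, _ := buildResolveRequest("example.com")
	fmt.Printf("% x\n", req)
	ip, err := resolveReplyFromBytes([]byte{5, 0, 0, 1, 93, 184, 216, 34, 0, 0})
	fmt.Println(ip, err)
}
```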

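    With this, we have analyzed the whole process by which Peers in the Bitcoin P2P network establish, maintain and tear down TCP connections, including how connections and DNS queries are made through a SOCKS or Tor proxy. We have also seen, however, that apart from the seed node addresses built into the client, a node joining the network knows no other node's address. So how does a node learn the addresses of other nodes, and how does it choose which Peer addresses to connect to? We will analyze this in "Propagation of Btcd Blocks over the P2P Network: AddrManager". Because this article touched on Tor, some readers may want to learn more about it; moreover, both the Bitcoin network and the Tor network provide anonymity for the source or the account. So before analyzing AddrManager, the next article will discuss the anonymity of the Bitcoin and Tor networks.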
