美文网首页区块链技术
比特币源码分析-网络(二)

比特币源码分析-网络(二)

作者: wolf4j | 来源:发表于2018-03-09 16:32 被阅读94次

    众所周知,比特币网络是采用的P2P网络体系,所以,没有明显的客户端与服务端的区别或者是概念,每一个节点既是自身的客户端,又是其它节点的服务端。

    sync.h中,定义了 CSemaphore,它包装了系统底层的信号量机制,对wait(), try_wait(),post()实现了封装,代码如下:

    class CSemaphore {
    private:
        boost::condition_variable condition;
        boost::mutex mutex;
        int value;
    public:
        CSemaphore(int init) : value(init) {}
        void wait() {}
        bool try_wait() {}
        void post() {}
    };
    

    用于控制网络连接时的最大数量,每一个网络节点的最大连接数受限于信号量所允许的最大值。

    下面我们按照一个网络连接从发送到接收到请求返回的这么个思路,来梳理代码逻辑。

    004.png

    CNode

    CNode定义在bitcoin.cpp中,是比较重要的也是较为复杂的一个类,节点的所有信息都包含在内:

    class CNode {
        SOCKET sock; //用来连接的socket句柄
        CDataStream vSend; //发送消息
        CDataStream vRecv; //接收消息
        uint32_t nHeaderStart; //头信息开始
        uint32_t nMessageStart; 
        int nVersion; //版本信息
        std::string strSubVer; 
        int nStartingHeight; //起始高度
        std::vector<CAddress> *vAddr; //ip地址(网络上节点的连接信息)
        int ban; 
        int64_t doneAfter; 
        CAddress you;
    };
    

    在上述定义中,最主要的是 std::vector<CAddress> *vAddr; 它包含了连接的所有节点,如果有节点连接进来,就加入到这个vector中;如果某个节点断开连接,就从这个vector中删除。

    net.h中,对CNode进行了详细的定义(所有关于节点的信息,都进行了详细罗列),由于篇幅较长,只罗列其中的一些关键结构:

    /** Information about a peer */
    class CNode {
        friend class CConnman;
    
    public:
        SOCKET hSocket; //连接的socket句柄
        size_t nSendSize; //所有vSendMsg条目的总大小。
        size_t nSendOffset; //已经发送的第一个vSendMsg内的偏移量。
        std::deque<std::vector<uint8_t>> vSendMsg;//发送消息的数组
         ...
        const CAddress addr;//节点地址信息
        std::atomic<int> nVersion;//版本信息
        CBloomFilter *pfilter;//海量过滤器
        const NodeId id;//节点ID
    
    protected:
        mapMsgCmdSize mapSendBytesPerMsgCmd;
        mapMsgCmdSize mapRecvBytesPerMsgCmd;
    
    public:
        uint256 hashContinue;
        std::atomic<int> nStartingHeight;
    private:
        std::list<CNetMessage> vRecvMsg;//接收消息的数组
    public:
        //用来解析接收到的消息数据
        bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool &complete);
    
        //用来设置接收版本
        void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; }
        int GetRecvVersion() { return nRecvVersion; }
        void SetSendVersion(int nVersionIn);
        int GetSendVersion() const;
    
        //用来发送地址
        void PushAddress(const CAddress &_addr, FastRandomContext &insecure_rand) {
            // Known checking here is only to save space from duplicates.
            // SendMessages will filter it again for knowns that were added
            // after addresses were pushed.
            if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) {
                if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) {
                    vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] =
                        _addr;
                } else {
                    vAddrToSend.push_back(_addr);
                }
            }
        }
    
        //用来发送inventory消息
        void PushInventory(const CInv &inv) {
            LOCK(cs_inventory);
            if (inv.type == MSG_TX) {
                if (!filterInventoryKnown.contains(inv.hash)) {
                    setInventoryTxToSend.insert(inv.hash);
                }
            } else if (inv.type == MSG_BLOCK) {
                vInventoryBlockToSend.push_back(inv.hash);
            }
        }
    
        void PushBlockHash(const uint256 &hash) {
            LOCK(cs_inventory);
            vBlockHashesToAnnounce.push_back(hash);
        }
    };
    

    发送消息

    CDataStream这个类主要是包装了一个带有双向缓冲区的接口, 它重载了 >> 和 <<,使用上述序列化读取和写入未格式化的数据模板,以线性时间填充数据;

    class CDataStream {
    protected:
        typedef CSerializeData vector_type;
        vector_type vch;
        unsigned int nReadPos;
    
        int nType;
        int nVersion;
    
    public:
        template <typename T> CDataStream &operator<<(const T &obj) {
            // Serialize to this stream
            ::Serialize(*this, obj);
            return (*this);
        }
    
        template <typename T> CDataStream &operator>>(T &obj) {
            // Unserialize from this stream
            ::Unserialize(*this, obj);
            return (*this);
        }
        
        void read(char *pch, size_t nSize) {
            if (nSize == 0) {
                return;
            }
    
            // Read from the beginning of the buffer
            unsigned int nReadPosNext = nReadPos + nSize;
            if (nReadPosNext >= vch.size()) {
                if (nReadPosNext > vch.size()) {
                    throw std::ios_base::failure(
                        "CDataStream::read(): end of data");
                }
                memcpy(pch, &vch[nReadPos], nSize);
                nReadPos = 0;
                vch.clear();
                return;
            }
            memcpy(pch, &vch[nReadPos], nSize);
            nReadPos = nReadPosNext;
        }
    
        void write(const char *pch, size_t nSize) {
            // Write to the end of the buffer
            vch.insert(vch.end(), pch, pch + nSize);
        }
    

    所以,当我们需要发送消息时,首先会把数据放到CDataStream的数据流中,构造好完整的消息,但此时的消息格式是网络无法识别的,下一步,将构造好的消息放入到CSerializeData(类似一个消息队列)进行序列化,序列化之后,我们就可以把消息放到SocketSendData中发送出去。

    CSerializeData 的格式如下:

    // Byte-vector that clears its contents before deletion.
    typedef std::vector<char, zero_after_free_allocator<char>> CSerializeData;
    

    SocketSendData 的定义如下:

    size_t CConnman::SocketSendData(CNode *pnode) const {
        AssertLockHeld(pnode->cs_vSend);
        size_t nSentSize = 0;
        size_t nMsgCount = 0;
    
        for (const auto &data : pnode->vSendMsg) {
            assert(data.size() > pnode->nSendOffset);
            int nBytes = 0;
             ...       
        }
        pnode->vSendMsg.erase(pnode->vSendMsg.begin(),
                              pnode->vSendMsg.begin() + nMsgCount);
        if (pnode->vSendMsg.empty()) {
            assert(pnode->nSendOffset == 0);
            assert(pnode->nSendSize == 0);
        }
        return nSentSize;
    }
    

    接收消息

    接收消息的工作,主要是由 ThreadSocketHandler 来完成的,

    if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) {
         pnode->CloseSocketDisconnect();
    }
    

    随后,通过 ReceiveMsgBytes 把从其它节点接收到的数据解析为单个数据,然后放回到消息队列,最后由ThreadMessageHandler来进行最后的处理。

    ReceiveMsgBytes解析数据的主要流程如下,调用的是CNetMessage下的readHeader和readData方法,随后,使用complete()进行一次判定,看解析是否完成:

    // Absorb network data.
            int handled;
            if (!msg.in_data) {
                handled = msg.readHeader(pch, nBytes);
            } else {
                handled = msg.readData(pch, nBytes);
            }
    

    readHeader

    readHeader 主要用来解析消息头,由上一篇文章我们能够知道,一个消息头,至少24字节,如果小于24字节直接退出,如果满足这个条件,先把接收到的数据的开始部分复制到消息头数据流中(hdrbuf),再反格式化成消息头(hdr)。消息数据最大为MAX_SIZE(0x02000000),如果大于这个值,证明出错,直接退出。

    int CNetMessage::readHeader(const char *pch, unsigned int nBytes) {
        // copy data to temporary parsing buffer
        unsigned int nRemaining = 24 - nHdrPos;
        unsigned int nCopy = std::min(nRemaining, nBytes);
    
        memcpy(&hdrbuf[nHdrPos], pch, nCopy);
        nHdrPos += nCopy;
    
        // if header incomplete, exit
        if (nHdrPos < 24) {
            return nCopy;
        }
    
        // deserialize to CMessageHeader
        try {
            hdrbuf >> hdr;
        } catch (const std::exception &) {
            return -1;
        }
    
        // reject messages larger than MAX_SIZE
        if (hdr.nMessageSize > MAX_SIZE) {
            return -1;
        }
    
        // switch state to reading message data
        in_data = true;
    
        return nCopy;
    }
    

    readData

    readData 主要用来解析消息体,消息的数据部分复制到消息数据流中(vRecv)来处理,如果 vRecv 的空间不够,会进行扩容,但最多分配256 KB,不能超过总消息大小。

    int CNetMessage::readData(const char *pch, unsigned int nBytes) {
        unsigned int nRemaining = hdr.nMessageSize - nDataPos;
        unsigned int nCopy = std::min(nRemaining, nBytes);
    
        if (vRecv.size() < nDataPos + nCopy) {
            // Allocate up to 256 KiB ahead, but never more than the total message
            // size.
            vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
        }
    
        hasher.Write((const uint8_t *)pch, nCopy);
        memcpy(&vRecv[nDataPos], pch, nCopy);
        nDataPos += nCopy;
    
        return nCopy;
    }
    

    缓冲区

    在 net.h 文件中,我们能够看到如下定义:

    //接收消息缓冲区
    static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
    //发送消息缓冲区
    static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
    

    我们将接收或者发送的数据放入到缓冲区,我们可以通过如下函数,分别对他们调用,加速我们的处理过程:

    unsigned int CConnman::GetReceiveFloodSize() const {
        return nReceiveFloodSize;
    }
    unsigned int CConnman::GetSendBufferSize() const {
        return nSendBufferMaxSize;
    }
    

    相关文章

      网友评论

        本文标题:比特币源码分析-网络(二)

        本文链接:https://www.haomeiwen.com/subject/ddfmfftx.html