美文网首页区块链技术
比特币源码分析-网络(三)

比特币源码分析-网络(三)

作者: wolf4j | 来源:发表于2018-03-11 17:27 被阅读234次

    前两篇文章主要从整体逻辑上对代码进行了梳理,这篇文章将主要讲述网络模块主要的函数,以及其具体实现。

    监听连接:ThreadSocketHandler

    断开没有使用的节点,首先遍历节点数组vNodesCopy,如果节点标识断开连接(fDisconnect),或者没有任何引用、发送接收消息,则移除节点,关闭socket,添加到断开连接节点数组 (vNodesDisconnected)

    // Disconnect unused nodes
    std::vector<CNode *> vNodesCopy = vNodes;
       for (CNode *pnode : vNodesCopy) {
           if (pnode->fDisconnect) {
              // remove from vNodes
                vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode),
                             vNodes.end());
    
              // release outbound grant (if any)
              pnode->grantOutbound.Release();
    
              // close socket and cleanup
              pnode->CloseSocketDisconnect();
    
              // hold in disconnected pool until all refs are released
              pnode->Release();
              vNodesDisconnected.push_back(pnode);
           }
       }
    

    遍历断开连接节点数组,当节点没有引用,且能获取节点的相关锁,则移除节点,删除此节点。

    // Delete disconnected nodes
    std::list<CNode *> vNodesDisconnectedCopy = vNodesDisconnected;
       for (CNode *pnode : vNodesDisconnectedCopy) {
          // wait until threads are done using it
          if (pnode->GetRefCount() <= 0) {
             bool fDelete = false;
             {
                 TRY_LOCK(pnode->cs_inventory, lockInv);
                 if (lockInv) {
                    TRY_LOCK(pnode->cs_vSend, lockSend);
                    if (lockSend) {
                        fDelete = true;
                    }
                 }
             }
             if (fDelete) {
                 vNodesDisconnected.remove(pnode);
                 DeleteNode(pnode);
             }
         }
      }
    

    重新设置节点数量。

    size_t vNodesSize;
    {
        LOCK(cs_vNodes);
        vNodesSize = vNodes.size();
    }
    if (vNodesSize != nPrevNodeCount) {
        nPrevNodeCount = vNodesSize;
        if (clientInterface) {
           clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
        }
    }
    

    遍历监听SOCKET数组(vhListenSocket)、节点数组,为每个节点的socket 设置发送、接收fd_set

    for (const ListenSocket &hListenSocket : vhListenSocket) {
        FD_SET(hListenSocket.socket, &fdsetRecv);
        hSocketMax = std::max(hSocketMax, hListenSocket.socket);
        have_fds = true;
    }
    

    调用select函数监听socket。

    int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv,
                 &fdsetSend, &fdsetError, &timeout);
    

    遍历监听socket数组,当接收到数据,且socket有效时,接受连接,新建节点,添加到节点数组

    for (const ListenSocket &hListenSocket : vhListenSocket) {
        if (hListenSocket.socket != INVALID_SOCKET &&
            FD_ISSET(hListenSocket.socket, &fdsetRecv)) {
            AcceptConnection(hListenSocket);
        }
    }
    

    遍历节点数组,增加节点的引用次数。

    std::vector<CNode *> vNodesCopy;
    {
       LOCK(cs_vNodes);
       vNodesCopy = vNodes;
       for (CNode *pnode : vNodesCopy) {
           pnode->AddRef();
       }
    }
    

    遍历节点数组,接收网络数据,解析成消息,添加到节点的接收消息数组 (vRecvMsg)。当发送集合有数据时,把节点的发送消息(vSendMsg)发送出去:

    for (CNode *pnode : vNodesCopy) {                                          
        if (interruptNet) {                                                    
            return;                                                            
        }                                                                                                                                         
        // Receive                                                                                                                                 
        bool recvSet = false;                                                  
        bool sendSet = false;                                                  
        bool errorSet = false;                                                 
        {                                                                      
            LOCK(pnode->cs_hSocket);                                           
            if (pnode->hSocket == INVALID_SOCKET) {                            
                continue;                                                      
            }                                                                  
            recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);                    
            sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);                    
            errorSet = FD_ISSET(pnode->hSocket, &fdsetError);                  
        }                                                                      
        if (recvSet || errorSet) {                                             
            ....                                                        
        }                                                                                                                                         
        // Send                                                                                                                                    
        if (sendSet) {                                                         
            LOCK(pnode->cs_vSend);                                             
            size_t nBytes = SocketSendData(pnode);                             
            if (nBytes) {                                                      
                RecordBytesSent(nBytes);                                       
            }                                                                  
        }                                                                      
    

    满足以下几点是不活跃的socke连接。

    • 没有任何发送、接收消息(nLastSend、nLastRecv)。
    • 距离上次发送消息超过了90分钟(nLastSend),且距离上次把全部消息发送
      出去也超过了90分钟(nLastSendEmpty)。
    • 距离上次接收消息超过了90分钟(nLastRecv)
    int64_t nTime = GetSystemTimeInSeconds();                              
    if (nTime - pnode->nTimeConnected > 60) {                              
        if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) {              
            LogPrint("net", "socket no message in first 60 seconds, %d "   
                            "%d from %d\n",                                
                     pnode->nLastRecv != 0, pnode->nLastSend != 0,         
                     pnode->id);                                           
            pnode->fDisconnect = true;                                     
        } else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) {          
            LogPrintf("socket sending timeout: %is\n",                     
                      nTime - pnode->nLastSend);                           
            pnode->fDisconnect = true;                                     
        } else if (nTime - pnode->nLastRecv >                              
                   (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL   
                                                      : 90 * 60)) {        
            LogPrintf("socket receive timeout: %is\n",                     
                      nTime - pnode->nLastRecv);                           
            pnode->fDisconnect = true;                                     
        } else if (pnode->nPingNonceSent &&                                
                   pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 <    
                       GetTimeMicros()) {                                  
            LogPrintf("ping timeout: %fs\n",                               
                      0.000001 *                                           
                          (GetTimeMicros() - pnode->nPingUsecStart));      
            pnode->fDisconnect = true;                                     
        } else if (!pnode->fSuccessfullyConnected) {                       
            LogPrintf("version handshake timeout from %d\n", pnode->id);   
            pnode->fDisconnect = true;                                     
        }                                                                  
    }                                                                      
    

    发送接收消息:ThreadMessageHandler

    此线程为无限循环,每隔0.1秒循环一次:

    while (!flagInterruptMsgProc) {
    }
    

    遍历节点数组,增加节点引用次数,检查节点是否需要同步。

    std::vector<CNode *> vNodesCopy;      
    {                                     
        LOCK(cs_vNodes);                  
        vNodesCopy = vNodes;              
        for (CNode *pnode : vNodesCopy) { 
            pnode->AddRef();              
        }                                 
    }                                     
    

    遍历节点数组, 处理接收的消息,发送消息。遍历节点数组,减少节点引用次数:

    for (CNode *pnode : vNodesCopy) { 
    }
    

    接收消息

    节点接收消息后,循环遍历节点的消息缓冲区,解析消息头、数据,校验消息的有效性,再处理消息数据,处理过程中如果发生异常,则节点发送拒绝消息,此节点停止发送、接收消息。
    消息的有效性检查主要检查以下4项:

    1. 接收到的消息是否完整。
      类CNetMessage的complete()函数校验完整性,in_data为TRUE,且消
      息大小与消息数据偏移相等,则消息是完整的。
    2. 消息头开始字符串是否与环境参数中的字符串相同。
    3. 消息头是否有效。
    4. 消息头校验和是否正确。

    重新计算数据的校验和,检验与消息头中的校验和是否相等。

    if (pnode->fDisconnect) {                                
        continue;                                            
    }                                                        
                                                             
    // Receive messages                                                                                           
    bool fMoreNodeWork = GetNodeSignals().ProcessMessages(   
        *config, pnode, *this, flagInterruptMsgProc);        
    fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);      
    if (flagInterruptMsgProc) {                              
        return;                                              
    }                                                        
    

    发送消息

    {                                                        
        LOCK(pnode->cs_sendProcessing);                      
        GetNodeSignals().SendMessages(*config, pnode, *this, 
                                      flagInterruptMsgProc); 
    }                                                        
    if (flagInterruptMsgProc) {                              
        return;                                              
    }                                                        
    

    调用这个函数:

    bool SendMessages(const Config &config, CNode *pto, CConnman &connman,
                      const std::atomic<bool> &interruptMsgProc) {
                      
         ...
         ...
    }
    

    发送消息时,检验5个命令是否需要发送,分别是:ping、addr、getblocks、 inv、getdata。

    当用户在RPC中请求ping命令(节点的fPingQueued为TRUE),或者距离上一次发送命令超过了30分钟,且发送消息队列为空,则发送ping命令。当节点的版本大于BIP0031_VERSION时,设置发送ping命令的开始时间,发送ping命令时带随机数,随机数保存在节点的nPingNonceSent中。低于此版本时,不设置开始时间,不带随机数。

    随机选择一个节点,发送addr消息发送节点的发送地址数组中的地址,同时把地址插入到已知地址集中,每次最多发送1000个地址。发送完毕后,清空节点的发送地址数组。

    当节点需要同步时,且不处于导入、重建索引,则发送getblocks消息获取区块。获取区块时,指定开始区块、结束区块。同时保存开始的区块索引到节点的 pindexLastGetBlocksBegin中,保存结束区块索引值到节点hashLastGetBlocksEnd中。当获取区块时,检查这2项,如果已经获取了最新的区块,则不再获取区块,避免重复发送getblocks消息。

    遍历节点的发送Inventory数组,发送inv消息。发送的Inventory必须时尚未处理的,即不在节点已知Inventory集中(setInventoryKnown),发送后添加到已知 Inventory集中。发送inv消息时最多发送1000个inventory。如果是随机节点,且 inventory的类型时MSG_TX,则只发送1/4的inventory,先计算随机值,随机值的低 2bit位为0的inventory发送出去,不为0的inventory保留在节点的发送inventory数组 (vInventoryToSend)中。

    如果节点存在当前时间早的延迟的inventory,则发送getdata消息。延迟的 inventory保存在节点的mapAskFor数组。发送的inventory必须是节点中没有的。每次发送getdata消息时最多发送1000个inventory。同时删除节点的mapAskFor中的一项。

    主动与外部节点建立连接:ThreadOpenConnections

    void CConnman::ThreadOpenConnections() {
        ...
    }
    

    此线程也是无限循环,每隔500毫秒连接一次。

    while (!interruptNet) {
     if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) {  
         return;                                                     
     }                                                               
    }
    

    连接地址时注意以下几点:

    • 不连接无效的、地址数组中的外部连接地址中已有的、本地的地址。
    • 最多尝试连接100个地址。
    • 不连接受限制的地址。
    • 尝试连接次数达到30次时,才尝试连接距离上一次尝试连接10分钟以内的地址。
    • 尝试连接次数达到50次时,才尝试连接地址端不是参数默认端的地址。
    while (!interruptNet) {                                                     
        CAddrInfo addr = addrman.Select(fFeeler);                                                          
        if (!addr.IsValid() || setConnected.count(addr.GetGroup()) ||           
            IsLocal(addr)) {                                                    
            break;                                                              
        }                                                                                                                                                                                                        
        nTries++;                                                               
        if (nTries > 100) {                                                     
            break;                                                              
        }                                                                                                                                               
        if (IsLimited(addr)) {                                                  
            continue;                                                           
        }                                                                                                                                                                                        
        if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) {        
            continue;                                                           
        }                                                                                                                                                 
        // only consider very recently tried nodes after 30 failed attempts     
        if (nANow - addr.nLastTry < 600 && nTries < 30) {                       
            continue;                                                           
        }                                                                                   
        if ((addr.nServices & nRelevantServices) != nRelevantServices &&        
            (nTries < 40 || nOutbound >= (nMaxOutbound >> 1))) {                
            continue;                                                           
        }                                                                                                                       
        if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50) {       
            continue;                                                           
        }                                                                                                                                                
        addrConnect = addr;                                                     
        break;                                                                  
    }                                                                           
                                                                                
    if (addrConnect.IsValid()) {                                                
                                                                                
        if (fFeeler) {                                                          
            // Add small amount of random noise before connection to avoid      
            // synchronization.                                                 
            int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);             
            if (!interruptNet.sleep_for(                                        
                    std::chrono::milliseconds(randsleep))) {                    
                return;                                                         
            }                                                                   
            LogPrint("net", "Making feeler connection to %s\n",                 
                     addrConnect.ToString());                                   
        }                                                                       
                                                                                
        OpenNetworkConnection(addrConnect,                                      
                              (int)setConnected.size() >=                       
                                  std::min(nMaxConnections - 1, 2),             
                              &grant, nullptr, false, fFeeler);                 
    }                                                                           
    

    获取主机ip:GetLocal

    节点启用时,获取主机名称、IP地址,添加到地址、服务对应的数组中(mapLocalHost)。获取本地地址,保存到节点的本地地址(addrLocal),且向节点网络广播地址(AdvertizeLocal)。这样获取的地址大于监听(LOCAL_IF)。

    bool GetLocal(CService &addr, const CNetAddr *paddrPeer) {                      
        if (!fListen) return false;                                                 
                                                                                    
        int nBestScore = -1;                                                        
        int nBestReachability = -1;                                                 
        {                                                                           
            LOCK(cs_mapLocalHost);                                                  
            for (std::map<CNetAddr, LocalServiceInfo>::iterator it =                
                     mapLocalHost.begin();                                          
                 it != mapLocalHost.end(); it++) {                                  
                int nScore = (*it).second.nScore;                                   
                int nReachability = (*it).first.GetReachabilityFrom(paddrPeer);     
                if (nReachability > nBestReachability ||                            
                    (nReachability == nBestReachability && nScore > nBestScore)) {  
                    addr = CService((*it).first, (*it).second.nPort);               
                    nBestReachability = nReachability;                              
                    nBestScore = nScore;                                            
                }                                                                   
            }                                                                       
        }                                                                           
        return nBestScore >= 0;                                                     
    }                                                                               
    

    系统定义了几种本地地址类型,优先采用值大的类型(GetLocal函数)

    enum {
        // unknown
        LOCAL_NONE,
        // address a local interface listens on
        LOCAL_IF,
        // address explicit bound to
        LOCAL_BIND,
        // address reported by UPnP
        LOCAL_UPNP,
        // address explicitly specified (-externalip=)
        LOCAL_MANUAL,
    
        LOCAL_MAX
    };
    

    本文由 copernicus 团队 冉小龙 编写,转载无需授权!

    相关文章

      网友评论

        本文标题:比特币源码分析-网络(三)

        本文链接:https://www.haomeiwen.com/subject/tiywfftx.html