美文网首页
比特币bitcoin源码解析之整体架构和流程

比特币bitcoin源码解析之整体架构和流程

作者: 瑜骐 | 来源:发表于2018-04-26 22:09 被阅读0次

    1. 比特币简介

    比特币(BitCoin)的概念最初由中本聪在2009年提出,根据中本聪的思路设计发布的开源软件以及建构其上的P2P网络。比特币是一种P2P形式的数字货币。点对点的传输意味着一个去中心化的支付系统。

    与大多数货币不同,比特币不依靠特定货币机构发行,它依据特定算法,通过大量的计算产生,比特币经济使用整个P2P网络中众多节点构成的分布式数据库来确认并记录所有的交易行为,并使用密码学的设计来确保货币流通各个环节安全性。P2P的去中心化特性与算法本身可以确保无法通过大量制造比特币来人为操控币值。基于密码学的设计可以使比特币只能被真实的拥有者转移或支付。这同样确保了货币所有权与流通交易的匿名性。比特币与其他虚拟货币最大的不同,是其总数量非常有限,具有极强的稀缺性。该货币系统曾在4年内只有不超过1050万个,之后的总数量将被永久限制在2100万个。

    2. 比特币整体架构

    点对点网络,每一个网络中的节点即是Client又是Server,如下图所示: 整体P2P架构
    • 节点和节点之间通过发送消息命令来相互通信的,下图是对应的消息命令格式: 消息命令格式
      CMessageHeader对应的类图如下所示: CMessageHeader类图
    • 节点通信过程中使用的对应的消息命令如下表所示:
    命令 消息内容 说明
    version nVersion,nService,nTime,address 版本,服务标识,时间;地址
    addr vector<CAddress> 地址列表
    inv vector<CInv> 库存信息列表:将对应的库存发送消息增加到库存发送已知中
    getdata vector<CInv> 根据inv对应的type执行不同的处理,就是将对应的请求消息转化为待请求的命令放入到节点对应的发送消息缓存中
    getblocks CBlockLocator locator;uint256 hashStop; 根据locator定位区块在链上位置,从这个位置开始往后找(找对应的next),一直到对应块的block的hash等于hashStop为止,并将所有找到的区块发送库存setInventoryKnown2中等待被发送
    Tx CTransaction 将交易消息放入到对应的已知库存中,如果此交易能够被接受,则对此消息进行转播,并递归处理所有依赖这个交易对应的孤儿交易;如果交易不被接受,则将次交易放入到孤儿交易列表中mapOrphanTransactions和mapOrphanTransactionsByPrev中
    block CBlock 块消息:将接收的block放入对应的已知库存中,并对应这个块进行处理,并将此块从mapAlreadyAskedFor咨询中移除

    3. 比特币整体处理流程

    对应的整体流程如下图所示: 整体流程

    3.1 LoadAdreess方法

    LoadAdreess方法从地址­文件addr.dat中读取对应的地址信息,放入对应的全局内存对象mapAddresses中,其中addr.dat对应的结构如下:

    Key Value 说明 内存中存放对象
    addr CAddress 数据库中保存对应的地址新 map<vector<unsigned char>,CAddress>mapAddresses节点地址映射:key对应的是ip地址+端口,value是CAddress对象

    3.2 LoadBlockIndex方法

    LoadBlockIndex方法从块索引文件blkindex.dat中读取对应的块索引信息,放入对应的全局内存对象mapBlockIndex中,其中blkindex.dat对应的结构如下:

    Key Value 说明 内存中存放对象
    blockindex CDiskBlockIndex 块索引数据库 map<uint256, CBlockIndex*> mapBlockIndex块索引信息:其中key对应的block的hash值

    对应的源码如下:

    bool LoadBlockIndex(bool fAllowNew)
    {
        //
        // Load block index
        //
        CTxDB txdb("cr");
        if (!txdb.LoadBlockIndex())
            return false;
        txdb.Close();
    
        //
        // Init with genesis block
        //
        if (mapBlockIndex.empty())
        {
            if (!fAllowNew)
                return false;
    
    
            // Genesis Block:
            // GetHash()      = 0x000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f
            // hashMerkleRoot = 0x4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b
            // txNew.vin[0].scriptSig     = 486604799 4 0x736B6E616220726F662074756F6C69616220646E6F63657320666F206B6E697262206E6F20726F6C6C65636E61684320393030322F6E614A2F33302073656D695420656854
            // txNew.vout[0].nValue       = 5000000000
            // txNew.vout[0].scriptPubKey = 0x5F1DF16B2B704C8A578D0BBAF74D385CDE12C11EE50455F3C438EF4C3FBCF649B6DE611FEAE06279A60939E028A8D65C10B73071A6F16719274855FEB0FD8A6704 OP_CHECKSIG
            // block.nVersion = 1
            // block.nTime    = 1231006505
            // block.nBits    = 0x1d00ffff
            // block.nNonce   = 2083236893
            // CBlock(hash=000000000019d6, ver=1, hashPrevBlock=00000000000000, hashMerkleRoot=4a5e1e, nTime=1231006505, nBits=1d00ffff, nNonce=2083236893, vtx=1)
            //   CTransaction(hash=4a5e1e, ver=1, vin.size=1, vout.size=1, nLockTime=0)
            //     CTxIn(COutPoint(000000, -1), coinbase 04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73)
            //     CTxOut(nValue=50.00000000, scriptPubKey=0x5F1DF16B2B704C8A578D0B)
            //   vMerkleTree: 4a5e1e
    
            // Genesis block
            char* pszTimestamp = "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks";
            CTransaction txNew;
            txNew.vin.resize(1);
            txNew.vout.resize(1);
            txNew.vin[0].scriptSig     = CScript() << 486604799 << CBigNum(4) << vector<unsigned char>((unsigned char*)pszTimestamp, (unsigned char*)pszTimestamp + strlen(pszTimestamp));
            txNew.vout[0].nValue       = 50 * COIN;
            txNew.vout[0].scriptPubKey = CScript() << CBigNum("0x5F1DF16B2B704C8A578D0BBAF74D385CDE12C11EE50455F3C438EF4C3FBCF649B6DE611FEAE06279A60939E028A8D65C10B73071A6F16719274855FEB0FD8A6704") << OP_CHECKSIG;
            CBlock block;
            block.vtx.push_back(txNew);
            block.hashPrevBlock = 0;
            block.hashMerkleRoot = block.BuildMerkleTree();
            block.nVersion = 1;
            block.nTime    = 1231006505;
            block.nBits    = 0x1d00ffff;
            block.nNonce   = 2083236893;
    
                //// debug print, delete this later
                printf("%s\n", block.GetHash().ToString().c_str());
                printf("%s\n", block.hashMerkleRoot.ToString().c_str());
                printf("%s\n", hashGenesisBlock.ToString().c_str());
                txNew.vout[0].scriptPubKey.print();
                block.print();
                assert(block.hashMerkleRoot == uint256("0x4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b"));
    
            assert(block.GetHash() == hashGenesisBlock);
    
            // Start new block file
            unsigned int nFile;
            unsigned int nBlockPos;
            if (!block.WriteToDisk(!fClient, nFile, nBlockPos))
                return error("LoadBlockIndex() : writing genesis block to disk failed");
            if (!block.AddToBlockIndex(nFile, nBlockPos))
                return error("LoadBlockIndex() : genesis block not accepted");
        }
    
        return true;
    }
    

    3.3 LoadWallet方法

    LoadWallet方法从文件wallet.dat中读取对应的钱包交易信息和其他的配置信息,放入到不同的全局内存对象中,其中wallet.dat对应的格式(key-value结构)如下所示:

    Key Value 说明 内存中存放对象
    Type Key1 Key分为类型类型和key
    name 比特币地址 比特币地址对应名称 地址和名称之间映射 map<string,string> mapAddressBook地址和名称的映射,其中key为地址,value为名称
    Tx 交易hash值 钱包交易对象CWalletTx 交易hash与交易之间映射 map<uint256, CWalletTx>mapWalle钱包交易对应的map,其中key对应的钱包交易的hash值
    Key 公钥 私钥 公钥和私钥对应关系 map<vector<unsigned char>, CPrivKey> mapKeys公钥和私钥对应的映射关系,其中key为公钥,value为私钥;map<uint160, vector<unsigned char> > mapPubKeys公钥的hash值和公钥的关系,其中key为公钥的hash值,value为公钥
    defaultkey vector<unsigned char> vchDefaultKeyRet 默认key对应对应的值
    setting fGenerateBitcoins 是否产生比特币标记 是否挖矿标记 int fGenerateBitcoins
    setting nTransactionFee 交易手续费的值 交易手续费 int64 nTransactionFee
    setting addrIncoming CAddress对象 获得当前对应的外部地址,用于接收外部的连接 CAddress addrIncoming

    源码内容如下:

    //
    // CWalletDB
    //
    
    bool CWalletDB::LoadWallet(vector<unsigned char>& vchDefaultKeyRet)
    {
        vchDefaultKeyRet.clear();
    
        //// todo: shouldn't we catch exceptions and try to recover and continue?
        CRITICAL_BLOCK(cs_mapKeys)
        CRITICAL_BLOCK(cs_mapWallet)
        {
            // Get cursor
            Dbc* pcursor = GetCursor();
            if (!pcursor)
                return false;
    
            loop
            {
                // Read next record
                CDataStream ssKey;
                CDataStream ssValue;
                int ret = ReadAtCursor(pcursor, ssKey, ssValue);
                if (ret == DB_NOTFOUND)
                    break;
                else if (ret != 0)
                    return false;
    
                // Unserialize
                // Taking advantage of the fact that pair serialization
                // is just the two items serialized one after the other
                string strType;
                ssKey >> strType;
                if (strType == "name")
                {
                    string strAddress;
                    ssKey >> strAddress;
                    ssValue >> mapAddressBook[strAddress];
                }
                else if (strType == "tx")
                {
                    uint256 hash;
                    ssKey >> hash;
                    CWalletTx& wtx = mapWallet[hash];
                    ssValue >> wtx;
    
                    if (wtx.GetHash() != hash)
                        printf("Error in wallet.dat, hash mismatch\n");
    
                    //// debug print
                    //printf("LoadWallet  %s\n", wtx.GetHash().ToString().c_str());
                    //printf(" %12I64d  %s  %s  %s\n",
                    //    wtx.vout[0].nValue,
                    //    DateTimeStr(wtx.nTime).c_str(),
                    //    wtx.hashBlock.ToString().substr(0,14).c_str(),
                    //    wtx.mapValue["message"].c_str());
                }
                else if (strType == "key")
                {
                    vector<unsigned char> vchPubKey;
                    ssKey >> vchPubKey;
                    CPrivKey vchPrivKey;
                    ssValue >> vchPrivKey;
    
                    mapKeys[vchPubKey] = vchPrivKey;
                    mapPubKeys[Hash160(vchPubKey)] = vchPubKey;
                }
                else if (strType == "defaultkey")
                {
                    ssValue >> vchDefaultKeyRet;
                }
                else if (strType == "setting")  /// or settings or option or options or config?
                {
                    string strKey;
                    ssKey >> strKey;
                    if (strKey == "fGenerateBitcoins")  ssValue >> fGenerateBitcoins;
                    if (strKey == "nTransactionFee")    ssValue >> nTransactionFee;
                    if (strKey == "addrIncoming")       ssValue >> addrIncoming;
                }
            }
        }
    
        printf("fGenerateBitcoins = %d\n", fGenerateBitcoins);
        printf("nTransactionFee = %I64d\n", nTransactionFee);
        printf("addrIncoming = %s\n", addrIncoming.ToString().c_str());
    
        return true;
    }
    

    3.4节点通信线程

    启动了三个主要的线程,ThreadOpenConnections,ThreadSocketHandler和ThreadMessageHandler,这个三个线程对应的主要处理流程分别如下图所示:

    3.4.1线程ThreadOpenConnections

    线程ThreadOpenConnections对应的处理流程如下图所示: ThreadOpenConnections处理流程

    源码如下:

    // 对于每一个打开节点的链接,进行节点之间信息通信,获得节点对应的最新信息,比如节点对应的知道地址进行交换等
    void ThreadOpenConnections2(void* parg)
    {
        printf("ThreadOpenConnections started\n");
    
        // 初始化网络连接
        // Initiate network connections
        const int nMaxConnections = 15; // 最大连接数
        loop
        {
            // Wait
            vfThreadRunning[1] = false;
            Sleep(500);
            while (vNodes.size() >= nMaxConnections || vNodes.size() >= mapAddresses.size())
            {
                CheckForShutdown(1);
                Sleep(2000);
            }
            vfThreadRunning[1] = true;
            CheckForShutdown(1);
    
    
            // Ip对应的C类地址,相同的C类地址放在一起
            // Make a list of unique class C's
            unsigned char pchIPCMask[4] = { 0xff, 0xff, 0xff, 0x00 };
            unsigned int nIPCMask = *(unsigned int*)pchIPCMask;
            vector<unsigned int> vIPC;
            CRITICAL_BLOCK(cs_mapAddresses)
            {
                vIPC.reserve(mapAddresses.size());
                unsigned int nPrev = 0;
                // mapAddress已经进行排序了,默认是生效排序
                foreach(const PAIRTYPE(vector<unsigned char>, CAddress)& item, mapAddresses)
                {
                    const CAddress& addr = item.second;
                    if (!addr.IsIPv4())
                        continue;
    
                    // Taking advantage of mapAddresses being in sorted order,
                    // with IPs of the same class C grouped together.
                    unsigned int ipC = addr.ip & nIPCMask;
                    if (ipC != nPrev)
                        vIPC.push_back(nPrev = ipC);
                }
            }
    
            // IP选择的过程
            // The IP selection process is designed to limit vulnerability致命性 to address flooding.
            // Any class C (a.b.c.?) has an equal chance of being chosen, then an IP is
            // chosen within the class C.  An attacker may be able to allocate many IPs, but
            // they would normally be concentrated in blocks of class C's.  They can hog独占 the
            // attention within their class C, but not the whole IP address space overall.
            // A lone node in a class C will get as much attention as someone holding all 255
            // IPs in another class C.
            //
            bool fSuccess = false;
            int nLimit = vIPC.size();
            while (!fSuccess && nLimit-- > 0)
            {
                // Choose a random class C 随机获取一个C级别的地址
                unsigned int ipC = vIPC[GetRand(vIPC.size())];
    
                // Organize all addresses in the class C by IP
                map<unsigned int, vector<CAddress> > mapIP;
                CRITICAL_BLOCK(cs_mapAddresses)
                {
                    unsigned int nDelay = ((30 * 60) << vNodes.size());
                    if (nDelay > 8 * 60 * 60)
                        nDelay = 8 * 60 * 60;
                    /*
                    map::lower_bound(key):返回map中第一个大于或等于key的迭代器指针
                    map::upper_bound(key):返回map中第一个大于key的迭代器指针
                    */
                    for (map<vector<unsigned char>, CAddress>::iterator mi = mapAddresses.lower_bound(CAddress(ipC, 0).GetKey());
                         mi != mapAddresses.upper_bound(CAddress(ipC | ~nIPCMask, 0xffff).GetKey());
                         ++mi)
                    {
                        const CAddress& addr = (*mi).second;
                        unsigned int nRandomizer = (addr.nLastFailed * addr.ip * 7777U) % 20000;
                        // 当前时间 - 地址连接最新失败的时间 要大于对应节点重连的间隔时间
                        if (GetTime() - addr.nLastFailed > nDelay * nRandomizer / 10000)
                            mapIP[addr.ip].push_back(addr); //同一个地址区段不同地址: 同一个地址的不同端口,所有对应同一个ip会有多个地址
                    }
                }
                if (mapIP.empty())
                    break;
    
                // Choose a random IP in the class C
                map<unsigned int, vector<CAddress> >::iterator mi = mapIP.begin();
                boost::iterators::advance_adl_barrier::advance(mi, GetRand(mapIP.size())); // 将指针定位到随机位置
    
                // 遍历同一个ip对应的所有不同端口
                // Once we've chosen an IP, we'll try every given port before moving on
                foreach(const CAddress& addrConnect, (*mi).second)
                {
                    // ip不能是本地ip,且不能是非ipV4地址,对应的ip地址不在本地的节点列表中
                    if (addrConnect.ip == addrLocalHost.ip || !addrConnect.IsIPv4() || FindNode(addrConnect.ip))
                        continue;
                    // 链接对应地址信息的节点
                    CNode* pnode = ConnectNode(addrConnect);
                    if (!pnode)
                        continue;
                    pnode->fNetworkNode = true; //设置对应的节点为网络节点,是因为从对应的本地节点列表中没有查询到
    
                    // 如果本地主机地址能够进行路由,则需要广播我们的地址
                    if (addrLocalHost.IsRoutable())
                    {
                        // Advertise our address
                        vector<CAddress> vAddrToSend;
                        vAddrToSend.push_back(addrLocalHost);
                        pnode->PushMessage("addr", vAddrToSend); // 将消息推送出去放入vsend中,在消息处理线程中进行处理
                    }
    
                    // 从创建的节点获得尽可能多的地址信息,发送消息,在消息处理线程中进行处理
                    // Get as many addresses as we can
                    pnode->PushMessage("getaddr");
    
                    ////// should the one on the receiving end do this too?
                    // Subscribe our local subscription list
                    // 新建的节点要订阅我们本地主机订阅的对应通断
                    const unsigned int nHops = 0;
                    for (unsigned int nChannel = 0; nChannel < pnodeLocalHost->vfSubscribe.size(); nChannel++)
                        if (pnodeLocalHost->vfSubscribe[nChannel])
                            pnode->PushMessage("subscribe", nChannel, nHops);
    
                    fSuccess = true;
                    break;
                }
            }
        }
    }
    

    3.4.2线程ThreadSocketHandler

    线程ThreadSocketHandler对应的处理流程如下: ThreadSocketHandler处理流程

    源码如下:

    // socket 处理,parag对应的是本地节点开启的监听socket
    void ThreadSocketHandler2(void* parg)
    {
        printf("ThreadSocketHandler started\n");
        SOCKET hListenSocket = *(SOCKET*)parg; // 获得监听socket
        list<CNode*> vNodesDisconnected;
        int nPrevNodeCount = 0;
    
        loop
        {
            //
            // Disconnect nodes
            //
            CRITICAL_BLOCK(cs_vNodes)
            {
                // Disconnect duplicate connections 释放同一个ip重复链接对应的节点,可能是不同端口
                map<unsigned int, CNode*> mapFirst;
                foreach(CNode* pnode, vNodes)
                {
                    if (pnode->fDisconnect)
                        continue;
                    unsigned int ip = pnode->addr.ip;
                    // 本地主机ip地址对应的是0,所以所有的ip地址都应该大于这个ip
                    if (mapFirst.count(ip) && addrLocalHost.ip < ip)
                    {
                        // In case two nodes connect to each other at once,
                        // the lower ip disconnects its outbound connection
                        CNode* pnodeExtra = mapFirst[ip];
    
                        if (pnodeExtra->GetRefCount() > (pnodeExtra->fNetworkNode ? 1 : 0))
                            swap(pnodeExtra, pnode);
    
                        if (pnodeExtra->GetRefCount() <= (pnodeExtra->fNetworkNode ? 1 : 0))
                        {
                            printf("(%d nodes) disconnecting duplicate: %s\n", vNodes.size(), pnodeExtra->addr.ToString().c_str());
                            if (pnodeExtra->fNetworkNode && !pnode->fNetworkNode)
                            {
                                pnode->AddRef();
                                swap(pnodeExtra->fNetworkNode, pnode->fNetworkNode);
                                pnodeExtra->Release();
                            }
                            pnodeExtra->fDisconnect = true;
                        }
                    }
                    mapFirst[ip] = pnode;
                }
                // 断开不使用的节点
                // Disconnect unused nodes
                vector<CNode*> vNodesCopy = vNodes;
                foreach(CNode* pnode, vNodesCopy)
                {
                    // 节点准备释放链接,并且对应的接收和发送缓存区都是空
                    if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
                    {
                        // 从节点列表中移除
                        // remove from vNodes
                        vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
                        pnode->Disconnect();
    
                        // 将对应准备释放的节点放在对应的节点释放链接池中,等待对应节点的所有引用释放
                        // hold in disconnected pool until all refs are released
                        pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60); // 向后推迟5分钟
                        if (pnode->fNetworkNode)
                            pnode->Release();
                        vNodesDisconnected.push_back(pnode);
                    }
                }
    
                // 删除端口的链接的节点:删除的条件是对应节点的引用小于等于0
                // Delete disconnected nodes
                list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
                foreach(CNode* pnode, vNodesDisconnectedCopy)
                {
                    // wait until threads are done using it
                    if (pnode->GetRefCount() <= 0)
                    {
                        bool fDelete = false;
                        TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                         TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                          TRY_CRITICAL_BLOCK(pnode->cs_mapRequests)
                           TRY_CRITICAL_BLOCK(pnode->cs_inventory)
                            fDelete = true;
                        if (fDelete)
                        {
                            vNodesDisconnected.remove(pnode);
                            delete pnode;
                        }
                    }
                }
            }
            if (vNodes.size() != nPrevNodeCount)
            {
                nPrevNodeCount = vNodes.size(); // 记录前一次节点对应的数量
                MainFrameRepaint();
            }
    
    
            // 找出哪一个socket有数据要发送
            // Find which sockets have data to receive
            //
            struct timeval timeout;
            timeout.tv_sec  = 0;
            timeout.tv_usec = 50000; // frequency to poll pnode->vSend 咨询节点是否有数据要发送的频率
    
            struct fd_set fdsetRecv; // 记录所有节点对应的socket句柄和监听socket句柄
            struct fd_set fdsetSend; // 记录所有有待发送消息的节点对应的socket句柄
            FD_ZERO(&fdsetRecv);
            FD_ZERO(&fdsetSend);
            SOCKET hSocketMax = 0;
            FD_SET(hListenSocket, &fdsetRecv); // FD_SET将hListenSocket 放入fdsetRecv对应的数组的最后
            hSocketMax = max(hSocketMax, hListenSocket);
            CRITICAL_BLOCK(cs_vNodes)
            {
                foreach(CNode* pnode, vNodes)
                {
                    FD_SET(pnode->hSocket, &fdsetRecv);
                    hSocketMax = max(hSocketMax, pnode->hSocket); // 找出所有节点对应的socket的最大值,包括监听socket
                    TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                        if (!pnode->vSend.empty())
                            FD_SET(pnode->hSocket, &fdsetSend); // FD_SET 字段设置
                }
            }
    
            vfThreadRunning[0] = false;
            // 函数参考:https://blog.csdn.net/rootusers/article/details/43604729
            /*确定一个或多个套接口的状态,本函数用于确定一个或多个套接口的状态,对每一个套接口,调用者可查询它的可读性、可写性及错误状态信息,用fd_set结构来表示一组等待检查的套接口,在调用返回时,这个结构存有满足一定条件的套接口组的子集,并且select()返回满足条件的套接口的数目。
                简单来说select用来填充一组可用的socket句柄,当满足下列之一条件时:
                1.可以读取的sockets。当这些socket被返回时,在这些socket上执行recv/accept等操作不会产生阻塞;
                2.可以写入的sockets。当这些socket被返回时,在这些socket上执行send等不会产生阻塞;
                3.返回有错误的sockets。
                select()的机制中提供一fd_set的数据结构,实际上市一long类型的数组,每一个数组元素都能与一打开的文件句柄(或者是其他的socket句柄,文件命名管道等)建立联系,建立联系的工作实际上由程序员完成,当调用select()的时候,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程那一socket或文件可读。
            */
            int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, NULL, &timeout);
            vfThreadRunning[0] = true;
            CheckForShutdown(0);
            if (nSelect == SOCKET_ERROR)
            {
                int nErr = WSAGetLastError();
                printf("select failed: %d\n", nErr);
                for (int i = 0; i <= hSocketMax; i++)
                {
                    FD_SET(i, &fdsetRecv); // 所有的值设置一遍
                    FD_SET(i, &fdsetSend);
                }
                Sleep(timeout.tv_usec/1000);
            }
            // 随机增加种子:性能计数
            RandAddSeed();
    
            //// debug print
            //foreach(CNode* pnode, vNodes)
            //{
            //    printf("vRecv = %-5d ", pnode->vRecv.size());
            //    printf("vSend = %-5d    ", pnode->vSend.size());
            //}
            //printf("\n");
    
    
            // 如果fdsetRecv中有监听socket,则接收改监听socket对应的链接请求,并将链接请求设置为新的节点
            // Accept new connections
            // 判断发送缓冲区中是否有对应的socket,如果有则接收新的交易
            if (FD_ISSET(hListenSocket, &fdsetRecv))
            {
                struct sockaddr_in sockaddr;
                int len = sizeof(sockaddr);
                SOCKET hSocket = accept(hListenSocket, (struct sockaddr*)&sockaddr, &len); // 接收socket链接
                CAddress addr(sockaddr);
                if (hSocket == INVALID_SOCKET)
                {
                    if (WSAGetLastError() != WSAEWOULDBLOCK)
                        printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
                }
                else
                {
                    printf("accepted connection from %s\n", addr.ToString().c_str());
                    CNode* pnode = new CNode(hSocket, addr, true); // 有新的socket链接,则新建对应的节点,并将节点在加入本地节点列表中
                    pnode->AddRef();
                    CRITICAL_BLOCK(cs_vNodes)
                        vNodes.push_back(pnode);
                }
            }
    
    
            // 对每一个socket进行服务处理
            // Service each socket
            //
            vector<CNode*> vNodesCopy;
            CRITICAL_BLOCK(cs_vNodes)
                vNodesCopy = vNodes;
            foreach(CNode* pnode, vNodesCopy)
            {
                CheckForShutdown(0);
                SOCKET hSocket = pnode->hSocket; // 获取每一个节点对应的socket
    
                // 从节点对应的socket中读取对应的数据,将数据放入节点的接收缓冲区中
                // Receive
                //
                if (FD_ISSET(hSocket, &fdsetRecv))
                {
                    TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                    {
                        CDataStream& vRecv = pnode->vRecv;
                        unsigned int nPos = vRecv.size();
    
                        // typical socket buffer is 8K-64K
                        const unsigned int nBufSize = 0x10000;
                        vRecv.resize(nPos + nBufSize);// 调整接收缓冲区的大小
                        int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);// 从socket中接收对应的数据
                        vRecv.resize(nPos + max(nBytes, 0));
                        if (nBytes == 0)
                        {
                            // socket closed gracefully (socket优雅的关闭)
                            if (!pnode->fDisconnect)
                                printf("recv: socket closed\n");
                            pnode->fDisconnect = true;
                        }
                        else if (nBytes < 0)
                        {
                            // socket error
                            int nErr = WSAGetLastError();
                            if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
                            {
                                if (!pnode->fDisconnect)
                                    printf("recv failed: %d\n", nErr);
                                pnode->fDisconnect = true;
                            }
                        }
                    }
                }
    
                // 将节点对应的发送缓冲中的内容发送出去
                // Send
                //
                if (FD_ISSET(hSocket, &fdsetSend))
                {
                    TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                    {
                        CDataStream& vSend = pnode->vSend;
                        if (!vSend.empty())
                        {
                            int nBytes = send(hSocket, &vSend[0], vSend.size(), 0); // 从节点对应的发送缓冲区中发送数据出去
                            if (nBytes > 0)
                            {
                                vSend.erase(vSend.begin(), vSend.begin() + nBytes);// 从发送缓冲区中移除发送过的内容
                            }
                            else if (nBytes == 0)
                            {
                                if (pnode->ReadyToDisconnect())
                                    pnode->vSend.clear();
                            }
                            else
                            {
                                printf("send error %d\n", nBytes);
                                if (pnode->ReadyToDisconnect())
                                    pnode->vSend.clear();
                            }
                        }
                    }
                }
            }
            Sleep(10);
        }
    }
    

    3.4.3线程ThreadMessageHandler

    线程ThreadMessageHandler对应的处理流程: ThreadMessageHandler处理流程

    源码如下:

    // 消息处理线程
    void ThreadMessageHandler2(void* parg)
    {
        printf("ThreadMessageHandler started\n");
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
        loop
        {
            // 轮询链接的节点用于消息处理
            // Poll the connected nodes for messages
            vector<CNode*> vNodesCopy;
            CRITICAL_BLOCK(cs_vNodes)
                vNodesCopy = vNodes;
            // 对每一个节点进行消息处理:发送消息和接收消息
            foreach(CNode* pnode, vNodesCopy)
            {
                pnode->AddRef();
    
                // Receive messages
                TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                    ProcessMessages(pnode);
    
                // Send messages
                TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                    SendMessages(pnode);
    
                pnode->Release();
            }
    
            // Wait and allow messages to bunch up
            vfThreadRunning[2] = false;
            Sleep(100);
            vfThreadRunning[2] = true;
            CheckForShutdown(2);
        }
    }
    // 处理单个节点对应的消息:单个节点接收到的消息进行处理
    bool ProcessMessages(CNode* pfrom)
    {
        CDataStream& vRecv = pfrom->vRecv;
        if (vRecv.empty())
            return true;
        printf("ProcessMessages(%d bytes)\n", vRecv.size());
    
        // 同一个的消息格式
        // Message format
        //  (4) message start
        //  (12) command
        //  (4) size
        //  (x) data
        //
        // 消息头包含:message start;command;size;
    
        loop
        {
            // Scan for message start
            CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart));
            // 删除无效的消息: 就是在对应的消息开始前面还有一些信息
            if (vRecv.end() - pstart < sizeof(CMessageHeader))
            {
                if (vRecv.size() > sizeof(CMessageHeader))
                {
                    printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n");
                    vRecv.erase(vRecv.begin(), vRecv.end() - sizeof(CMessageHeader));
                }
                break;
            }
            if (pstart - vRecv.begin() > 0)
                printf("\n\nPROCESSMESSAGE SKIPPED %d BYTES\n\n", pstart - vRecv.begin());
            vRecv.erase(vRecv.begin(), pstart); // 移除消息开始信息和接收缓冲区开头之间
    
            // 读取消息头
            // Read header
            CMessageHeader hdr;
            vRecv >> hdr; // 指针已经偏移了
            if (!hdr.IsValid())
            {
                printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str());
                continue;
            }
            string strCommand = hdr.GetCommand();
    
            // Message size
            unsigned int nMessageSize = hdr.nMessageSize;
            if (nMessageSize > vRecv.size())
            {
                // Rewind and wait for rest of message
                ///// need a mechanism to give up waiting for overlong message size error
                printf("MESSAGE-BREAK 2\n");
                vRecv.insert(vRecv.begin(), BEGIN(hdr), END(hdr));
                Sleep(100);
                break;
            }
    
            // Copy message to its own buffer
            CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion);
            vRecv.ignore(nMessageSize);
    
            // Process message
            bool fRet = false;
            try
            {
                CheckForShutdown(2);
                CRITICAL_BLOCK(cs_main)
                    // 根据命令和消息内容进行消息处理
                    fRet = ProcessMessage(pfrom, strCommand, vMsg);
                CheckForShutdown(2);
            }
            CATCH_PRINT_EXCEPTION("ProcessMessage()")
            if (!fRet)
                printf("ProcessMessage(%s, %d bytes) from %s to %s FAILED\n", strCommand.c_str(), nMessageSize, pfrom->addr.ToString().c_str(), addrLocalHost.ToString().c_str());
        }
    
        vRecv.Compact();
        return true;
    }
    
    // 对节点pFrom处理命令strCommand对应的消息内容为vRecv
    bool ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
    {
        static map<unsigned int, vector<unsigned char> > mapReuseKey;
        printf("received: %-12s (%d bytes)  ", strCommand.c_str(), vRecv.size());
        // 仅仅输出前25个字符
        for (int i = 0; i < min(vRecv.size(), (unsigned int)25); i++)
            printf("%02x ", vRecv[i] & 0xff);
        printf("\n");
        // 消息采集频率进行处理
        if (nDropMessagesTest > 0 && GetRand(nDropMessagesTest) == 0)
        {
            printf("dropmessages DROPPING RECV MESSAGE\n");
            return true;
        }
    
        // 如果命令是版本:节点对应的版本
        if (strCommand == "version")
        {
            // 节点对应的版本只能更新一次,初始为0,后面进行更新
            // Can only do this once
            if (pfrom->nVersion != 0)
                return false;
    
            int64 nTime;
            CAddress addrMe; // 读取消息对应的内容
            vRecv >> pfrom->nVersion >> pfrom->nServices >> nTime >> addrMe;
            if (pfrom->nVersion == 0)
                return false;
            // 更新发送和接收缓冲区中的对应的版本
            pfrom->vSend.SetVersion(min(pfrom->nVersion, VERSION));
            pfrom->vRecv.SetVersion(min(pfrom->nVersion, VERSION));
    
            // 如果节点对应的服务类型是节点网络,则对应节点的客户端标记就是false
            pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
            if (pfrom->fClient)
            {
                // 如果不是节点网络,可能仅仅是一些节点不要保存对应的完整区块信息,仅仅需要区块的头部进行校验就可以了
                pfrom->vSend.nType |= SER_BLOCKHEADERONLY;
                pfrom->vRecv.nType |= SER_BLOCKHEADERONLY;
            }
            // 增加时间样本数据:没有什么用处,仅仅用于输出
            AddTimeData(pfrom->addr.ip, nTime);
    
            // 对第一个进来的节点请求block信息
            // Ask the first connected node for block updates
            static bool fAskedForBlocks;
            if (!fAskedForBlocks && !pfrom->fClient)
            {
                fAskedForBlocks = true;
                pfrom->PushMessage("getblocks", CBlockLocator(pindexBest), uint256(0));
            }
    
            printf("version addrMe = %s\n", addrMe.ToString().c_str());
        }
    
    
        else if (pfrom->nVersion == 0)
        {
            // 节点在处理任何消息之前一定有一个版本消息
            // Must have a version message before anything else
            return false;
        }
    
        // 地址消息
        else if (strCommand == "addr")
        {
            vector<CAddress> vAddr;
            vRecv >> vAddr;
    
            // Store the new addresses
            CAddrDB addrdb;
            foreach(const CAddress& addr, vAddr)
            {
                if (fShutdown)
                    return true;
                // 将地址增加到数据库中
                if (AddAddress(addrdb, addr))
                {
                    // Put on lists to send to other nodes
                    pfrom->setAddrKnown.insert(addr); // 将对应的地址插入到已知地址集合中
                    CRITICAL_BLOCK(cs_vNodes)
                        foreach(CNode* pnode, vNodes)
                            if (!pnode->setAddrKnown.count(addr))
                                pnode->vAddrToSend.push_back(addr);// 地址的广播
                }
            }
        }
    
        // 库存消息
        else if (strCommand == "inv")
        {
            vector<CInv> vInv;
            vRecv >> vInv;
    
            CTxDB txdb("r");
            foreach(const CInv& inv, vInv)
            {
                if (fShutdown)
                    return true;
                pfrom->AddInventoryKnown(inv); // 将对应的库存发送消息增加到库存发送已知中
    
                bool fAlreadyHave = AlreadyHave(txdb, inv);
                printf("  got inventory: %s  %s\n", inv.ToString().c_str(), fAlreadyHave ? "have" : "new");
    
                if (!fAlreadyHave)
                    pfrom->AskFor(inv);// 如果不存在,则请求咨询,这里会在线程中发送getdata消息
                else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash))
                    pfrom->PushMessage("getblocks", CBlockLocator(pindexBest), GetOrphanRoot(mapOrphanBlocks[inv.hash]));
            }
        }
    
        // 获取数据
        else if (strCommand == "getdata")
        {
            vector<CInv> vInv;
            vRecv >> vInv;
    
            foreach(const CInv& inv, vInv)
            {
                if (fShutdown)
                    return true;
                printf("received getdata for: %s\n", inv.ToString().c_str());
    
                if (inv.type == MSG_BLOCK)
                {
                    // Send block from disk
                    map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(inv.hash);
                    if (mi != mapBlockIndex.end())
                    {
                        //// could optimize this to send header straight from blockindex for client
                        CBlock block;
                        block.ReadFromDisk((*mi).second, !pfrom->fClient);
                        pfrom->PushMessage("block", block);// 获取数据对应的类型是block,则发送对应的块信息
                    }
                }
                else if (inv.IsKnownType())
                {
                    // Send stream from relay memory
                    CRITICAL_BLOCK(cs_mapRelay)
                    {
                        map<CInv, CDataStream>::iterator mi = mapRelay.find(inv); // 重新转播的内容
                        if (mi != mapRelay.end())
                            pfrom->PushMessage(inv.GetCommand(), (*mi).second);
                    }
                }
            }
        }
    
    
        else if (strCommand == "getblocks")
        {
            CBlockLocator locator;
            uint256 hashStop;
            vRecv >> locator >> hashStop;
    
            //找到本地有的且在主链上的
            // Find the first block the caller has in the main chain
            CBlockIndex* pindex = locator.GetBlockIndex();
    
            // 将匹配得到的块索引之后的所有在主链上的块发送出去
            // Send the rest of the chain
            if (pindex)
                pindex = pindex->pnext;
            printf("getblocks %d to %s\n", (pindex ? pindex->nHeight : -1), hashStop.ToString().substr(0,14).c_str());
            for (; pindex; pindex = pindex->pnext)
            {
                if (pindex->GetBlockHash() == hashStop)
                {
                    printf("  getblocks stopping at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString().substr(0,14).c_str());
                    break;
                }
    
                // Bypass setInventoryKnown in case an inventory message got lost
                CRITICAL_BLOCK(pfrom->cs_inventory)
                {
                    CInv inv(MSG_BLOCK, pindex->GetBlockHash());
                    // 判断在已知库存2中是否存在
                    // returns true if wasn't already contained in the set
                    if (pfrom->setInventoryKnown2.insert(inv).second)
                    {
                        pfrom->setInventoryKnown.erase(inv);
                        pfrom->vInventoryToSend.push_back(inv);// 插入对应的库存发送集合中准备发送,在另一个线程中进行发送,发送的消息为inv
                    }
                }
            }
        }
    
        // 交易命令
        else if (strCommand == "tx")
        {
            vector<uint256> vWorkQueue;
            CDataStream vMsg(vRecv);
            CTransaction tx;
            vRecv >> tx;
    
            CInv inv(MSG_TX, tx.GetHash());
            pfrom->AddInventoryKnown(inv);// 将交易消息放入到对应的已知库存中
    
            bool fMissingInputs = false;
            // 如果交易能够被接受
            if (tx.AcceptTransaction(true, &fMissingInputs))
            {
                AddToWalletIfMine(tx, NULL);
                RelayMessage(inv, vMsg);// 转播消息
                mapAlreadyAskedFor.erase(inv);
                vWorkQueue.push_back(inv.hash);
    
                // 递归处理所有依赖这个交易对应的孤儿交易
                // Recursively process any orphan transactions that depended on this one
                for (int i = 0; i < vWorkQueue.size(); i++)
                {
                    uint256 hashPrev = vWorkQueue[i];
                    for (multimap<uint256, CDataStream*>::iterator mi = mapOrphanTransactionsByPrev.lower_bound(hashPrev);
                         mi != mapOrphanTransactionsByPrev.upper_bound(hashPrev);
                         ++mi)
                    {
                        const CDataStream& vMsg = *((*mi).second);
                        CTransaction tx;
                        CDataStream(vMsg) >> tx;
                        CInv inv(MSG_TX, tx.GetHash());
    
                        if (tx.AcceptTransaction(true))
                        {
                            printf("   accepted orphan tx %s\n", inv.hash.ToString().substr(0,6).c_str());
                            AddToWalletIfMine(tx, NULL);
                            RelayMessage(inv, vMsg);
                            mapAlreadyAskedFor.erase(inv);
                            vWorkQueue.push_back(inv.hash);
                        }
                    }
                }
    
                foreach(uint256 hash, vWorkQueue)
                    EraseOrphanTx(hash);
            }
            else if (fMissingInputs)
            {
                printf("storing orphan tx %s\n", inv.hash.ToString().substr(0,6).c_str());
                AddOrphanTx(vMsg); // 如果交易当前不被接受则对应的孤儿交易
            }
        }
    
        else if (strCommand == "review")
        {
            CDataStream vMsg(vRecv);
            CReview review;
            vRecv >> review;
    
            CInv inv(MSG_REVIEW, review.GetHash());
            pfrom->AddInventoryKnown(inv);
    
            if (review.AcceptReview())
            {
                // Relay the original message as-is in case it's a higher version than we know how to parse
                RelayMessage(inv, vMsg);
                mapAlreadyAskedFor.erase(inv);
            }
        }
    
        else if (strCommand == "block")
        {
            auto_ptr<CBlock> pblock(new CBlock);
            vRecv >> *pblock;
    
            //// debug print
            printf("received block:\n"); pblock->print();
    
            CInv inv(MSG_BLOCK, pblock->GetHash());
            pfrom->AddInventoryKnown(inv);// 增加库存
    
            if (ProcessBlock(pfrom, pblock.release()))
                mapAlreadyAskedFor.erase(inv);
        }
    
        else if (strCommand == "getaddr")
        {
            pfrom->vAddrToSend.clear();
            //// need to expand the time range if not enough found
            int64 nSince = GetAdjustedTime() - 60 * 60; // in the last hour 往前推一个小时
            CRITICAL_BLOCK(cs_mapAddresses)
            {
                foreach(const PAIRTYPE(vector<unsigned char>, CAddress)& item, mapAddresses)
                {
                    if (fShutdown)
                        return true;
                    const CAddress& addr = item.second;
                    if (addr.nTime > nSince)
                        pfrom->vAddrToSend.push_back(addr);
                }
            }
        }
    
        else if (strCommand == "checkorder")
        {
            uint256 hashReply;
            CWalletTx order;
            vRecv >> hashReply >> order;
    
            /// we have a chance to check the order here
    
            // Keep giving the same key to the same ip until they use it
            if (!mapReuseKey.count(pfrom->addr.ip))
                mapReuseKey[pfrom->addr.ip] = GenerateNewKey();
    
            // Send back approval of order and pubkey to use
            CScript scriptPubKey;
            scriptPubKey << mapReuseKey[pfrom->addr.ip] << OP_CHECKSIG;
            pfrom->PushMessage("reply", hashReply, (int)0, scriptPubKey);
        }
    
        else if (strCommand == "submitorder")
        {
            uint256 hashReply;
            CWalletTx wtxNew;
            vRecv >> hashReply >> wtxNew;
    
            // Broadcast
            if (!wtxNew.AcceptWalletTransaction())
            {
                pfrom->PushMessage("reply", hashReply, (int)1);
                return error("submitorder AcceptWalletTransaction() failed, returning error 1");
            }
            wtxNew.fTimeReceivedIsTxTime = true;
            AddToWallet(wtxNew);
            wtxNew.RelayWalletTransaction();
            mapReuseKey.erase(pfrom->addr.ip);
    
            // Send back confirmation
            pfrom->PushMessage("reply", hashReply, (int)0);
        }
    
        else if (strCommand == "reply")
        {
            uint256 hashReply;
            vRecv >> hashReply;
    
            CRequestTracker tracker;
            CRITICAL_BLOCK(pfrom->cs_mapRequests)
            {
                map<uint256, CRequestTracker>::iterator mi = pfrom->mapRequests.find(hashReply);
                if (mi != pfrom->mapRequests.end())
                {
                    tracker = (*mi).second;
                    pfrom->mapRequests.erase(mi);
                }
            }
            if (!tracker.IsNull())
                tracker.fn(tracker.param1, vRecv);
        }
    
        else
        {
            // Ignore unknown commands for extensibility
            printf("ProcessMessage(%s) : Ignored unknown message\n", strCommand.c_str());
        }
    
        if (!vRecv.empty())
            printf("ProcessMessage(%s) : %d extra bytes\n", strCommand.c_str(), vRecv.size());
    
        return true;
    }
    
    // 处理节点对应的消息发送
    bool SendMessages(CNode* pto)
    {
        CheckForShutdown(2);
        CRITICAL_BLOCK(cs_main)
        {
            // Don't send anything until we get their version message
            if (pto->nVersion == 0)
                return true;
    
            // 消息发送的地址
            // Message: addr
            //
            vector<CAddress> vAddrToSend;
            vAddrToSend.reserve(pto->vAddrToSend.size());
            // 如果发送的地址不在已知地址的集合中,则将其放入临时地址发送数组中
            foreach(const CAddress& addr, pto->vAddrToSend)
                if (!pto->setAddrKnown.count(addr))
                    vAddrToSend.push_back(addr);
            // 清空地址发送的数组
            pto->vAddrToSend.clear();
            // 如果临时地址发送数组不为空,则进行地址的消息的发送
            if (!vAddrToSend.empty())
                pto->PushMessage("addr", vAddrToSend);
    
            // 库存消息处理
            // Message: inventory
            //
            vector<CInv> vInventoryToSend;
            CRITICAL_BLOCK(pto->cs_inventory)
            {
                vInventoryToSend.reserve(pto->vInventoryToSend.size());
                foreach(const CInv& inv, pto->vInventoryToSend)
                {
                    // returns true if wasn't already contained in the set
                    if (pto->setInventoryKnown.insert(inv).second)
                        vInventoryToSend.push_back(inv);
                }
                pto->vInventoryToSend.clear();
                pto->setInventoryKnown2.clear();
            }
            // 库存消息发送
            if (!vInventoryToSend.empty())
                pto->PushMessage("inv", vInventoryToSend);
    
            // getdata消息发送
            // Message: getdata
            //
            vector<CInv> vAskFor;
            int64 nNow = GetTime() * 1000000;
            CTxDB txdb("r");
            // 判断节点对应的请求消息map是否为空,且对应的请求map中的消息对应的最早请求时间是否小于当前时间
            while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
            {
                const CInv& inv = (*pto->mapAskFor.begin()).second;
                printf("sending getdata: %s\n", inv.ToString().c_str());
                if (!AlreadyHave(txdb, inv))
                    vAskFor.push_back(inv);// 不存在才需要进行消息发送
                pto->mapAskFor.erase(pto->mapAskFor.begin());// 请求消息处理完一条就删除一条
            }
            if (!vAskFor.empty())
                pto->PushMessage("getdata", vAskFor);
    
        }
        return true;
    }
    // 处理节点对应的消息发送
    bool SendMessages(CNode* pto)
    {
        CheckForShutdown(2);
        CRITICAL_BLOCK(cs_main)
        {
            // Don't send anything until we get their version message
            if (pto->nVersion == 0)
                return true;
    
            // 消息发送的地址
            // Message: addr
            //
            vector<CAddress> vAddrToSend;
            vAddrToSend.reserve(pto->vAddrToSend.size());
            // 如果发送的地址不在已知地址的集合中,则将其放入临时地址发送数组中
            foreach(const CAddress& addr, pto->vAddrToSend)
                if (!pto->setAddrKnown.count(addr))
                    vAddrToSend.push_back(addr);
            // 清空地址发送的数组
            pto->vAddrToSend.clear();
            // 如果临时地址发送数组不为空,则进行地址的消息的发送
            if (!vAddrToSend.empty())
                pto->PushMessage("addr", vAddrToSend);
    
            // 库存消息处理
            // Message: inventory
            //
            vector<CInv> vInventoryToSend;
            CRITICAL_BLOCK(pto->cs_inventory)
            {
                vInventoryToSend.reserve(pto->vInventoryToSend.size());
                foreach(const CInv& inv, pto->vInventoryToSend)
                {
                    // returns true if wasn't already contained in the set
                    if (pto->setInventoryKnown.insert(inv).second)
                        vInventoryToSend.push_back(inv);
                }
                pto->vInventoryToSend.clear();
                pto->setInventoryKnown2.clear();
            }
            // 库存消息发送
            if (!vInventoryToSend.empty())
                pto->PushMessage("inv", vInventoryToSend);
           // getdata消息发送
            // Message: getdata
            //
            vector<CInv> vAskFor;
            int64 nNow = GetTime() * 1000000;
            CTxDB txdb("r");
            // 判断节点对应的请求消息map是否为空,且对应的请求map中的消息对应的最早请求时间是否小于当前时间
            while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
            {
                const CInv& inv = (*pto->mapAskFor.begin()).second;
                printf("sending getdata: %s\n", inv.ToString().c_str());
                if (!AlreadyHave(txdb, inv))
                    vAskFor.push_back(inv);// 不存在才需要进行消息发送
                pto->mapAskFor.erase(pto->mapAskFor.begin());// 请求消息处理完一条就删除一条
            }
            if (!vAskFor.empty())
                pto->PushMessage("getdata", vAskFor);
    
        }
        return true;
    }
    

    3.5节点不同缓冲区

    节点对应不同缓冲区的处理关系如下图所示: 节点对应不同缓冲区

    3.6挖矿线程ThreadBitcoinMiner处理流程

    挖矿处理流程如下图所示:


    挖矿处理流程

    源码内容如下:

    // 节点挖矿
    bool BitcoinMiner()
    {
        printf("BitcoinMiner started\n");
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
    
        CKey key;
        key.MakeNewKey(); // 使用椭圆曲线算法获得一对公钥和私钥
        // 随机数从0开始
        CBigNum bnExtraNonce = 0;
        while (fGenerateBitcoins)
        {
            Sleep(50);
            CheckForShutdown(3);
            while (vNodes.empty())
            {
                Sleep(1000);
                CheckForShutdown(3);
            }
            unsigned int nTransactionsUpdatedLast = nTransactionsUpdated;
            CBlockIndex* pindexPrev = pindexBest;
            // 获取挖矿难度
            unsigned int nBits = GetNextWorkRequired(pindexPrev);
            // 创建币基交易
            // Create coinbase tx
            //
            CTransaction txNew;
            txNew.vin.resize(1);
            txNew.vin[0].prevout.SetNull();
            txNew.vin[0].scriptSig << nBits << ++bnExtraNonce;
            txNew.vout.resize(1);
            txNew.vout[0].scriptPubKey << key.GetPubKey() << OP_CHECKSIG;
            // 创建新的区块
            // Create new block
            //
            auto_ptr<CBlock> pblock(new CBlock());
            if (!pblock.get())
                return false;
    
            // 增加币基交易左右区块的第一个交易
            // Add our coinbase tx as first transaction
            pblock->vtx.push_back(txNew);
    
            // 收集最新的交易放入区块中
            // Collect the latest transactions into the block
            int64 nFees = 0;
            CRITICAL_BLOCK(cs_main)
            CRITICAL_BLOCK(cs_mapTransactions)
            {
                CTxDB txdb("r");
                map<uint256, CTxIndex> mapTestPool;
                vector<char> vfAlreadyAdded(mapTransactions.size());
                bool fFoundSomething = true;
                unsigned int nBlockSize = 0;
                // 外层循环是因为是多线程,可能刚开始对应的交易没有怎么多,则在等待交易,进行打包,只等待一轮,如果mapTransactions有很多交易则一起打包
                while (fFoundSomething && nBlockSize < MAX_SIZE/2)
                {
                    fFoundSomething = false;
                    unsigned int n = 0;
                    for (map<uint256, CTransaction>::iterator mi = mapTransactions.begin(); mi != mapTransactions.end(); ++mi, ++n)
                    {
                        if (vfAlreadyAdded[n])
                            continue;
                        CTransaction& tx = (*mi).second;
                        if (tx.IsCoinBase() || !tx.IsFinal())
                            continue;
    
                        // Transaction fee requirements, mainly only needed for flood control
                        // Under 10K (about 80 inputs) is free for first 100 transactions
                        // Base rate is 0.01 per KB
                        // 根据费用来判断每一个交易需要的最少费用
                        int64 nMinFee = tx.GetMinFee(pblock->vtx.size() < 100);
    
                        map<uint256, CTxIndex> mapTestPoolTmp(mapTestPool);
                        // 判断当前交易是否满足对应的最低费用要求,对应的nFees在ConnectInputs是进行累加的
                        if (!tx.ConnectInputs(txdb, mapTestPoolTmp, CDiskTxPos(1,1,1), 0, nFees, false, true, nMinFee))
                            continue;
                        swap(mapTestPool, mapTestPoolTmp);
    
                        pblock->vtx.push_back(tx);
                        nBlockSize += ::GetSerializeSize(tx, SER_NETWORK); // 将当前加入块的交易大小加入对应的块大小中
                        vfAlreadyAdded[n] = true;
                        fFoundSomething = true;
                    }
                }
            }
            pblock->nBits = nBits; // 设置对应的挖坑难度值
            pblock->vtx[0].vout[0].nValue = pblock->GetBlockValue(nFees); // 设置对应的块第一个交易对应的输出对应的值=奖励 + 交易费用
            printf("\n\nRunning BitcoinMiner with %d transactions in block\n", pblock->vtx.size());
            //
            // Prebuild hash buffer
            //
            struct unnamed1
            {
                struct unnamed2
                {
                    int nVersion;
                    uint256 hashPrevBlock;
                    uint256 hashMerkleRoot;
                    unsigned int nTime;
                    unsigned int nBits;
                    unsigned int nNonce;
                }
                block;
                unsigned char pchPadding0[64];
                uint256 hash1;
                unsigned char pchPadding1[64];
            }
            tmp;
    
            tmp.block.nVersion       = pblock->nVersion;
            tmp.block.hashPrevBlock  = pblock->hashPrevBlock  = (pindexPrev ? pindexPrev->GetBlockHash() : 0);
            tmp.block.hashMerkleRoot = pblock->hashMerkleRoot = pblock->BuildMerkleTree();
            // 取前11个区块对应的创建时间对应的中位数
            tmp.block.nTime          = pblock->nTime          = max((pindexPrev ? pindexPrev->GetMedianTimePast()+1 : 0), GetAdjustedTime());
            tmp.block.nBits          = pblock->nBits          = nBits;
            tmp.block.nNonce         = pblock->nNonce         = 1; // 随机数从1开始
    
            unsigned int nBlocks0 = FormatHashBlocks(&tmp.block, sizeof(tmp.block));
            unsigned int nBlocks1 = FormatHashBlocks(&tmp.hash1, sizeof(tmp.hash1));
    
    
            //
            // Search
            //
            unsigned int nStart = GetTime();
            uint256 hashTarget = CBigNum().SetCompact(pblock->nBits).getuint256(); // 根据难度系数值获取对应的hash目标值
            uint256 hash;
            loop
            {
                BlockSHA256(&tmp.block, nBlocks0, &tmp.hash1);
                BlockSHA256(&tmp.hash1, nBlocks1, &hash);
    
    
                // 挖矿成功
                if (hash <= hashTarget)
                {
                    pblock->nNonce = tmp.block.nNonce;
                    assert(hash == pblock->GetHash());
    
                        //// debug print
                        printf("BitcoinMiner:\n");
                        printf("proof-of-work found  \n  hash: %s  \ntarget: %s\n", hash.GetHex().c_str(), hashTarget.GetHex().c_str());
                        pblock->print();
    
                    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_NORMAL);
                    CRITICAL_BLOCK(cs_main)
                    {
                        // Save key
                        if (!AddKey(key))
                            return false;
                        key.MakeNewKey();
    
                        // Process this block the same as if we had received it from another node
                        if (!ProcessBlock(NULL, pblock.release()))
                            printf("ERROR in BitcoinMiner, ProcessBlock, block not accepted\n");
                    }
                    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
    
                    Sleep(500);
                    break;
                }
    
                // 更新区块创建时间,重新用于挖矿
                // Update nTime every few seconds
                if ((++tmp.block.nNonce & 0x3ffff) == 0)
                {
                    CheckForShutdown(3);
                    if (tmp.block.nNonce == 0)
                        break;
                    if (pindexPrev != pindexBest)
                        break;
                    if (nTransactionsUpdated != nTransactionsUpdatedLast && GetTime() - nStart > 60)
                        break;
                    if (!fGenerateBitcoins)
                        break;
                    tmp.block.nTime = pblock->nTime = max(pindexPrev->GetMedianTimePast()+1, GetAdjustedTime());
                }
            }
        }
    
        return true;
    }
    

    4. 源码地址

    我对比特币bitcoin-0.1.0源码加了详细的注释,对应的下载地址:https://github.com/lwjaiyjk/bitcoin-comment-0.1.0.git

    转载请说明出处

    相关文章

      网友评论

          本文标题:比特币bitcoin源码解析之整体架构和流程

          本文链接:https://www.haomeiwen.com/subject/litvlftx.html