onPeerBlockBodies()
BlockChainSync::requestBlocks()
请求区块体后,如果对方有这些区块就会把数据返回回来,本节我们来看看接收区块体数据的处理。
数据包辗转从Session
到EthereumPeer
,再到EthereumPeerObserver
,最后到BlockChainSync::onPeerBlockBodies
中。
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
size_t itemCount = _r.itemCount();
LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
<< (itemCount ? "" : ": NoMoreBodies");
clearPeerDownload(_peer);
if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) {
LOG(m_logger) << "Ignoring unexpected blocks";
return;
}
if (m_state == SyncState::Waiting)
{
LOG(m_loggerDetail) << "Ignored blocks while waiting";
return;
}
if (itemCount == 0)
{
LOG(m_loggerDetail) << "Peer does not have the blocks requested";
_peer->addRating(-1);
}
这个函数比onPeerBlockHeaders()
简单多了,开头仍然主要是对SyncState
做检查。
接着是遍历校验接收到的区块体:
for (unsigned i = 0; i < itemCount; i++)
{
RLP body(_r[i]);
auto txList = body[0];
h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
h256 uncles = sha3(body[1].data());
HeaderId id { transactionRoot, uncles };
auto iter = m_headerIdToNumber.find(id);
if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
{
LOG(m_loggerDetail) << "Ignored unknown block body";
continue;
}
unsigned blockNumber = iter->second;
if (haveItem(m_bodies, blockNumber))
{
LOG(m_logger) << "Skipping already downloaded block body " << blockNumber;
continue;
}
m_headerIdToNumber.erase(id);
mergeInto(m_bodies, blockNumber, body.data().toBytes());
}
校验过程也比较简单,主要是从区块体数据中重新计算transactionRoot
和uncles
值,和m_headers
中对应块头里记录的值做比较,如果一样则mergeInto
到m_bodies
里。
最后仍然是:
collectBlocks();
continueSync();
但是这次我们需要深入collectBlocks()
这个函数里去看看了,因为这次有了区块头和区块体,只要条件允许就可以进行合并操作了。
collectBlocks()
BlockChainSync::collectBlocks()
函数开头就提出合并所需要的两组条件。
第一组条件是:
if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
return;
第一组包含三个条件,缺一不可。
第二组条件是:
auto& headers = *m_headers.begin();
auto& bodies = *m_bodies.begin();
if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
return;
这里的headers
是m_headers
中第一个连续区域,bodies
是m_bodies
中第一个连续区域。那么这里的两个条件是headers
中最低区块号必须和bodies
中最低区块号相同,并且这个区块号就是所需要同步的下一个区块。
满足这两个条件就可以进入正式的合并流程了,在BlockChainSync
里的部分其实并不多:
for (; i < headers.second.size() && i < bodies.second.size(); i++)
{
RLPStream blockStream(3);
blockStream.appendRaw(headers.second[i].data);
RLP body(bodies.second[i]);
blockStream.appendRaw(body[0].data());
blockStream.appendRaw(body[1].data());
bytes block;
blockStream.swapOut(block);
switch (host().bq().import(&block))
{
// ...
}
}
假如headers
里的区块是[区块3,区块4,区块5,区块6],bodies
里的区块是[区块3,区块4],那么这里的遍历范围是[区块3,区块4]。将区块头和区块体合并起来以后放到RLPStream
中,并调用BlockQueue::import()
函数导入二级缓冲区中,BlockQueue::import()
函数的实现在以后BlockQueue
类里再细说,这里主要看BlockChainSync
类里的流程。
调用完BlockQueue::import()
后根据返回值做不同处理,这段就不分析了,可以直接去看源码。
auto newHeaders = std::move(headers.second);
newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
unsigned newHeaderHead = headers.first + i;
auto newBodies = std::move(bodies.second);
newBodies.erase(newBodies.begin(), newBodies.begin() + i);
unsigned newBodiesHead = bodies.first + i;
m_headers.erase(m_headers.begin());
m_bodies.erase(m_bodies.begin());
if (!newHeaders.empty())
m_headers[newHeaderHead] = newHeaders;
if (!newBodies.empty())
m_bodies[newBodiesHead] = newBodies;
导入二级缓冲区后,将导入成功的区块从headers
和bodies
中删除,并重设m_headers
和m_bodies
中的最低连续区域。
网友评论