我们再来深入了解一下Host类里节点和本节点是怎么交互的,在上一节可以看到节点到了Host类后,会调用Host::connect
来连接对方,我们可以看下connect()
函数实现代码:
void Host::connect(std::shared_ptr<Peer> const& _p)
{
// ...
bi::tcp::endpoint ep(_p->endpoint);
cnetdetails << "Attempting connection to node " << _p->id << "@" << ep << " from " << id();
auto socket = make_shared<RLPXSocket>(m_ioService);
socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
{
// ...
if (ec)
{
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
<< ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
}
else
{
cnetdetails << "Connecting to " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
{
Guard l(x_connecting);
m_connecting.push_back(handshake);
}
handshake->start();
}
m_pendingPeerConns.erase(nptr);
});
}
可以看到先是创建了一个socket,然后用async_connect()
异步去连接这个节点,连接成功后生成了一个RLPXHandshake
类,并调用了RLPXHandshake::start()
来开启握手流程,这里并没有连接成功后就传输数据,因为对方可能并不是一个ethereum节点,或者是运行协议不匹配的节点,握手流程就用来过滤掉不合格的节点,只有通过了握手流程才能进行数据交互。
注:在cpp-ethereum项目中底层数据传输用的是boost::asio库,作为准标准库中一员,boost::asio广泛应用在c++跨平台网络开发中,不熟悉的读者建议先去网络上阅读相关文档,后续文档假定读者已经了解了boost::asio库。
RLPXHandshake类
RLPXHandshake::start()
函数实际调用了RLPXHandshake::transition()
函数,这个函数是RLPXHandshake
类的核心,从中可以看到握手的流程。
void RLPXHandshake::transition(boost::system::error_code _ech)
{
// ...
if (m_nextState == New)
{
m_nextState = AckAuth;
if (m_originated)
writeAuth();
else
readAuth();
}
else if (m_nextState == AckAuth)
{
m_nextState = WriteHello;
if (m_originated)
readAck();
else
writeAck();
}
else if (m_nextState == AckAuthEIP8)
{
m_nextState = WriteHello;
if (m_originated)
readAck();
else
writeAckEIP8();
}
else if (m_nextState == WriteHello)
{
m_nextState = ReadHello;
// ...
}
else if (m_nextState == ReadHello)
{
// Authenticate and decrypt initial hello frame with initial RLPXFrameCoder
// and request m_host to start session.
m_nextState = StartSession;
// ...
}
}
精简后的流程还是比较清楚的,初始时候m_nextState
值为New
,那么正常的握手状态是New -> AckAuth -> WriteHello -> ReadHello -> StartSession
。如果这些环节中某一步出错了,那么该节点不会走到最后,否则最后的状态会变成StartSession
,那么到了StartSession
状态后会发生什么事呢?我们再看看看这部分代码:
else if (m_nextState == ReadHello)
{
// Authenticate and decrypt initial hello frame with initial RLPXFrameCoder
// and request m_host to start session.
m_nextState = StartSession;
// read frame header
unsigned const handshakeSize = 32;
m_handshakeInBuffer.resize(handshakeSize);
ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, handshakeSize), [this, self](boost::system::error_code ec, std::size_t)
{
if (ec)
transition(ec);
else
{
// ...
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(header.size() - 3 - h128::size); // this is always 32 - 3 - 16 = 13. wtf?
bytesConstRef(&header).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
m_handshakeInBuffer.resize(frameSize + ((16 - (frameSize % 16)) % 16) + h128::size);
ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, m_handshakeInBuffer.size()), [this, self, headerRLP](boost::system::error_code ec, std::size_t)
{
// ...
if (ec)
transition(ec);
else
{
// ...
try
{
RLP rlp(frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall);
m_host->startPeerSession(m_remote, rlp, move(m_io), m_socket);
}
catch (std::exception const& _e)
{
cnetlog << "Handshake causing an exception: " << _e.what();
m_nextState = Error;
transition();
}
}
});
}
});
}
当状态从ReadHello
向StartSession
转变时,连续收了两个包,然后调用了Host::startPeerSession()
,节点在RLPXHandshake
类转了一圈以后,如果合格的话又回到了Host
类中,从此开始新的征程。
Host类
我们之前看到Host
类通过requirePeer()
函数推动了P2P发现模块的运转,但同时它又是整个P2P传输模块中的发动机,因此要研究ethereum网络部分需要从这里开始。
我们在libp2p\Host.h
文件中找到Host
类定义,其中有两个成员变量,熟悉boost::asio库的读者一定不陌生:
ba::io_service m_ioService;
bi::tcp::acceptor m_tcp4Acceptor;
其中m_ioService
就是Host类的核心了,它负责处理异步任务,当异步任务完成后调用完成句柄。
m_tcp4Acceptor
是负责接收连接的对象,它内部封装了一个socket对象。我们都知道服务端的socket需要经过创建,绑定IP端口,侦听,Accept这几个阶段,对于m_tcp4Acceptor
而言也是这样:
- 创建
直接在Host
类初始化列表中进行创建
- 绑定IP端口和侦听
这部分是在Network::tcp4Listen()
函数中完成的:
for (unsigned i = 0; i < 2; ++i)
{
bi::tcp::endpoint endpoint(listenIP, requirePort ? _netPrefs.listenPort : (i ? 0 : c_defaultListenPort));
try
{
/// ...
_acceptor.open(endpoint.protocol());
_acceptor.set_option(ba::socket_base::reuse_address(reuse));
_acceptor.bind(endpoint);
_acceptor.listen();
return _acceptor.local_endpoint().port();
}
catch (...)
{
// bail if this is first attempt && port was specificed, or second attempt failed (random port)
if (i || requirePort)
{
// both attempts failed
cwarn << "Couldn't start accepting connections on host. Failed to accept socket on " << listenIP << ":" << _netPrefs.listenPort << ".\n" << boost::current_exception_diagnostic_information();
_acceptor.close();
return -1;
}
_acceptor.close();
continue;
}
}
注意到这里有一个循环,是用来防止端口被占用的。如果第一次端口被占用,则第二次使用0端口,也就是随机端口。
在这个函数里,_acceptor
依次完成了设置协议,设置端口重用,绑定端口和侦听。
- Accept
又回到了Host
类,在Host::runAcceptor()
函数中,我们能找到以下代码:
auto socket = make_shared<RLPXSocket>(m_ioService);
m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
{
// ...
try
{
// incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(this, socket);
m_connecting.push_back(handshake);
handshake->start();
success = true;
}
catch (Exception const& _e)
{
cwarn << "ERROR: " << diagnostic_information(_e);
}
catch (std::exception const& _e)
{
cwarn << "ERROR: " << _e.what();
}
if (!success)
socket->ref().close();
runAcceptor();
});
m_tcp4Acceptor
通过async_accept()
异步接收连接,当一个连接到来的时候发生了什么?我们又看到了熟悉的代码,是的!创建了一个RLPXHandshake类,又开始了握手流程。ethereum对于接收到的连接也是谨慎的,同样需要先进行校验,这里的握手流程与前面connect
时的流程稍有不同,区别就在RLPXHandshake::m_originated
上,connect
时的m_originated
值为true,也就是先向对方发送自己的Auth包,而被动接收时m_originated
为false,会等待对方发过来Auth包。
最后别忘了启动Host::m_ioService
,这部分被放在doWork()
函数里,还记得doWork()
函数吗?因为Host
类是从Worker
类继承而来,doWork()
会在一个循环中被调用。
void Host::doWork()
{
try
{
if (m_run)
m_ioService.run();
}
catch (std::exception const& _e)
{
// ...
}
}
但是doWork()
不是会被循环调用的吗?难道m_ioService.run()
也会重复调用吗?答案是不会,因为m_ioService.run()
会阻塞在这里,所以只会执行一次。
至此m_tcp4Acceptor
能够愉快地接收到TCP连接,并把连接交给RLPXHandshake
类去处理了。
网友评论