这是我写的从头完整实现以太坊协议系列的第四部分(第一部分,第二部分,第三部分,如果你以前没看过,建议你从第一部分开始看)。等这个系列教程结束的时候,你就可以爬取以太坊网络获取对等端点,同步和验证区块链,为以太坊虚拟机编写智能合约,以及挖以太币。我们现在正在实现其发现协议部分。一旦完成,我们就可以用一种类似torrent的方式下载区块链。我们上一次完成了Ping
引导节点并解码和验证其Pong
响应。今天我们要实现FindNeighbors
请求和Neighbors
响应,我们将用它们爬取以太坊网络。
这一部分不难,我们简单地为FindNeighbors
和Neighbors
的数据包定义类结构,并像我们之前发送PingNode
和Pong
那样将它们发送即可。但是,想要成功发送FindNeighbors
数据包,还需要满足一些必备条件。我们并没有在协议文档中看到这些条件,是因为文档比源代码旧。go-ethereum源代码的发现协议采用v4
版本。但是RLPx协议(我们的实现)却只到v3
版本。源代码里甚至还有一个叫discv5的模块,表明它们正在实现v5
版本,不过,通过检查引导节点发回的Ping
消息的version
字段,我们发现它们跑的依然是v4
版本。
v4
版本的协议要求,为了获取FindNeighbors
请求的响应,必须先要有一次UDP"握手"。我们在udp.go源文件里面可以看到:
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
if t.db.node(fromID) == nil {
// No bond exists, we don't process the packet. This prevents
// an attack vector where the discovery protocol could be used
// to amplify traffic in a DDOS attack. A malicious actor
// would send a findnode request with the IP address and UDP
// port of the target as the source address. The recipient of
// the findnode packet would then send a neighbors packet
// (which is a much bigger packet than findnode) to the victim.
return errUnknownNode
}
为了处理findnode
数据包(FindNeighbors
的Go实现),代码首先检查请求来源fromID
是否在它的已知节点记录里面。如果不在,它就丢弃这个请求(难怪我之前的请求一直出问题,现在弄明白了)。
为了成为已知节点,首先我们必须ping
引导节点。当引导节点接收到ping
,它会先响应一个pong
,然后再发一个ping
,并等待我们响应一个pong
回去。一旦我们响应了pong
,我们的nodeID
就会进入引导节点的已知节点列表。
因此,为了能够发送FindNeighbors
数据包,首先,我们需要创建与PingNode
和Pong
数据包具有相同功能的FindNeighbors
和Neighbors
类。然后,我们需要在receive_ping
中加一个Pong
的响应以便跟引导节点UDP握手。接着,我们需要调整PingServer
使之能持续监听数据包。最后,我们需要调整send_ping.py
脚本:发送一个ping
,留足够的时间等待引导节点依次响应pong
和ping
,之后假设我们正确实现了pong
响应的话,就可以发送FindNeighbors
数据包并接收Neighbors
响应了。
到https://github.com/HuangFJ/pyeth下载本项目代码:
git clone https://github.com/HuangFJ/pyeth
创建FindNeighbors和Neighbors类
在此系列前一部分我们为PingNode
和Pong
创建了类,这一节,我们要以同样的方式为FindNeighbors
和Neighbors
创建Python类。我们为每个类都创建__init__
,__str__
,pack
,unpack
方法,并为PingServer
类添加receive_
的方法。
对于FindNeighbors
,规范描述的数据包结构是:
FindNeighbours packet-type: 0x03
struct FindNeighbours
{
NodeId target; // Id of a node. The responding node will send back nodes closest to the target.
uint32_t timestamp;
};
target
是一个NodeId
类型,它是一个64字节的公钥。这意味着我们可以在pack
和unpack
方法中存储和提取它。对于__str__
,我将使用binascii.b2a_hex
把字节打印成16进制格式。除此以外,其他代码跟我们在PingNode
和Pong
所见到的相似。所以,我们在discovery.py
编写:
class FindNeighbors(object):
packet_type = '\x03'
def __init__(self, target, timestamp):
self.target = target
self.timestamp = timestamp
def __str__(self):
return "(FN " + binascii.b2a_hex(self.target)[:7] + "... " + str(self.ti\
mestamp) + ")"
def pack(self):
return [
self.target,
struct.pack(">I", self.timestamp)
]
@classmethod
def unpack(cls, packed):
timestamp = struct.unpack(">I", packed[1])[0]
return cls(packed[0], timestamp)
对于Neighbors
,数据包结构为:
Neighbors packet-type: 0x04
struct Neighbours
{
list nodes: struct Neighbour
{
inline Endpoint endpoint;
NodeId node;
};
uint32_t timestamp;
};
这要求我们先定义一个Neighbor
类,我将在之后定义并取名为Node
。对于Neighbors
,唯一新概念是nodes
是一个列表,所以我们将使用map
来打包和解包数据:
class Neighbors(object):
packet_type = '\x04'
def __init__(self, nodes, timestamp):
self.nodes = nodes
self.timestamp = timestamp
def __str__(self):
return "(Ns [" + ", ".join(map(str, self.nodes)) + "] " + str(self.times\
tamp) + ")"
def pack(self):
return [
map(lambda x: x.pack(), self.nodes),
struct.pack(">I", self.timestamp)
]
@classmethod
def unpack(cls, packed):
nodes = map(lambda x: Node.unpack(x), packed[0])
timestamp = struct.unpack(">I", packed[1])[0]
return cls(nodes, timestamp)
对于Node
,唯一新概念是endpoint
是内联打包,所以endpoint.pack()
后成为一个单独的列表项,但是它不必,它只要把nodeID
追加到此列表末端。
class Node(object):
def __init__(self, endpoint, node):
self.endpoint = endpoint
self.node = node
def __str__(self):
return "(N " + binascii.b2a_hex(self.node)[:7] + "...)"
def pack(self):
packed = self.endpoint.pack()
packed.append(node)
return packed
@classmethod
def unpack(cls, packed):
endpoint = EndPoint.unpack(packed[0:3])
return cls(endpoint, packed[3])
对于新建的数据包类,让我们定义新的PingServer
方法来接收数据包,先简单地定义:
def receive_find_neighbors(self, payload):
print " received FindNeighbors"
print "", FindNeighbors.unpack(rlp.decode(payload))
def receive_neighbors(self, payload):
print " received Neighbors"
print "", Neighbors.unpack(rlp.decode(payload))
在PingServer
的receive
方法里面,我们也要调整response_types
派发表:
response_types = {
PingNode.packet_type : self.receive_ping,
Pong.packet_type : self.receive_pong,
FindNeighbors.packet_type : self.receive_find_neighbors,
Neighbors.packet_type : self.receive_neighbors
}
让服务器持续监听
为了让服务可以持续监听数据包,还有几个事项需要处理:
-
PingServer
的功能变得更通用,因此我们将它改名为Server
。 - 我们通过设置
self.sock.setblocking(0)
让服务器的套接字不再阻塞。 - 让我们把
receive
方法中#verify hash
上面的代码移到新的listen
方法中,并给receive
添加一个新的参数data
。这个新的listen
函数循环以select
等待数据包的到达并以receive
响应。select
函数的作用是在可选的超时时间内等待直至资源可用。 - 我们把从套接字读取的字节数增加到2048,因为一些以太数据包大小超过1024字节长。
- 我们将
udp_listen
更改为listen_thread
,并将线程对象返回,我们把线程的daemon
字段设置为True
,这意味着即便监听线程依然在运行,进程也将终止。(之前进程是挂起的)
最终相应的代码部分是这样的:
...
import select
...
class Server(object):
def __init__(self, my_endpoint):
...
## set socket non-blocking mode
self.sock.setblocking(0)
...
def receive(self, data):
## verify hash
msg_hash = data[:32]
...
...
def listen(self):
print "listening..."
while True:
ready = select.select([self.sock], [], [], 1.0)
if ready[0]:
data, addr = self.sock.recvfrom(2048)
print "received message[", addr, "]:"
self.receive(data)
...
def listen_thread(self):
thread = threading.Thread(target = self.listen)
thread.daemon = True
return thread
响应pings
我们必须修改Server
类的receive_ping
方法以响应一个Pong
。这也要求我们将Server
的ping
方法修改成更通用的函数send
。原来ping
创建一个PingNode
对象并发送,现在变成了send
接收一个新的packet
自变量,做发送前准备并发送。
def receive_ping(self, payload, msg_hash):
print " received Ping"
ping = PingNode.unpack(rlp.decode(payload))
pong = Pong(ping.endpoint_from, msg_hash, time.time() + 60)
print " sending Pong response: " + str(pong)
self.send(pong, pong.to)
...
def send(self, packet, endpoint):
message = self.wrap_packet(packet)
print "sending " + str(packet)
self.sock.sendto(message, (endpoint.address.exploded, endpoint.udpPort))
注意receive_ping
有一个新的msg_hash
参数。这个参数需放进位于Server
的receive
方法里面的dispatch
调用中,以及所有其他receive_
开头的函数。
def receive_pong(self, payload, msg_hash):
...
def receive_find_neighbors(self, payload, msg_hash):
...
def receive_neighbors(self, payload, msg_hash):
...
def receive(self, data):
## verify hash
msg_hash = data[:32]
...
dispatch(payload, msg_hash)
其他修复
因为引导节点使用v4
版本的RLPx协议。但是规范文档和我们的实现使用的是v3
,我们需要把PingNode
unpack
方法的packed[0]==cls.version
注释掉。在我可以找到基于新版本的集中文档之前,我不打算修改类的实际版本号。在前一篇文章里面,我忘记了把解包的timestamp
包含到cls
的参数里面,所以你的uppack
看上去要像下面这样:
@classmethod
def unpack(cls, packed):
## assert(packed[0] == cls.version)
endpoint_from = EndPoint.unpack(packed[1])
endpoint_to = EndPoint.unpack(packed[2])
timestamp = struct.unpack(">I", packed[3])[0]
return cls(endpoint_from, endpoint_to, timestamp)
v4
的另一个变化是EndPoint
编码的第二个自变量是可选的,所以你需要在unpack
方法中阐释。如果没有的话,你要设置tcpPort
等于udpPort
。
@classmethod
def unpack(cls, packed):
udpPort = struct.unpack(">H", packed[1])[0]
if packed[2] == '':
tcpPort = udpPort
else:
tcpPort = struct.unpack(">H", packed[2])[0]
return cls(packed[0], udpPort, tcpPort)
对之前版本代码的最后一个修改是,Pong
的pack
方法有一个拼写错误,timestamp
应该改为self.timestamp
。之所以没发现是因为我们从未发送过Pong
消息:
def pack(self):
return [
self.to.pack(),
self.echo,
struct.pack(">I", self.timestamp)]
修改send_ping.py
我们需要重写send_ping.py
以阐释新的发送流程。
from discovery import EndPoint, PingNode, Server, FindNeighbors, Node
import time
import binascii
bootnode_key = "3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99"
bootnode_endpoint = EndPoint(u'13.93.211.84',
30303,
30303)
bootnode = Node(bootnode_endpoint,
binascii.a2b_hex(bootnode_key))
my_endpoint = EndPoint(u'52.4.20.183', 30303, 30303)
server = Server(my_endpoint)
listen_thread = server.listen_thread()
listen_thread.start()
fn = FindNeighbors(bootnode.node, time.time() + 60)
ping = PingNode(my_endpoint, bootnode.endpoint, time.time() + 60)
## introduce self
server.send(ping, bootnode.endpoint)
## wait for pong-ping-pong
time.sleep(3)
## ask for neighbors
server.send(fn, bootnode.endpoint)
## wait for response
time.sleep(3)
首先,我们从params/bootnodes.go扒一个引导节点的key,创建一个Node
对象,作为我们的第一个联系对象。然后我们创建一个服务器,启动监听线程,并创建PingNode
和FindNeighbors
数据包。接着我们按照握手流程,ping
引导节点,接收一个pong
和一个ping
。我们将响应一个pong
以使自己成为一个公认已知节点。最后我们就可以发送fn
数据包。引导节点应该会以Neighbors
响应。
执行python send_ping.py
你应该可以看到:
$ python send_ping.py
sending (Ping 3 (EP 52.4.20.183 30303 30303) (EP 13.93.211.84 30303 30303) 1502819202.25)
listening...
received message[ ('13.93.211.84', 30303) ]:
Verified message hash.
Verified signature.
received Pong
(Pong (EP 52.4.20.183 30303 30303) <echo hash=""> 1502819162)
received message[ ('13.93.211.84', 30303) ]:
Verified message hash.
Verified signature.
received Ping
sending Pong response: (Pong (EP 13.93.211.84 30303 30303) <echo hash=""> 1502819202.34)
sending (Pong (EP 13.93.211.84 30303 30303) <echo hash=""> 1502819202.34)
sending (FN 3f1d120... 1502983026.6)
received message[ ('13.93.211.84', 30303) ]:
Verified message hash.
Verified signature.
received Neighbors
(Ns [(N 9e44f97...), (N 112917b...), (N ebf683d...), (N 2232e47...), (N f6ff826...), (N 7524431...), (N 804613e...), (N 78e5ce9...), (N c6dd88f...), (N 1dbf854...), (N 48a80a9...), (N 8b6c265...)] 1502982991)
received message[ ('13.93.211.84', 30303) ]:
Verified message hash.
Verified signature.
received Neighbors
(Ns [(N 8567bc4...), (N bf48f6a...), (N f8cb486...), (N 8e7e82e...)] 1502982991)
引导节点分两个数据包响应了16个邻居节点。
下一次,我们将构建一个流程来爬取这些邻居直到我们有足够的对等端点可以同步区块链。
网友评论