这一节介绍的是如何启动底层的p2p网络,和启动的p2p网络会做些什么。
启动的p2p网络会做三件事情:一是帮助本地节点发现网络中其他的远程节点,建立相邻节点列表,这里体现了p2p网络的结构化。二是监听网络中远程节点发送的建立TCP连接请求。三是向网络中的远程节点发起建立TCP连接请求。
这里主要先介绍监听TCP连接请求。TCP连接,有点类似于打电话。监听TCP连接请求,就像是在等待别人call你,你接通电话后,就建立了TCP连接。(TCP连接有三次握手过程)。在此之上,以太坊还使用了rlpx加密协议,来保障节点之间通信的安全。
接下来是代码方面的说明。
0.索引
01.以太坊p2p模块的目录对应功能
02.Server的介绍
03.Start 启动p2p网络
04.监听TCP连接的过程
05.总结
1.以太坊p2p模块的目录对应功能
省略了测试用的文件。
p2p/
discover/ 基于UDP的节点发现V4协议
discv5/ 节点发现V5协议
enode/ 节点信息
enr/ 以太坊节点记录(ethereum node records)
nat/ 网络地址转换,用于内网穿透
netutil/
protocol/
simulations/ 本地p2p网络的模拟器
dial.go 建立连接请求,以任务的形式
message.go 定义了读写的接口
metrics.go 计时器和计量器工具
peer.go 节点
protocol.go 子协议
rlpx.go 加密传输协议
server.go 底层p2p网络的函数入口
以太坊的p2p网络位于go-ethereum/p2p
目录下,该目录下的server.go
是以太坊p2p网络的文件入口。server.go
里定义了Server
服务。Server
服务对应了p2p底层网络的实现。
2.Server的介绍
Server
是p2p/server.go
中的核心结构体,用来管理所有连接的peer
。这里的peer
指的是一个共同建立p2p网络的远程节点。
type Server struct {...}
Server
结构体中包含了Config
配置,running
运行状态标志,ntab
节点发现的数据表等字段。
Server
实现的方法:
- 服务启动与停止:
Start
,Stop
- 增加或移除节点:
AddPeer
,AddTrustedPeer
,RemovePeer
,RemoveTrustedPeer
- 订阅事件:
SubscribeEvents
- 查看本地节点或是远程节点的信息:
Self
,Peers
,PeerCount
,tcpAddr
其中,最重要的是Start
方法,也就是p2p底层网络启动的具体过程。
3.Start 启动p2p网络
先来看一下大概的流程图:
(黄色方框表示调用了外部包,具体实现在黄色方框的go文件里。)
Start
方法执行了三个主要的步骤:
- 1.基于UDP的节点发现,调用了
discover.ListenUDP
的方法,这部分的实现逻辑在discover
目录里,kad的算法也就是在这其中实现。(补链接)ntab, err := discover.ListenUDP(conn, cfg)
- 2.
srv.startListening
监听远程节点发来的建立TCP连接请求。会开启一个单独的线程循环监听。// 开启监听 if err := srv.startListening(); err != nil { return err }
- 3.
srv.run
先发起建立TCP连接请求,然后接收建立了连接的远程节点发来的消息,进行对应处理。这也是一个单独的线程。(这里介绍发起TCP连接请求)dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) ... go srv.run(dialer)
Start
方法也就是做了启动后的p2p网络会做的三件事情,其中节点发现,发起TCP请求都涉及到了别的目录或go文件,所以放在下次介绍。接下来就是监听TCP连接的部分。
4.监听TCP连接的过程
分为两个部分:一是开启监听;二是监听循环。
startListening 开启监听
开启监听的过程比较简单,直接看代码。
func (srv *Server) startListening() error {
// 创建监听用的 listener
listener, err := net.Listen("tcp", srv.ListenAddr)
if err != nil {
return err
}
// 更新listener的地址,Addr返回的是listener的网络地址
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
srv.listener = listener
// 开始监听循环
srv.loopWG.Add(1)
go srv.listenLoop()
// 如果配置了NAT,则映射TCP监听端口。
if !laddr.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
return nil
}
- 1.创建监听连接用的
lisetener
对象,srv.ListenAddr
是配置文件的监听地址,顺便一提,以太坊的默认的监听端口为30303
. - 2.更新
srv.ListenAddr
地址为监听者lisetener
的网络地址。 - 3.开始并发的监听循环。
- 4.如果配置了NAT,则开启一个单独的协程来进行TCP的监听端口。
listenLoop 监听循环
监听循环有两个主要过程,如下图,左半图是循环的接收接入的TCP连接;右半图是在符合要求的连接之上,建立加密的通道,即使用rlpx协议。
监听循环以及加密通道建立
- 1.设置最大挂起的节点数值。创建对应数值的通道缓存。每一个通道缓存对应一个建立TCP连接的线程。
slots
也相当于是循环次数。tokens = srv.MaxPendingPeers ... slots := make(chan struct{}, tokens)
- 2.开始监听循环(以下步骤都在循环里)。当
slots
里无缓存数据的时候,监听循环会被阻塞。(这里循环次数减1。)for { <-slots ... }
- 3.接收监听到的内容,返回的是一个连接
Conn
对象。fd, err = srv.listener.Accept()
- 4.如果设置了ip限制,没有匹配到ip,则拒绝连接。并给
slots
加一个空结构体,相当于循环次数加1。if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue } }
- 5.在上述过程中监听的连接,确认没问题后,开启一个独立的协程,与发来连接的远程节点建立加密通道。并给
slots
加一个空结构体,循环次数加1。
传入参数中的nil
值得注意,下面用到。go func() { srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }()
SetupConn 建立加密通道
- 1.获取远程节点的公钥,用于后续的加密通道的建立。这里需要注意以下,因为我们是在监听远程节点的连接,所以我们无该节点的公钥。也就是说此时
dialDest
是等于nil
的。var dialPubkey *ecdsa.PublicKey if dialDest != nil { dialPubkey = new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { return fmt.Errorf("dial destination doesn't have a secp256k1 public key") } }
- 2.执行加密握手。
c
是连接,srv.PrivateKey
是本地私钥,dialPubkey
是远程节点的公钥。关于dialPubkey
是否为nil
,c.doEncHandshake
有两种不同的应对方式,其实也就是TCP监听者和TCP发送者的不同。具体的代码细节在p2p/rlpx.go
里。remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)
- 3.检查加密握手是否顺利完成。
srv.posthandshake
是conn
类型的通道。err = srv.checkpoint(c, srv.posthandshake)
- 4.执行协议握手。协议握手的内容
protoHandshake
为版本号,节点公钥,子协议列表,监听的端口和节点名字。(在p2p/peer.go中。)phs, err := c.doProtoHandshake(srv.ourHandshake)
- 5.协议握手检查。往
srv.addpeer
通道里放入连接c
,表示要加入远程节点。
这里来看一下err = srv.checkpoint(c, srv.addpeer)
checkpoint
方法,传入参数为连接和通道。以协议握手为例,传入了与远程节点建立的连接c
,以及添加节点的通道srv.addpeer
,表示了建立TCP连接,两次握手通过以后,就执行添加远程节点的程序。func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { select { case stage <- c: case <-srv.quit: return errServerStopped } select { // 使用cont来向SetupConn发送信号错误。 case err := <-c.cont: return err case <-srv.quit: return errServerStopped } }
5.总结
- 1.
server.go
是p2p模块的函数入口,通过Start
方法来开启p2p服务。 - 2.
Start
包含了三个主要过程:基于UDP的节点发现,基于TCP的连接监听与建立。 - 3.介绍了监听TCP连接的实现过程。也就是
startListening
和listenLoop
方法。 - 4.监听到远程节点的连接请求后,与该节点建立rlpx加密通道。
网友评论