对于以太坊的p2p网络,我觉得,分为底层p2p网络的构建,以及上层eth服务的实现。在介绍以太坊上层服务之前,需要先来看一下底层网络是怎么跟上层网络联系在一起的。
0.索引
01.ProtocolManager 协议管理
02.新建一个 ProtocolManager
03.建立联系
04.使用 Run 方法
05.底层的peer结构
06.总结
1.ProtocolManager 协议管理
ProtocolManager
是eth/handle.go
中的核心结构体,用来管理节点之间的通信。
type ProtocolManager struct {
networkID uint64
fastSync uint32
acceptTxs uint32
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// fetcher, syncer, txsyncLoop 的通道
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
wg sync.WaitGroup
}
ProtocolManager
结构体中包含了
-
networkID
网络id。 -
fastSync
快速同步的标志,acceptTxs
接收交易方式的标志。 -
txpool
交易池,blockchain
区块链,chainconfig
区块链配置,maxpeers
最大节点数。 -
downloader
下载器,fetcher
提取器,peers
相邻节点表。 -
SubProtocols
子协议列表。(与底层节点相关。) - 以及需要用到的各种通道和同步锁。
2.新建一个 ProtocolManager
底层节点与上层节点的联系,就在新建一个ProtocolManager
的方法里。(第3个步骤,再往下会做具体说明。)
func NewProtocolManager(
config *params.ChainConfig,
mode downloader.SyncMode,
networkID uint64,
mux *event.TypeMux,
txpool txPool,
engine consensus.Engine,
blockchain *core.BlockChain,
chaindb ethdb.Database
) (*ProtocolManager, error)
- 1.初始化基础字段。
manager := &ProtocolManager{networkID: networkID, eventMux: mux, txpool: txpool, blockchain: blockchain, chainconfig: config, peers: newPeerSet(), newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}),}
- 2.确认是否快速同步模式。
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {...}
- 3.每个实现的版本添加子协议。也就是上层服务给予底层p2p网络调用上层服务的函数入口。(在这一步建立的联系。)
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions {...}
- 4.新建一个下载器
downloader
,用于下载区块。构建不同的同步机制,mode
为同步机制类型。manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
- 5.
validator
引用验证区块头的方法。validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) }
- 6.
heighter
引用获取区块高度的方法。heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() }
- 7.
inserter
引用在区块链上插入区块的方法。inserter := func(blocks types.Blocks) (int, error) { ... return manager.blockchain.InsertChain(blocks) }
- 8.新建一个提取器
fetcher
,用于辅助同步区块。manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
3.(添加子协议)建立联系
添加子协议的具体步骤。
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id enode.ID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
在新建一个p2p.Protocol
对象的时候,
- 传入三个字段:
Name
协议名,Version
协议版本,Length
协议长度。 - 新建三个远程节点的回调函数:
-
Run
执行协议。 -
NodeInfo
返回了本地节点的网络id,区块难度值,创世区块哈希值,区块链配置,当前区块哈希值。 -
PeerInfo
远程节点的信息。
-
Run
方法:manager.newPeer
方法使得底层的peer
能创建一个上层的peer
,并且自身包含在上层的peer
里。创建了上层的peer
后,调用了manager.handle(peer)
方法开始处理远程节点发来的消息。
4.调用子协议的 Run 方法 (包含步骤5)
在发起TCP连接请求的那一篇里,提到了:
在节点协议握手成功之后,srv.addpeer
的通道中加入与远程节点的连接。
这时候会触发 case c := <-srv.addpeer:
。
(代码在p2p/server.go
中)
case c := <-srv.addpeer:
// 对协议握手进行一次检查
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// 协议握手完成,通过检查。
// 新建底层peer。
p := newPeer(c, srv.Protocols)
// 如果启用了消息事件,请将peerFeed传递给peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
// 启动一个单独的协程,运行节点。
go srv.runPeer(p)
// 接收请求的节点集合加入该节点。
peers[c.node.ID()] = p
if p.Inbound() {
// 接入连接的数量加1.
inboundCount++
}
}
- 1.先对协议握手进行检查。
- 2.协议握手检查通过后,新建一个底层的
peer
。 - 3.启动一个单独的协程,来执行这一个
peer
。go srv.runPeer(p)
。先进行节点添加这一事件的广播,然后调用p2p/peer.go
的peer
对象的run
方法。func (srv *Server) runPeer(p *Peer) { // 测试。 ... // 广播节点添加事件。 srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeAdd, Peer: p.ID(), }) // 执行协议· remoteRequested, err := p.run() // 广播节点下线 srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeDrop, Peer: p.ID(), Error: err.Error(), }) // 删除节点 srv.delpeer <- peerDrop{p, err, remoteRequested} }
- 4.在
peers
里加入该peer
,如果接入成功,接入数量加1。
5.底层的peer结构以及run方法
(在p2p/peer.go
中)
首先是底层peer
结构体。它代表一个远程节点的连接。包含了rw
建立的TCP连接,running
协议对应的读写通道。
type Peer struct {
rw *conn // 建立的TCP连接
running map[string]*protoRW // 协议对应的读写通道
log log.Logger // 日志记录
created mclock.AbsTime
wg sync.WaitGroup
protoErr chan error
closed chan struct{}
disc chan DiscReason
// 接收消息发送/接收事件
events *event.Feed
}
然后是底层peer
的run
方法,也就是一个底层节点会执行的所有操作。
func (p *Peer) run() (remoteRequested bool, err error) {
// 定义变量。
...
// 启动了两个单独的协程,一个用于循环的读取消息,一个用于循环发送ping消息,确保对方节点在线。
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
// 开启所有协议。
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
// 等待接收到错误或者是断开连接。
loop:
...
// 关闭节点。
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
- 启动了两个单独的协程,
go p.readLoop(readErr)
用于循环的读取消息,go p.pingLoop()
用于循环发送ping消息,确保对方节点在线。
- 启动了两个单独的协程,
- 2.执行节点包含的所有协议,即
p.startProtocols(writeStart, writeErr)
方法。在p.startProtocols
方法中调用了底层网络设置的回调函数Run
(以下程序语句)。err := proto.Run(p, rw)
步骤4和步骤5一步一步的执行和调用,最后使用了p2p.Protocol
的Run
方法。
6.总结
- 1.底层
peer
与上层peer
通过p2p.Protocol
对象的Run
方法联系在一起。Run
实现了新建上层节点,以及与协议版本对应的处理节点之间通信的消息的功能。 - 2.关于底层p2p网络启动了的协程的总结:
-
Server
服务启动了两个协程:监听远程节点发来的TCP请求,发起TCP连接请求。 - 底层
peer
运行协议的时候,启动了n+2个协程:循环读取消息,循环发送ping消息,以及n个协议对应的处理方式的协程。
-
网友评论