前言
前面已经看了一部分p2p源码,今天继续前面的来研读源码。
dial
上面的table类实现了Kademlia算法,udp实现了发现节点时节点间的网络通讯。发现节点后,就可以对一个节点发起连接了。devp2p中负责在两个节点之间建立连接的便是dial类。
// NodeDialer is used to connect to nodes in the network, typically by using
// an underlying net.Dialer but also using net.Pipe in tests.
type NodeDialer interface {
// Dial opens a connection to the given node and returns it, or an error
// if the connection could not be established.
Dial(*discover.Node) (net.Conn, error)
}
// TCPDialer implements the NodeDialer interface by using a net.Dialer to
// create TCP connections to nodes in the network.
type TCPDialer struct {
// The embedded *net.Dialer performs the actual dialing.
*net.Dialer
}
// Dial opens a TCP connection to dest. The remote address is built from the
// node's IP and TCP port and dialed through the embedded net.Dialer.
func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) {
	endpoint := net.TCPAddr{
		IP:   dest.IP,
		Port: int(dest.TCP),
	}
	return t.Dialer.Dial("tcp", endpoint.String())
}
// dialstate schedules dials and discovery lookups.
// It gets a chance to compute new tasks on every iteration
// of the main loop in Server.run.
type dialstate struct {
// maxDynDials is the maximum number of dynamically dialed connections;
// newTasks starts from this value when computing how many dials are needed.
maxDynDials int
// ntab is the discovery table used for node lookups. checkDial skips the
// "dialing ourselves" check when it is nil.
ntab discoverTable
// netrestrict, when non-nil, acts as an IP whitelist: checkDial rejects
// nodes whose IP it does not contain.
netrestrict *netutil.Netlist
// lookupRunning is set by newTasks when it schedules a discoverTask and
// prevents a second one from being scheduled while it is true.
// NOTE(review): the place where it is cleared is not shown here.
lookupRunning bool
// dialing holds the nodes that are currently being dialed, with the
// connection flags of the dial.
dialing map[discover.NodeID]connFlag
// lookupBuf buffers dial candidates; newTasks drops the entries it has tried.
lookupBuf []*discover.Node // current discovery lookup results
// randomNodes is the buffer passed to ntab.ReadRandomNodes.
randomNodes []*discover.Node // filled from Table
// static holds the dial tasks of the static nodes, keyed by node ID.
static map[discover.NodeID]*dialTask
// hist is the history of recent dials.
hist *dialHistory
// start is set by newTasks on its first invocation.
start time.Time // time when the dialer was first used
// bootnodes are the built-in nodes dialed as a fallback; newTasks rotates
// through them.
bootnodes []*discover.Node // default dials when there are no peers
}
// discoverTable is the set of discovery-table operations used by the dialer.
type discoverTable interface {
// Self returns the local node; checkDial compares its ID against dial
// candidates to avoid dialing ourselves.
Self() *discover.Node
Close()
// Resolve looks up the node with the given ID. dialTask.resolve treats a
// nil result as "not found".
Resolve(target discover.NodeID) *discover.Node
// Lookup performs a lookup for target and returns the nodes found.
Lookup(target discover.NodeID) []*discover.Node
// ReadRandomNodes fills the given slice and returns a count; newTasks
// uses that count as the number of usable entries.
ReadRandomNodes([]*discover.Node) int
}
// dialHistory remembers recent dials. checkDial refuses to dial a node
// whose ID is still contained in the history.
type dialHistory []pastDial
// pastDial is an entry in the dial history.
type pastDial struct {
// id is the ID of the node that was dialed.
id discover.NodeID
// exp is presumably the time at which the entry expires; newTasks uses
// the exp of the history's minimum entry to size its waitExpireTask.
exp time.Time
}
// task is a unit of work produced by dialstate.newTasks. The concrete task
// types in this file (dialTask, discoverTask, waitExpireTask) each implement Do.
type task interface {
// Do executes the task against the given server.
Do(*Server)
}
这里有个接口定义了Do方法,同时可以看到下面有三种不同的task,可见每种task都会有对应的Do方法来处理task。
// task is a unit of work produced by dialstate.newTasks; dialTask,
// discoverTask and waitExpireTask each provide their own Do.
// NOTE(review): the article repeats this declaration; it is the same
// interface as the one shown just above.
type task interface {
// Do executes the task against the given server.
Do(*Server)
}
// A dialTask is generated for each node that is dialed. Its
// fields cannot be accessed while the task is running.
type dialTask struct {
// flags are the connection flags handed to Server.SetupConn.
flags connFlag
// dest is the node to dial; resolve replaces it when a fresh endpoint is found.
dest *discover.Node
// lastResolved is the time of the most recent resolve attempt.
lastResolved time.Time
// resolveDelay is the minimum interval between resolve attempts; it is
// doubled on failure and reset on success.
resolveDelay time.Duration
}
// discoverTask runs discovery table operations.
// Only one discoverTask is active at any time.
// discoverTask.Do performs a random lookup.
type discoverTask struct {
// results holds the nodes returned by the lookup performed in Do.
results []*discover.Node
}
// A waitExpireTask is generated if there are no other tasks
// to keep the loop in Server.run ticking.
type waitExpireTask struct {
// The embedded Duration is how long Do sleeps.
time.Duration
}
有三种类型的task,那么怎么来生成一个task呢?
// newTasks computes the tasks that should be started on this iteration of
// the main loop.
//
// nRunning is the number of tasks still running, peers is the set of
// connected peers keyed by node ID and now is the current time. The
// returned slice may contain dialTasks (static nodes, a fallback bootnode,
// random table nodes, lookup results), at most one discoverTask, and a
// waitExpireTask when nothing else is pending.
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
	// Record the first use of the dialer; the bootnode fallback below is
	// measured from this instant.
	if s.start.IsZero() {
		s.start = now
	}

	var tasks []task

	// tryDial validates the candidate, marks it as being dialed and queues
	// a dialTask for it. It reports whether the dial was queued.
	tryDial := func(flag connFlag, n *discover.Node) bool {
		err := s.checkDial(n, peers)
		if err != nil {
			log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
			return false
		}
		s.dialing[n.ID] = flag
		tasks = append(tasks, &dialTask{flags: flag, dest: n})
		return true
	}

	// Compute number of dynamic dials necessary at this point: start from
	// the maximum and subtract both established and in-flight dynamic dials.
	budget := s.maxDynDials
	for _, peer := range peers {
		if peer.rw.is(dynDialedConn) {
			budget--
		}
	}
	for _, inFlight := range s.dialing {
		if inFlight&dynDialedConn == 0 {
			continue
		}
		budget--
	}

	// Expire the dial history on every invocation.
	s.hist.expire(now)

	// Create dials for static nodes if they are not connected. Static
	// nodes that can never be dialed (ourselves, not whitelisted) are
	// removed; any other check failure just skips the node for this round.
	for id, st := range s.static {
		err := s.checkDial(st.dest, peers)
		if err == nil {
			s.dialing[id] = st.flags
			tasks = append(tasks, st)
		} else if err == errNotWhitelisted || err == errSelf {
			log.Warn("Removing static dial candidate", "id", st.dest.ID, "addr", &net.TCPAddr{IP: st.dest.IP, Port: int(st.dest.TCP)}, "err", err)
			delete(s.static, st.dest.ID)
		}
	}

	// If we don't have any peers whatsoever, try to dial a random bootnode. This
	// scenario is useful for the testnet (and private networks) where the discovery
	// table might be full of mostly bad peers, making it hard to find good ones.
	if len(peers) == 0 && len(s.bootnodes) > 0 && budget > 0 && now.Sub(s.start) > fallbackInterval {
		// Rotate the list in place: the head moves to the tail.
		head := s.bootnodes[0]
		copy(s.bootnodes, s.bootnodes[1:])
		s.bootnodes[len(s.bootnodes)-1] = head
		if tryDial(dynDialedConn, head) {
			budget--
		}
	}

	// Use random nodes from the table for half of the necessary
	// dynamic dials.
	if half := budget / 2; half > 0 {
		limit := s.ntab.ReadRandomNodes(s.randomNodes)
		if half < limit {
			limit = half
		}
		for k := 0; k < limit; k++ {
			if tryDial(dynDialedConn, s.randomNodes[k]) {
				budget--
			}
		}
	}

	// Create dynamic dials from random lookup results, removing tried
	// items from the result buffer.
	tried := 0
	for tried < len(s.lookupBuf) && budget > 0 {
		if tryDial(dynDialedConn, s.lookupBuf[tried]) {
			budget--
		}
		tried++
	}
	remaining := copy(s.lookupBuf, s.lookupBuf[tried:])
	s.lookupBuf = s.lookupBuf[:remaining]

	// Launch a discovery lookup if more candidates are needed.
	if !s.lookupRunning && len(s.lookupBuf) < budget {
		s.lookupRunning = true
		tasks = append(tasks, &discoverTask{})
	}

	// Launch a timer to wait for the next node to expire if all
	// candidates have been tried and no task is currently active.
	// This should prevent cases where the dialer logic is not ticked
	// because there are no pending events.
	if nRunning == 0 && len(tasks) == 0 && s.hist.Len() > 0 {
		sleep := s.hist.min().exp.Sub(now)
		tasks = append(tasks, &waitExpireTask{sleep})
	}
	return tasks
}
其中的checkDial方法用来检查是否需要建立连接。
// 检查dial状态(是否需要创建连接)
func (s *dialstate) checkDial(n *discover.Node, peers map[discover.NodeID]*Peer) error {
_, dialing := s.dialing[n.ID]
switch {
case dialing:
// 正在创建
return errAlreadyDialing
case peers[n.ID] != nil
// 已经创建过连接
return errAlreadyConnected
case s.ntab != nil && n.ID == s.ntab.Self().ID:
// 创建的对象不是自己
return errSelf
case s.netrestrict != nil && !s.netrestrict.Contains(n.IP):
// 网络限制。对方IP不在白名单
return errNotWhitelisted
case s.hist.contains(n.ID):
return errRecentlyDialed
}
return nil
}
创建任务之后,针对不同的task会有不同的Do实现。我们一个一个来看这几个task的Do处理。首先来看看dialTask的Do处理,这里的dialTask主要在两个节点之间建立连接。
// Do dials the task's destination node.
//
// If the destination is reported as Incomplete (per the original note: it
// has no IP address yet), it is resolved through discovery first and the
// task is abandoned when that fails. If a static node fails at the dial
// stage, its endpoint is re-resolved once and the dial is retried, because
// the configured address of a static node may have changed. Both dial
// failures are logged; nothing is returned to the caller.
func (t *dialTask) Do(srv *Server) {
	if t.dest.Incomplete() && !t.resolve(srv) {
		return
	}
	err := t.dial(srv, t.dest)
	if err == nil {
		return
	}
	log.Trace("Dial error", "task", t, "err", err)

	// Try resolving the ID of static nodes if dialing failed. Only errors
	// from the dial itself (wrapped in *dialError) qualify; errors from
	// the connection setup do not trigger a retry.
	if _, ok := err.(*dialError); !ok || t.flags&staticDialedConn == 0 {
		return
	}
	if !t.resolve(srv) {
		return
	}
	// Report the outcome of the retry as well instead of dropping it.
	if err := t.dial(srv, t.dest); err != nil {
		log.Trace("Dial error", "task", t, "err", err)
	}
}
// resolve attempts to find the current endpoint for the destination
// using discovery. It reports whether t.dest was replaced by a freshly
// resolved node.
//
// Resolve operations are throttled with backoff to avoid flooding the
// discovery network with useless queries for nodes that don't exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(srv *Server) bool {
	// Without a discovery table there is nothing to ask.
	if srv.ntab == nil {
		log.Debug("Can't resolve node", "id", t.dest.ID, "err", "discovery is disabled")
		return false
	}
	if t.resolveDelay == 0 {
		t.resolveDelay = initialResolveDelay
	}
	// Throttle: the previous attempt was too recent.
	if sinceLast := time.Since(t.lastResolved); sinceLast < t.resolveDelay {
		return false
	}

	found := srv.ntab.Resolve(t.dest.ID)
	t.lastResolved = time.Now()

	if found != nil {
		// The node was found.
		t.resolveDelay = initialResolveDelay
		t.dest = found
		log.Debug("Resolved node", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)})
		return true
	}

	// Not found: double the delay, capped at maxResolveDelay.
	if doubled := t.resolveDelay * 2; doubled > maxResolveDelay {
		t.resolveDelay = maxResolveDelay
	} else {
		t.resolveDelay = doubled
	}
	log.Debug("Resolving node failed", "id", t.dest.ID, "newdelay", t.resolveDelay)
	return false
}
// dialError wraps an error returned by the dialer itself. dialTask.Do uses
// the type to tell dial failures apart from connection setup failures.
type dialError struct {
error
}
// dial performs the actual connection attempt to dest.
//
// A failure of the underlying dialer is returned wrapped in *dialError.
// On success the connection is wrapped in a metered connection and handed
// to srv.SetupConn together with the task's flags; that call's result is
// returned unchanged.
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
	conn, dialErr := srv.Dialer.Dial(dest)
	if dialErr != nil {
		return &dialError{dialErr}
	}
	metered := newMeteredConn(conn, false)
	return srv.SetupConn(metered, t.flags, dest)
}
这里的SetupConn方法便是上面Server对象的方法了。里面经过握手协议后将节点添加到peers队列。
看完了dialTask,接着来看看另外两个task: discoverTask和waitExpireTask的Do处理。
// Do performs a discovery lookup for a random target and stores the nodes
// found in t.results.
func (t *discoverTask) Do(srv *Server) {
	// newTasks generates a lookup task whenever dynamic dials are
	// necessary. Lookups need to take some time, otherwise the
	// event loop spins too fast. Wait until lookupInterval has passed
	// since the previous lookup.
	earliest := srv.lastLookup.Add(lookupInterval)
	if wait := time.Until(earliest); wait > 0 {
		time.Sleep(wait)
	}
	srv.lastLookup = time.Now()

	// Pick a random node ID as the lookup target.
	var randomID discover.NodeID
	rand.Read(randomID[:])
	t.results = srv.ntab.Lookup(randomID)
}
...
// Do blocks for the task's duration. The server argument is unused.
func (t waitExpireTask) Do(_ *Server) {
	delay := t.Duration
	time.Sleep(delay)
}
这样建立连接的代码就看完了。综上,dial通过task任务在两个节点之间建立连接。生成任务时(newTasks),会先用checkDial检查候选节点并记录其拨号状态,然后为其创建dialTask发起连接。如果候选节点数还达不到所需的动态连接数,则新建discoverTask来发现更多节点。
peer
dial对象在两个节点node之间建立连接,当节点建立连接之后便是peer。peer主要处理两个节点建立连接之后的协议处理。
// Event types carried in PeerEvent.Type.
const (
// PeerEventTypeAdd is the type of event emitted when a peer is added
// to a p2p.Server.
PeerEventTypeAdd PeerEventType = "add"
// PeerEventTypeDrop is the type of event emitted when a peer is
// dropped from a p2p.Server.
PeerEventTypeDrop PeerEventType = "drop"
// PeerEventTypeMsgSend is the type of event emitted when a
// message is successfully sent to a peer.
PeerEventTypeMsgSend PeerEventType = "msgsend"
// PeerEventTypeMsgRecv is the type of event emitted when a
// message is received from a peer.
PeerEventTypeMsgRecv PeerEventType = "msgrecv"
)
// PeerEvent is an event emitted when peers are either added or dropped from
// a p2p.Server or when a message is sent or received on a peer connection.
// All fields except Type and Peer are omitted from the JSON encoding when empty.
type PeerEvent struct {
Type PeerEventType `json:"type"`
Peer discover.NodeID `json:"peer"`
Error string `json:"error,omitempty"`
Protocol string `json:"protocol,omitempty"`
MsgCode *uint64 `json:"msg_code,omitempty"`
MsgSize *uint32 `json:"msg_size,omitempty"`
}
// Peer represents a connected remote node.
type Peer struct {
// rw is the underlying connection to the remote node; messages are read
// from it and it is closed with a disconnect reason when run exits.
rw *conn
// running holds the protocols that are active on this connection.
running map[string]*protoRW
log log.Logger
created mclock.AbsTime
// wg tracks the goroutines started by run and startProtocols; run waits
// on it before returning.
wg sync.WaitGroup
// protoErr receives the result of every protocol's Run.
protoErr chan error
// closed is closed by run when the peer shuts down.
closed chan struct{}
// disc delivers a disconnect reason to run, which then shuts the peer down.
disc chan DiscReason
// events receives message send / receive events if set
events *event.Feed
}
这里定义了devp2p的四种消息类型,具体的协议类型参见wiki。
const (
// devp2p message codes
// handshakeMsg is the handshake message.
handshakeMsg = 0x00
// discMsg is the disconnect message.
discMsg = 0x01
// pingMsg is the ping message.
pingMsg = 0x02
// pongMsg is the reply to a ping message.
pongMsg = 0x03
)
peer里至关重要的一个作用就是启动支持的协议族。
// run starts the peer's read loop, ping loop and all protocol handlers and
// then blocks until a write error, read error, protocol error or disconnect
// request occurs. It then closes p.closed, closes the connection with the
// chosen disconnect reason and waits for all goroutines tracked by p.wg.
//
// remoteRequested is true when the read loop ended with a DiscReason, i.e.
// the disconnect came from the remote side. err is the error that ended the loop.
func (p *Peer) run() (remoteRequested bool, err error) {
var (
// writeStart carries a single token that permits one protocol write at a time.
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
// Two goroutines are started below: one for reading, one for pinging.
p.wg.Add(2)
// readLoop receives messages and reports its terminating error on readErr.
go p.readLoop(readErr)
// pingLoop is not shown here; presumably it pings the remote side to keep
// the connection alive.
go p.pingLoop()
// Start all protocol handlers.
// The write token is placed first so the first protocol write can proceed.
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
// A DiscReason from the read loop means the remote side asked to disconnect.
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
}
}
// Shut down: signal everyone via closed, close the connection, wait for goroutines.
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
...
// startProtocols wires every running protocol to the peer's shutdown and
// write-coordination channels and launches one goroutine per protocol that
// calls its Run method. The result of each Run is delivered on p.protoErr;
// a nil result is reported as errProtocolReturned.
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
	// Register all protocol goroutines before any of them starts.
	p.wg.Add(len(p.running))
	for _, running := range p.running {
		running.closed = p.closed
		running.wstart = writeStart
		running.werr = writeErr

		// Wrap the reader/writer so that message events are published
		// when an event feed is configured.
		var stream MsgReadWriter = running
		if p.events != nil {
			stream = newMsgEventer(stream, p.events, p.ID(), running.Name)
		}
		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", running.Name, running.Version))

		go func(proto *protoRW, rw MsgReadWriter) {
			err := proto.Run(p, rw)
			switch {
			case err == nil:
				p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
				err = errProtocolReturned
			case err != io.EOF:
				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
			}
			p.protoErr <- err
			p.wg.Done()
		}(running, stream)
	}
}
ping方法来保持节点在线很简单,这里看下另一个读取消息的协程readLoop。
// readLoop reads messages from the connection and dispatches them through
// handle until either step fails. The terminating error is sent on errc.
// It marks itself done on p.wg when it returns.
func (p *Peer) readLoop(errc chan<- error) {
	defer p.wg.Done()
	for {
		incoming, err := p.rw.ReadMsg()
		if err == nil {
			// Stamp the message with its arrival time before handling it.
			incoming.ReceivedAt = time.Now()
			err = p.handle(incoming)
		}
		if err != nil {
			errc <- err
			return
		}
	}
}
...
// handle processes a single incoming message.
//
// Ping messages are answered with a pong, a disconnect message is returned
// as its DiscReason, other base protocol messages are discarded, and
// everything else is forwarded to the subprotocol that owns the message
// code. It returns io.EOF if the peer shuts down while forwarding.
func (p *Peer) handle(msg Msg) error {
	if msg.Code == pingMsg {
		// Answer the ping asynchronously.
		msg.Discard()
		go SendItems(p.rw, pongMsg)
		return nil
	}
	if msg.Code == discMsg {
		var reason [1]DiscReason
		// This is the last message. We don't need to discard or
		// check errors because, the connection will be closed after it.
		rlp.Decode(msg.Payload, &reason)
		return reason[0]
	}
	if msg.Code < baseProtocolLength {
		// ignore other base protocol messages
		return msg.Discard()
	}

	// it's a subprotocol message
	target, err := p.getProto(msg.Code)
	if err != nil {
		return fmt.Errorf("msg code out of range: %v", msg.Code)
	}
	select {
	case target.in <- msg:
		return nil
	case <-p.closed:
		return io.EOF
	}
}
...
// getProto finds the protocol responsible for handling
// the given message code, i.e. the running protocol whose range
// [offset, offset+Length) contains code. It returns an errInvalidMsgCode
// peer error when no protocol matches.
func (p *Peer) getProto(code uint64) (*protoRW, error) {
	for _, candidate := range p.running {
		if code < candidate.offset || code >= candidate.offset+candidate.Length {
			continue
		}
		return candidate, nil
	}
	return nil, newPeerError(errInvalidMsgCode, "%d", code)
}
peer里还有个消息读写类protoRW来实现消息的读写。
// protoRW is the message reader/writer handed to a running protocol.
type protoRW struct {
// The embedded Protocol provides the protocol's Name, Version, Length and Run.
Protocol
// in is the channel on which Peer.handle delivers messages for this protocol.
in chan Msg // receives read messages
closed <-chan struct{} // receives when peer is shutting down
wstart <-chan struct{} // receives when write may start
werr chan<- error // for write results
// offset is added to outgoing message codes and subtracted from incoming ones.
offset uint64
// w is the writer that outgoing messages are passed to.
w MsgWriter
}
// WriteMsg sends msg on behalf of the protocol. The message code must be
// below the protocol's Length and is shifted by the protocol's offset
// before sending. The call blocks until a write is permitted or the peer
// shuts down, in which case ErrShuttingDown is returned.
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
	if msg.Code >= rw.Length {
		return newPeerError(errInvalidMsgCode, "not handled")
	}
	msg.Code = msg.Code + rw.offset

	select {
	case <-rw.closed:
		return ErrShuttingDown
	case <-rw.wstart:
		err = rw.w.WriteMsg(msg)
		// Report write status back to Peer.run. It will initiate
		// shutdown if the error is non-nil and unblock the next write
		// otherwise. The calling protocol code should exit for errors
		// as well but we don't want to rely on that.
		rw.werr <- err
		return err
	}
}
// ReadMsg returns the next message for this protocol, with its code shifted
// back by the protocol's offset. It returns io.EOF when the peer shuts down.
func (rw *protoRW) ReadMsg() (Msg, error) {
	select {
	case <-rw.closed:
		return Msg{}, io.EOF
	case incoming := <-rw.in:
		incoming.Code -= rw.offset
		return incoming, nil
	}
}
然后Protocol类定义了P2P协议;Rlpx定义了P2P网络通信底层的消息加密方式,peer中建立连接的两次握手就都是在这实现的。
至此p2p的核心源码就大致看完了。首先通过discover目录下的各个类去发现周围节点并将它们存储到数据库,这里主要涉及Kademlia算法的理解和实现。接着,通过dial在两个节点之间建立连接。随后建立连接的网络链路两端的节点就是peer,peer之间通过支持的协议进行通讯,建立tcp连接进行消息的传递。其中,底层的消息传递通过RLPx Encryption进行加密传输。
更多以太坊源码解析请移驾全球最大同性交友网,觉得有用记得给个小star哦
.
.
.
.
互联网颠覆世界,区块链颠覆互联网!
--------------------------------------------------20181103 17:58
网友评论