以太坊源码研读0xa1 p2p实现(上)

作者: WallisW | 来源:发表于2018-11-04 18:33 被阅读45次

首先,在之前的go公链实战中大概介绍了区块链网络的原理和实现,通信协议的实现参照的是Bitcoin的,这里以太坊的通信协议也大同小异。

以太坊devp2p最新动态和相关说明请点击这里。谈到devp2p就不得不讲libp2p,libp2p是一个开源的第三方p2p网络实现库,它汇集了各种传输和点对点协议,使开发人员可以轻松构建大型,强大的p2p网络。

废话少说撸代码

p2p目录结构

p2p目录结构

Server

我们就从服务器逻辑处理类来入手。

// Server manages all peer connections.
// It owns the discovery tables, the TCP listener and the run loop that
// admits, tracks and drops peers.
type Server struct {
    // Config fields may not be modified while the server is running.
    // The embedded Config carries all user-supplied server options.
    Config

    // Hooks for testing. These are useful because we can inhibit
    // the whole protocol stack.
    // newTransport wraps a raw connection (Start defaults it to newRLPX);
    // newPeerHook, when set, is called by runPeer before the peer runs.
    newTransport func(net.Conn) transport
    newPeerHook  func(*Peer)

    // lock guards running, which Start sets and setupConn reads before
    // letting a connection enter the handshake.
    lock    sync.Mutex // protects running
    running bool

    // ntab is the v4 discovery table (left unset when NoDiscovery is true);
    // listener is the TCP listener created by startListening;
    // ourHandshake is the protocol handshake we send, built by Start;
    // DiscV5 is the v5 discovery network (only set when DiscoveryV5 is true).
    ntab         discoverTable
    listener     net.Listener
    ourHandshake *protoHandshake
    lastLookup   time.Time
    DiscV5       *discv5.Network

    // These are for Peers, PeerCount (and nothing else).
    // run applies each received op to its peer map, then signals peerOpDone.
    peerOp     chan peerOpFunc
    peerOpDone chan struct{}

    // Channels serviced by the run loop:
    //   quit          - shutdown signal; run spins down when it fires
    //   addstatic     - static nodes to add to the dial state
    //   removestatic  - static nodes to remove (and disconnect if connected)
    //   posthandshake - conns that passed the encryption handshake
    //   addpeer       - conns that passed the protocol handshake
    //   delpeer       - peers that stopped running (sent by runPeer)
    // loopWG also tracks the TCP NAT-mapping goroutine of startListening;
    // peerFeed carries the peer add/drop events sent by runPeer.
    quit          chan struct{}
    addstatic     chan *discover.Node
    removestatic  chan *discover.Node
    posthandshake chan *conn
    addpeer       chan *conn
    delpeer       chan peerDrop
    loopWG        sync.WaitGroup // loop, listenLoop
    peerFeed      event.Feed
    log           log.Logger
}
...
// Config holds Server options.
type Config struct {
    // This field must be set to a valid secp256k1 private key.
    // Start returns an error when it is nil.
    PrivateKey *ecdsa.PrivateKey `toml:"-"`

    // MaxPeers is the maximum number of peers that can be
    // connected. It must be greater than zero.
    MaxPeers int

    // MaxPendingPeers is the maximum number of peers that can be pending in the
    // handshake phase, counted separately for inbound and outbound connections.
    // Zero defaults to preset values.
    // (listenLoop uses defaultMaxPendingPeers as its inbound limit whenever
    // this value is not positive.)
    MaxPendingPeers int `toml:",omitempty"`

    // DialRatio controls the ratio of inbound to dialed connections.
    // Example: a DialRatio of 2 allows 1/2 of connections to be dialed.
    // Setting DialRatio to zero defaults it to 3.
    DialRatio int `toml:",omitempty"`

    // NoDiscovery can be used to disable the peer discovery mechanism.
    // Disabling is useful for protocol debugging (manual topology).
    NoDiscovery bool

    // DiscoveryV5 specifies whether the new topic-discovery based V5 discovery
    // protocol should be started or not.
    DiscoveryV5 bool `toml:",omitempty"`

    // Name sets the node name of this server.
    // Use common.MakeName to create a name that follows existing conventions.
    // The name is included in the protocol handshake we send to peers.
    Name string `toml:"-"`

    // BootstrapNodes are used to establish connectivity
    // with the rest of the network.
    BootstrapNodes []*discover.Node

    // BootstrapNodesV5 are used to establish connectivity
    // with the rest of the network using the V5 discovery
    // protocol.
    BootstrapNodesV5 []*discv5.Node `toml:",omitempty"`

    // Static nodes are used as pre-configured connections which are always
    // maintained and re-connected on disconnects.
    StaticNodes []*discover.Node

    // Trusted nodes are used as pre-configured connections which are always
    // allowed to connect, even above the peer limit.
    // The list is read once, when the run loop starts.
    TrustedNodes []*discover.Node

    // Connectivity can be restricted to certain IP networks.
    // If this option is set to a non-nil value, only hosts which match one of the
    // IP networks contained in the list are considered.
    // For example, listenLoop immediately closes inbound TCP connections whose
    // remote address is not contained in the list.
    NetRestrict *netutil.Netlist `toml:",omitempty"`

    // NodeDatabase is the path to the database containing the previously seen
    // live nodes in the network.
    NodeDatabase string `toml:",omitempty"`

    // Protocols should contain the protocols supported
    // by the server. Matching protocols are launched for
    // each peer.
    // Their capabilities are advertised in our protocol handshake.
    Protocols []Protocol `toml:"-"`

    // If ListenAddr is set to a non-empty address, the server
    // will listen for incoming connections.
    //
    // If the port is zero, the operating system will pick a port. The
    // ListenAddr field will be updated with the actual address when
    // the server is started.
    // Start also resolves this address for the discovery UDP socket.
    ListenAddr string

    // If set to a non-nil value, the given NAT port mapper
    // is used to make the listening port available to the
    // Internet.
    NAT nat.Interface `toml:",omitempty"`

    // If Dialer is set to a non-nil value, the given Dialer
    // is used to dial outbound peer connections.
    // Otherwise Start installs a TCPDialer using defaultDialTimeout.
    Dialer NodeDialer `toml:"-"`

    // If NoDial is true, the server will not dial any peers.
    NoDial bool `toml:",omitempty"`

    // If EnableMsgEvents is set then the server will emit PeerEvents
    // whenever a message is sent to or received from a peer.
    // (run hands the server's peerFeed to every new peer when this is set.)
    EnableMsgEvents bool

    // Logger is a custom logger to use with the p2p.Server.
    // When nil, Start falls back to log.New().
    Logger log.Logger `toml:",omitempty"`
}
...
const (

    // defaultDialTimeout is the connect timeout of the TCPDialer that Start
    // installs when no Dialer is configured.
    defaultDialTimeout = 15 * time.Second

    // Connectivity defaults.
    // maxActiveDialTasks caps how many dial tasks the run loop keeps running
    // at once (further tasks are queued); defaultMaxPendingPeers is the
    // listenLoop inbound handshake limit when MaxPendingPeers is not set;
    // defaultDialRatio presumably backs the "zero defaults it to 3" rule of
    // Config.DialRatio (its use is not shown here).
    maxActiveDialTasks     = 16
    defaultMaxPendingPeers = 50
    defaultDialRatio       = 3

    // Maximum time allowed for reading a complete message.
    // This is effectively the amount of time a connection can be idle.
    frameReadTimeout = 30 * time.Second

    // Maximum amount of time allowed for writing a complete message.
    frameWriteTimeout = 20 * time.Second
)

然后看启动p2p服务的代码。

// Start starts running the server.
// Servers can not be re-used after stopping.
//
// Start performs these steps in order:
//   - opens the discovery UDP socket, when either discovery protocol is enabled;
//   - starts the v4 and/or v5 discovery tables;
//   - builds our protocol handshake;
//   - starts the TCP listener, when ListenAddr is non-empty;
//   - launches the run loop.
//
// If any step fails, everything this call already started is torn down again
// and the server is marked as not running. That includes the UDP socket, the
// discovery tables and the discovery NAT-mapping goroutine. Without this, a
// failed Start leaked those resources and left running set, so every later
// Start call failed with "server already running".
func (srv *Server) Start() (err error) {
    // Reject concurrent or repeated starts.
    srv.lock.Lock()
    defer srv.lock.Unlock()
    if srv.running {
        return errors.New("server already running")
    }
    srv.running = true
    srv.log = srv.Config.Logger
    if srv.log == nil {
        srv.log = log.New()
    }
    srv.log.Info("Starting P2P networking")

    // static fields
    if srv.PrivateKey == nil {
        // Nothing has been started yet, so only the running flag needs undoing.
        srv.running = false
        return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
    }
    if srv.newTransport == nil {
        // Connections are secured with the RLPx transport by default.
        srv.newTransport = newRLPX
    }
    if srv.Dialer == nil {
        // Default to plain TCP dialing with a bounded connect timeout.
        srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
    }

    // Channels serviced by the run loop.
    srv.quit = make(chan struct{})
    srv.addpeer = make(chan *conn)
    srv.delpeer = make(chan peerDrop)
    srv.posthandshake = make(chan *conn)
    srv.addstatic = make(chan *discover.Node)
    srv.removestatic = make(chan *discover.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})

    var (
        conn      *net.UDPConn
        sconn     *sharedUDPConn
        realaddr  *net.UDPAddr
        unhandled chan discover.ReadPacket

        // Discovery instances created by this call, remembered so that they
        // can be shut down again if a later step fails.
        startedTab discoverTable
        startedV5  *discv5.Network
    )
    // Roll back partial initialisation on failure. err is the named result,
    // so this also observes errors returned through the shadowed err
    // variables used below.
    defer func() {
        if err == nil {
            return
        }
        if startedV5 != nil {
            startedV5.Close()
            srv.DiscV5 = nil
        }
        if startedTab != nil {
            startedTab.Close()
            srv.ntab = nil
        }
        if conn != nil {
            // A discovery table may already have closed the socket; a second
            // Close only returns an error, which is deliberately ignored.
            conn.Close()
        }
        // Stops the discovery NAT-mapping goroutine, if one was started.
        close(srv.quit)
        srv.running = false
    }()

    // The UDP socket is needed as soon as either discovery protocol is on.
    if !srv.NoDiscovery || srv.DiscoveryV5 {
        addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
        if err != nil {
            return err
        }
        conn, err = net.ListenUDP("udp", addr)
        if err != nil {
            return err
        }
        realaddr = conn.LocalAddr().(*net.UDPAddr)
        if srv.NAT != nil {
            if !realaddr.IP.IsLoopback() {
                go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
            }
            // TODO: react to external IP changes over time.
            if ext, err := srv.NAT.ExternalIP(); err == nil {
                realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
            }
        }
    }

    if !srv.NoDiscovery && srv.DiscoveryV5 {
        // Both protocols share one socket. Presumably v4 hands packets it does
        // not understand to v5 through the unhandled channel (see
        // discover.Config.Unhandled).
        unhandled = make(chan discover.ReadPacket, 100)
        sconn = &sharedUDPConn{conn, unhandled}
    }

    // node table
    if !srv.NoDiscovery {
        cfg := discover.Config{
            PrivateKey:   srv.PrivateKey,
            AnnounceAddr: realaddr,
            NodeDBPath:   srv.NodeDatabase,
            NetRestrict:  srv.NetRestrict,
            Bootnodes:    srv.BootstrapNodes,
            Unhandled:    unhandled,
        }
        ntab, err := discover.ListenUDP(conn, cfg)
        if err != nil {
            return err
        }
        startedTab = ntab
        srv.ntab = ntab
    }

    if srv.DiscoveryV5 {
        var (
            ntab *discv5.Network
            err  error
        )
        if sconn != nil {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
        } else {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
        }
        if err != nil {
            return err
        }
        // Remember it before SetFallbackNodes so a failure there closes it too.
        startedV5 = ntab
        if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
            return err
        }
        srv.DiscV5 = ntab
    }

    dynPeers := srv.maxDialedConns()
    // The dial state decides which nodes the run loop should dial.
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

    // handshake
    // Our protocol handshake advertises the capability of every configured protocol.
    srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
    for _, p := range srv.Protocols {
        srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
    }
    // listen/dial
    if srv.ListenAddr != "" {
        if err := srv.startListening(); err != nil {
            return err
        }
    }
    if srv.NoDial && srv.ListenAddr == "" {
        srv.log.Warn("P2P server will be useless, neither dialing nor listening")
    }

    // From here on the run loop owns dialing and peer management.
    // (running was already set at the top; there is no need to set it again.)
    srv.loopWG.Add(1)
    go srv.run(dialer)
    return nil
}

接着来看Start()中启动TCP监听的函数srv.startListening()。它会在底层监听TCP socket并启动listenLoop()协程;每当Accept到一个新连接,就会调用SetupConn()(其内部再调用setupConn())来建立连接。

setupConn()中握手主要分两个阶段:加密握手和协议握手。

// startListening binds the TCP listener on ListenAddr and starts listenLoop.
//
// It records the address that was actually bound, so a zero port is replaced
// by the OS-chosen one. When a NAT is configured and the bound address is not
// loopback, it also maps the TCP port through the NAT in a background
// goroutine tracked by loopWG.
func (srv *Server) startListening() error {
    ln, err := net.Listen("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    bound := ln.Addr().(*net.TCPAddr)
    srv.ListenAddr = bound.String()
    srv.listener = ln

    srv.loopWG.Add(1)
    go srv.listenLoop()

    // Without a NAT, or on a loopback address, there is nothing to map.
    if srv.NAT == nil || bound.IP.IsLoopback() {
        return nil
    }
    srv.loopWG.Add(1)
    go func() {
        nat.Map(srv.NAT, srv.quit, "tcp", bound.Port, bound.Port, "ethereum p2p")
        srv.loopWG.Done()
    }()
    return nil
}
...
// listenLoop runs in its own goroutine and accepts
// inbound connections.
//
// A pool of slot tokens bounds how many inbound connections may be in the
// handshake phase at once. The pool size is MaxPendingPeers, or
// defaultMaxPendingPeers when that is not positive. A token is taken before
// each Accept and returned when SetupConn finishes or the connection is
// rejected by NetRestrict.
//
// Temporary Accept errors are retried after a short backoff. The loop exits
// on any other Accept error, e.g. once the listener has been closed.
func (srv *Server) listenLoop() {
    defer srv.loopWG.Done()
    srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))

    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        slots <- struct{}{}
    }

    for {
        // Wait for a handshake slot before accepting.
        <-slots

        var (
            fd  net.Conn
            err error
        )
        for {
            fd, err = srv.listener.Accept()
            if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
                srv.log.Debug("Temporary read error", "err", err)
                // Temporary errors such as running out of file descriptors
                // tend to persist for a while. Retrying immediately would spin
                // this goroutine at full CPU, so back off briefly first.
                time.Sleep(200 * time.Millisecond)
                continue
            } else if err != nil {
                srv.log.Debug("Read error", "err", err)
                return
            }
            break
        }

        // Reject connections that do not match NetRestrict.
        if srv.NetRestrict != nil {
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
                srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
                fd.Close()
                slots <- struct{}{}
                continue
            }
        }

        fd = newMeteredConn(fd, true)
        srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        go func() {
            // SetupConn closes and logs failed connections itself, so its
            // error is not needed here; just give the slot back afterwards.
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{}
        }()
    }
}
...
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
//
// flags describes the kind of connection, and dialDest is the node that was
// dialed (nil for inbound connections). If setup fails, the connection is
// closed with the error and the error is returned.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
    // A nil Self is treated as a server that has been shut down.
    if srv.Self() == nil {
        return errors.New("shutdown")
    }
    c := &conn{
        fd:        fd,
        transport: srv.newTransport(fd),
        flags:     flags,
        cont:      make(chan error),
    }
    if err := srv.setupConn(c, flags, dialDest); err != nil {
        c.close(err)
        srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
        return err
    }
    return nil
}

// setupConn performs both handshakes on c and hands it to the run loop.
//
// The sequence is:
//  1. the RLPx encryption handshake, which yields the remote node ID;
//  2. for dialed connections, a check of that ID against dialDest;
//  3. a checkpoint on posthandshake, where run applies its pre-handshake checks;
//  4. the devp2p protocol handshake, whose ID must match the one from step 1;
//  5. a checkpoint on addpeer, after which run launches the peer.
//
// The first error aborts the sequence and is returned. The server must still
// be running when this is called, otherwise errServerStopped is returned.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
    // Prevent leftover pending conns from entering the handshake.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }
    // Run the encryption handshake.
    var err error
    if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
        srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
        return err
    }
    clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
    // For dialed connections, check that the remote public key matches.
    // The actual remote ID is already part of clog's context as "id", so only
    // the expected one needs to be logged (as a proper key/value pair).
    if dialDest != nil && c.id != dialDest.ID {
        clog.Trace("Dialed identity mismatch", "want", dialDest.ID)
        return DiscUnexpectedIdentity
    }
    // Let the run loop vet the (not yet verified) remote identity.
    err = srv.checkpoint(c, srv.posthandshake)
    if err != nil {
        clog.Trace("Rejected peer before protocol handshake", "err", err)
        return err
    }
    // Run the protocol handshake
    phs, err := c.doProtoHandshake(srv.ourHandshake)
    if err != nil {
        clog.Trace("Failed proto handshake", "err", err)
        return err
    }
    if phs.ID != c.id {
        // Log the mismatching handshake ID under its own key, not as "err".
        clog.Trace("Wrong devp2p handshake identity", "phsid", phs.ID)
        return DiscUnexpectedIdentity
    }
    c.caps, c.name = phs.Caps, phs.Name
    // Hand the fully handshaked connection to the run loop.
    err = srv.checkpoint(c, srv.addpeer)
    if err != nil {
        clog.Trace("Rejected peer", "err", err)
        return err
    }
    // If the checks completed successfully, runPeer has now been
    // launched by run.
    clog.Trace("connection set up", "inbound", dialDest == nil)
    return nil
}

Server.Start()的最后还以协程方式启动了run()函数。它主要负责两件事:一是调度主动拨号连接外部节点的任务;二是处理setupConn()通过checkpoint发送过来的连接(posthandshake和addpeer两个阶段),以及已连接peer的添加和删除。

// run is the server's main loop, started by Start in its own goroutine.
//
// Its responsibilities:
//   - schedule the dial tasks produced by dialstate (at most
//     maxActiveDialTasks running at once, the rest queued);
//   - service the addstatic, removestatic and peerOp channels;
//   - admit the connections that setupConn hands over at its two
//     checkpoints (posthandshake, addpeer);
//   - remove peers reported on delpeer.
//
// When srv.quit fires, it closes discovery, disconnects every peer and waits
// until each of them has been reported on delpeer.
//
// The peer set, the inbound counter and the task lists are local to this
// goroutine. Other goroutines reach them only through the channels above.
func (srv *Server) run(dialstate dialer) {
    defer srv.loopWG.Done()
    // taskdone is buffered to maxActiveDialTasks. At most that many tasks run
    // at once, so a finished task can always report without blocking, even
    // after this loop has stopped receiving.
    var (
        peers        = make(map[discover.NodeID]*Peer)
        inboundCount = 0
        trusted      = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
        taskdone     = make(chan task, maxActiveDialTasks)
        runningTasks []task
        queuedTasks  []task // tasks that can't run yet
    )
    // Put trusted nodes into a map to speed up checks.
    // Trusted peers are loaded on startup and cannot be
    // modified while the server is running.
    for _, n := range srv.TrustedNodes {
        trusted[n.ID] = true
    }

    // removes t from runningTasks
    delTask := func(t task) {
        for i := range runningTasks {
            if runningTasks[i] == t {
                runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
                break
            }
        }
    }
    // starts until max number of active tasks is satisfied:
    // launches tasks from ts while fewer than maxActiveDialTasks are running
    // and returns the tasks that could not be started yet.
    startTasks := func(ts []task) (rest []task) {
        i := 0
        for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
            t := ts[i]
            srv.log.Trace("New dial task", "task", t)
            go func() { t.Do(srv); taskdone <- t }()
            runningTasks = append(runningTasks, t)
        }
        return ts[i:]
    }
    scheduleTasks := func() {
        // Start from queue first: start as many queued tasks as possible and
        // keep the rest (re-using queuedTasks' backing array).
        queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
        // Query dialer for new tasks and start as many as possible now.
        // Whatever cannot start yet is appended to queuedTasks.
        if len(runningTasks) < maxActiveDialTasks {
            nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
            queuedTasks = append(queuedTasks, startTasks(nt)...)
        }
    }

running:
    for {
        scheduleTasks()

        select {
        case <-srv.quit:
            // The server was stopped. Run the cleanup logic.
            break running
        case n := <-srv.addstatic:
            // This channel is used by AddPeer to add to the
            // ephemeral static peer list. Add it to the dialer,
            // it will keep the node connected.
            srv.log.Trace("Adding static node", "node", n)
            dialstate.addStatic(n)
        case n := <-srv.removestatic:
            // This channel is used by RemovePeer to send a
            // disconnect request to a peer and to stop keeping
            // the node connected.
            srv.log.Trace("Removing static node", "node", n)
            dialstate.removeStatic(n)
            if p, ok := peers[n.ID]; ok {
                p.Disconnect(DiscRequested)
            }
        case op := <-srv.peerOp:
            // This channel is used by Peers and PeerCount.
            op(peers)
            srv.peerOpDone <- struct{}{}
        case t := <-taskdone:
            // A task got done. Tell dialstate about it so it
            // can update its state and remove it from the active
            // tasks list.
            srv.log.Trace("Dial task done", "task", t)
            dialstate.taskDone(t, time.Now())
            delTask(t)
        case c := <-srv.posthandshake:
            // A connection has passed the encryption handshake so
            // the remote identity is known (but hasn't been verified yet).
            // setupConn sends it here via checkpoint and presumably waits on
            // c.cont for the verdict sent below.
            if trusted[c.id] {
                // Ensure that the trusted flag is set before checking against MaxPeers.
                c.flags |= trustedConn
            }
            // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
            select {
            case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
            case <-srv.quit:
                break running
            }
        case c := <-srv.addpeer:
            // At this point the connection is past the protocol handshake.
            // Its capabilities are known and the remote identity is verified.
            err := srv.protoHandshakeChecks(peers, inboundCount, c)
            if err == nil {
                // The handshakes are done and it passed all checks.
                p := newPeer(c, srv.Protocols)
                // If message events are enabled, pass the peerFeed
                // to the peer
                if srv.EnableMsgEvents {
                    p.events = &srv.peerFeed
                }
                name := truncateName(c.name)
                srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
                // runPeer runs the peer in its own goroutine and reports
                // back on srv.delpeer when it stops.
                go srv.runPeer(p)
                peers[c.id] = p
                if p.Inbound() {
                    inboundCount++
                }
            }
            // The dialer logic relies on the assumption that
            // dial tasks complete after the peer has been added or
            // discarded. Unblock the task last.
            select {
            case c.cont <- err:
            case <-srv.quit:
                break running
            }
        case pd := <-srv.delpeer:
            // A peer disconnected.
            d := common.PrettyDuration(mclock.Now() - pd.created)
            pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
            delete(peers, pd.ID())
            if pd.Inbound() {
                inboundCount--
            }
        }
    }

    srv.log.Trace("P2P networking is spinning down")

    // Terminate discovery. If there is a running lookup it will terminate soon.
    if srv.ntab != nil {
        srv.ntab.Close()
    }
    if srv.DiscV5 != nil {
        srv.DiscV5.Close()
    }
    // Disconnect all peers.
    for _, p := range peers {
        p.Disconnect(DiscQuitting)
    }
    // Wait for peers to shut down. Pending connections and tasks are
    // not handled here and will terminate soon-ish because srv.quit
    // is closed.
    for len(peers) > 0 {
        p := <-srv.delpeer
        p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks))
        delete(peers, p.ID())
    }
}
...
// runPeer runs in its own goroutine for each peer.
//
// It does four things in order:
//  1. announces the peer on peerFeed;
//  2. blocks in p.run until the peer's protocols stop;
//  3. announces the drop together with the error that ended the peer;
//  4. reports the peer to the run loop on srv.delpeer, so it can be removed
//     from the peer set.
func (srv *Server) runPeer(p *Peer) {
    if hook := srv.newPeerHook; hook != nil {
        hook(p)
    }

    srv.peerFeed.Send(&PeerEvent{Type: PeerEventTypeAdd, Peer: p.ID()})

    remoteRequested, err := p.run()

    dropEvent := &PeerEvent{
        Type:  PeerEventTypeDrop,
        Peer:  p.ID(),
        Error: err.Error(),
    }
    srv.peerFeed.Send(dropEvent)

    // Note: run waits for existing peers to be sent on srv.delpeer
    // before returning, so this send should not select on srv.quit.
    drop := peerDrop{p, err, remoteRequested}
    srv.delpeer <- drop
}

总结一下,server启动后,会通过listenLoop()来接收和处理外部节点主动发起的连接,通过run()来主动发起与其他节点的连接,并统一管理所有已连接的peer。

Node

所以,接着就看看node的结构。

// Node represents a host on the network.
// The fields of Node may not be modified.
type Node struct {
    IP       net.IP // len 4 for IPv4 or 16 for IPv6
    UDP, TCP uint16 // port numbers
    ID       NodeID // the node's public key

    // This is a cached copy of sha3(ID) which is used for node
    // distance calculations. This is part of Node in order to make it
    // possible to write tests that need a node at a certain distance.
    // In those tests, the content of sha will not actually correspond
    // with ID.
    // (nodesByDistance.push compares these hashes against its target.)
    sha common.Hash

    // Time when the node was added to the table.
    addedAt time.Time
}

连接中的远程节点peer的结构是:

// Peer represents a connected remote node.
type Peer struct {
    // rw is presumably the conn handed to newPeer by the server's run loop.
    // NOTE(review): running presumably maps protocol names to their active
    // read/writers; confirm against startProtocols.
    rw      *conn
    running map[string]*protoRW
    log     log.Logger
    // created is the peer's creation time; the server uses it to log the
    // connection duration when the peer is removed.
    created mclock.AbsTime

    // NOTE(review): wg, protoErr, closed and disc appear to coordinate the
    // protocol goroutines and disconnect requests; their use is not shown here.
    wg       sync.WaitGroup
    protoErr chan error
    closed   chan struct{}
    disc     chan DiscReason

    // events receives message send / receive events if set
    // (the server sets it to its peerFeed when EnableMsgEvents is on).
    events *event.Feed
}

在p2p网络中,每一个节点既是服务端又是客户端。devp2p主要有两个功能,一个是用来发现当前节点之外的其他节点,一个是实现两个节点间的信息通讯。

节点发现

准备知识

对于分布式系统,怎么去高效地发现节点是一个至关重要的问题。一般的分布式系统都是使用DHT(Distributed Hash Table,分布式哈希表)原理来发现和管理节点。其中应用最广的又当属Kademlia算法。

这里推荐两篇我认为写得比较好的博文,帮助理解DHT原理和Kademlia算法。

其中,《易懂分布式 | Kademlia算法》一文可以着重看看,它将是接下来看发现节点源码的基础,这也是我看过的关于Kademlia最通俗易懂的博文。

这里也将Kademlia算法的论文原文以及中文翻译版贴出:

table(k-bucket in Kademlia)

这里的table结构就相当于Kademlia算法中的k-bucket(k桶),用来存储一个节点附近的某些节点路由。

const (
    alpha           = 3  // Kademlia concurrency factor
    bucketSize      = 16 // Kademlia bucket size
    maxReplacements = 10 // Size of per-bucket replacement list

    // We keep buckets for the upper 1/15 of distances because
    // it's very unlikely we'll ever encounter a node that's closer.
    //
    // Assuming common.Hash is 32 bytes: hashBits is 256, nBuckets is 17 and
    // bucketMinDistance (the log distance of the closest bucket) is 239.
    hashBits          = len(common.Hash{}) * 8
    nBuckets          = hashBits / 15       // Number of buckets
    bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket

    // IP address limits.
    // The table-wide limit is analogous to the per-bucket one: at most 10
    // addresses from the same /24 across the whole table.
    bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
    tableIPLimit, tableSubnet   = 10, 24

    // findnode drops a node once its stored failure count reaches
    // maxFindnodeFailures. refreshInterval and copyNodesInterval drive the
    // refresh and copy-live-nodes tickers in loop. The revalidate and seed
    // constants are presumably used by nextRevalidateTime and loadSeedNodes
    // (not shown here).
    maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped
    refreshInterval     = 30 * time.Minute
    revalidateInterval  = 10 * time.Second
    copyNodesInterval   = 30 * time.Second
    seedMinTableTime    = 5 * time.Minute
    seedCount           = 30
    seedMaxAge          = 5 * 24 * time.Hour
)

// Table is the Kademlia k-bucket table of known nodes, indexed by their
// distance from the local node.
type Table struct {
    mutex   sync.Mutex        // protects buckets, bucket content, nursery, rand
    buckets [nBuckets]*bucket // index of known nodes by distance
    nursery []*Node           // bootstrap nodes
    rand    *mrand.Rand       // source of randomness, periodically reseeded
    // ips enforces the table-wide IP limit (tableIPLimit per /tableSubnet,
    // configured in newTable).
    ips     netutil.DistinctNetSet

    db         *nodeDB // database of known nodes
    // refreshReq carries refresh requests; loop closes each request's channel
    // once the refresh it triggered (or joined) has finished.
    refreshReq chan chan struct{}
    // initDone is closed by loop when the initial refresh has finished (or on shutdown).
    initDone   chan struct{}
    // closeReq makes loop shut down; loop closes closed when shutdown is complete.
    closeReq   chan struct{}
    closed     chan struct{}

    nodeAddedHook func(*Node) // for testing

    // net is the discovery transport; loop closes it on shutdown.
    net  transport
    self *Node // metadata of the local node
}
...
// bucket contains nodes, ordered by their last activity. the entry
// that was most recently active is the first element in entries.
type bucket struct {
    // NOTE(review): replacements is presumably filled with nodes that arrive
    // while entries is already full; confirm against Table.add.
    entries      []*Node // live entries, sorted by time of last contact
    replacements []*Node // recently seen nodes to be used if revalidation fails
    // ips enforces the per-bucket IP limit (bucketIPLimit per /bucketSubnet,
    // configured in newTable).
    ips          netutil.DistinctNetSet
}

那么怎么初始化一个k桶表呢?

// newTable creates the node table for the local node ourID, reachable at
// ourAddr, and starts its maintenance loop.
//
// Known nodes are persisted in the node database at nodeDBPath; an in-memory
// database is used when no path is given. bootnodes become the table's
// fallback (bootstrap) nodes. The returned table is already running: its loop
// goroutine performs the initial refresh in the background.
//
// If the bootstrap nodes cannot be set, the node database opened here is
// closed again before the error is returned, so nothing is leaked.
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
    // 1. Open the node database.
    // If no node database was given, use an in-memory one
    db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
    if err != nil {
        return nil, err
    }

    // 2. Build the table. The UDP port is also used as the TCP port of self.
    // rand starts from a fixed seed and is reseeded by seedRand below.
    tab := &Table{
        net:        t,
        db:         db,
        self:       NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
    }

    // 3. Register the bootstrap nodes.
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        // The table is abandoned, so release the database opened above.
        db.close()
        return nil, err
    }

    // 4. Create the k-buckets, each with its own IP limit.
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }

    // 5. Load seed nodes.
    tab.seedRand()
    tab.loadSeedNodes()
    // Start the background expiration goroutine after loading seeds so that the search for
    // seed nodes also considers older nodes that would otherwise be removed by the
    // expiration.
    tab.db.ensureExpirer()

    // 6. Start the maintenance loop (refresh, revalidation, shutdown).
    go tab.loop()
    return tab, nil
}
...
// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// At most one doRefresh and at most one doRevalidate run at any time.
// refreshDone and revalidateDone are non-nil exactly while such a run is in
// flight.
//
// On shutdown the transport is closed first, which should unblock pending
// network calls. The loop then waits for any running refresh and revalidation
// to report completion before closing the node database. This ensures neither
// goroutine is left blocked forever on its (unbuffered) completion signal or
// keeps using a closed database.
//
// NOTE(review): this relies on doRevalidate signalling done exactly once
// (send or close), which the original receive case already assumed.
// copyLiveNodes still runs untracked; confirm it cannot race with db.close.
func (tab *Table) loop() {
    var (
        revalidate     = time.NewTimer(tab.nextRevalidateTime())
        refresh        = time.NewTicker(refreshInterval)
        copyNodes      = time.NewTicker(copyNodesInterval)
        revalidateDone chan struct{}                   // non-nil while doRevalidate runs
        refreshDone    = make(chan struct{})           // where doRefresh reports completion
        waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
    )
    defer refresh.Stop()
    defer revalidate.Stop()
    defer copyNodes.Stop()

    // Start initial refresh.
    go tab.doRefresh(refreshDone)

loop:
    for {
        select {
        case <-refresh.C:
            tab.seedRand()
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case req := <-tab.refreshReq:
            // Queue the caller; it is released when the (possibly already
            // running) refresh finishes.
            waiting = append(waiting, req)
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case <-refreshDone:
            // Refresh finished: release everyone waiting for it.
            for _, ch := range waiting {
                close(ch)
            }
            waiting, refreshDone = nil, nil
        case <-revalidate.C:
            // The timer is only re-armed after the previous run has finished,
            // so at most one revalidation is ever in flight.
            revalidateDone = make(chan struct{})
            go tab.doRevalidate(revalidateDone)
        case <-revalidateDone:
            revalidateDone = nil
            revalidate.Reset(tab.nextRevalidateTime())
        case <-copyNodes.C:
            go tab.copyLiveNodes()
        case <-tab.closeReq:
            break loop
        }
    }

    if tab.net != nil {
        tab.net.close()
    }
    if refreshDone != nil {
        <-refreshDone
    }
    if revalidateDone != nil {
        <-revalidateDone
    }
    for _, ch := range waiting {
        close(ch)
    }
    tab.db.close()
    close(tab.closed)
}

接着看看与loop中刷新节点相关的几个函数:节点查找(Lookup/lookup)、findnode,以及按距离排序的nodesByDistance。

// Lookup performs a network search for nodes close
// to the given target. It approaches the target by querying
// nodes that are closer to it on each iteration.
// The given target does not need to be an actual node
// identifier.
//
// If the table is empty, Lookup first waits for one table refresh.
func (tab *Table) Lookup(targetID NodeID) []*Node {
    const refreshIfEmpty = true
    return tab.lookup(targetID, refreshIfEmpty)
}

// lookup runs the iterative Kademlia search for targetID.
//
// The search proceeds in three stages:
//  1. Seed the result set with the bucketSize table entries closest to
//     keccak256(targetID). If that set is empty and refreshIfEmpty is true,
//     wait for a single table refresh and try again.
//  2. Keep at most alpha findnode queries in flight against the closest
//     not-yet-asked nodes.
//  3. Merge every newly seen node into the distance-sorted, bucketSize-bounded
//     result set.
//
// Stages 2 and 3 repeat until every node in the result set has been asked.
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
    target := crypto.Keccak256Hash(targetID[:])
    // Querying ourselves is pointless, so our own ID counts as already asked.
    asked := map[NodeID]bool{tab.self.ID: true}
    seen := make(map[NodeID]bool)
    reply := make(chan []*Node, alpha)
    inFlight := 0

    var result *nodesByDistance
    for {
        tab.mutex.Lock()
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The table is empty (all nodes were dropped). Wait for a refresh —
        // this is also how the very first query triggers bootstrapping —
        // but only once.
        <-tab.refresh()
        refreshIfEmpty = false
    }

    for {
        // Ask the closest nodes we have not asked yet, up to alpha at a time.
        for _, n := range result.entries {
            if inFlight >= alpha {
                break
            }
            if asked[n.ID] {
                continue
            }
            asked[n.ID] = true
            inFlight++
            go tab.findnode(n, targetID, reply)
        }
        if inFlight == 0 {
            // Every node in the result set has been asked: the search is over.
            return result.entries
        }
        // Fold the next reply into the result set.
        for _, n := range <-reply {
            if n == nil || seen[n.ID] {
                continue
            }
            seen[n.ID] = true
            result.push(n, bucketSize)
        }
        inFlight--
    }
}
...
// findnode asks n for nodes near targetID and updates n's findnode failure
// counter in the node database.
//
// A failed or empty answer increments the counter, and n is deleted from the
// table once the counter reaches maxFindnodeFailures. A non-empty answer
// decrements a positive counter. Every returned node is added to the table,
// and the (possibly empty) result is always sent on reply.
func (tab *Table) findnode(n *Node, targetID NodeID, reply chan<- []*Node) {
    failCount := tab.db.findFails(n.ID)
    found, err := tab.net.findnode(n.ID, n.addr(), targetID)

    switch {
    case err != nil || len(found) == 0:
        // An error, or an answer with no nodes, counts as a failure.
        failCount++
        tab.db.updateFindFails(n.ID, failCount)
        log.Trace("Findnode failed", "id", n.ID, "failcount", failCount, "err", err)
        if failCount >= maxFindnodeFailures {
            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", failCount)
            tab.delete(n)
        }
    case failCount > 0:
        // A useful answer pays back one earlier failure.
        tab.db.updateFindFails(n.ID, failCount-1)
    }

    // Keep everything we heard about. Some of these nodes may already be
    // offline; revalidation is expected to remove them later.
    for _, neighbor := range found {
        tab.add(neighbor)
    }
    reply <- found
}
...
// nodesByDistance is a list of nodes, ordered by
// distance to target.
//
// entries is kept sorted nearest-first by push. target is the hash that
// distances are measured against (compared with distcmp in push).
type nodesByDistance struct {
    entries []*Node
    target  common.Hash
}

// push inserts n into h.entries so that the slice stays sorted by ascending
// distance to h.target, with at most maxElems elements.
//
// When the slice is already full, the farthest element is dropped to make
// room. If n is farther than everything present, n itself is not added.
func (h *nodesByDistance) push(n *Node, maxElems int) {
    // First index whose entry is strictly farther from the target than n.
    pos := sort.Search(len(h.entries), func(i int) bool {
        return distcmp(h.target, h.entries[i].sha, n.sha) > 0
    })

    // Grow by one slot while there is still room.
    if len(h.entries) < maxElems {
        h.entries = append(h.entries, n)
    }

    // n sorts after every entry. It is either already the tail (appended
    // above) or discarded because the list was full.
    if pos == len(h.entries) {
        return
    }

    // Shift entries[pos:] right by one (losing the last one) and place n.
    copy(h.entries[pos+1:], h.entries[pos:])
    h.entries[pos] = n
}

上面只分析了刷新节点的方法doRefresh(),而查找到的节点还需要再次验证,以确保每一个节点都在线。

// doRevalidate pings the last node of a randomly chosen non-empty bucket.
//
// If the node answers, it is bumped to the front of its bucket. Otherwise
// tab.replace either swaps in a node from the bucket's replacement list or
// deletes it. A value is always sent on done when the function returns.
func (tab *Table) doRevalidate(done chan<- struct{}) {
    defer func() { done <- struct{}{} }()

    candidate, bi := tab.nodeToRevalidate()
    if candidate == nil {
        // Every bucket is empty; nothing to check.
        return
    }

    // The network round-trip happens without holding tab.mutex.
    pingErr := tab.net.ping(candidate.ID, candidate.addr())

    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    bkt := tab.buckets[bi]

    if pingErr == nil {
        // Still alive: move it to the front of its bucket.
        log.Trace("Revalidated node", "b", bi, "id", candidate.ID)
        bkt.bump(candidate)
        return
    }

    // No pong: promote a replacement if available, otherwise the node goes away.
    r := tab.replace(bkt, candidate)
    if r == nil {
        log.Trace("Removed dead node", "b", bi, "id", candidate.ID, "ip", candidate.IP)
        return
    }
    log.Trace("Replaced dead node", "b", bi, "id", candidate.ID, "ip", candidate.IP, "r", r.ID, "rip", r.IP)
}

// nodeToRevalidate returns the last entry of a randomly chosen non-empty
// bucket together with that bucket's index. Buckets are visited in a random
// permutation while holding tab.mutex. It returns (nil, 0) when every bucket
// is empty.
func (tab *Table) nodeToRevalidate() (n *Node, bi int) {
    tab.mutex.Lock()
    defer tab.mutex.Unlock()

    order := tab.rand.Perm(len(tab.buckets))
    for _, idx := range order {
        entries := tab.buckets[idx].entries
        if count := len(entries); count != 0 {
            return entries[count-1], idx
        }
    }
    return nil, 0
}
...
// bump looks for an entry with n's ID in b.entries.
//
// If one exists, it is moved to index 0: the entries in front of it shift
// back by one, and n is stored in the first slot. The return value reports
// whether such an entry was found.
func (b *bucket) bump(n *Node) bool {
    for pos, e := range b.entries {
        if e.ID != n.ID {
            continue
        }
        // Shift entries[0:pos] into entries[1:pos+1], overwriting the old slot.
        copy(b.entries[1:], b.entries[:pos])
        b.entries[0] = n
        return true
    }
    return false
}
...
// replace handles a node (last) that failed revalidation.
//
//   - If last is no longer the final entry of b.entries, it was replaced or
//     became active in the meantime. Nothing changes and nil is returned.
//   - If b has no replacements, last is deleted from the bucket and nil is
//     returned.
//   - Otherwise a random replacement is removed from b.replacements, stored in
//     last's slot, and returned; last's IP is released via tab.removeIP.
func (tab *Table) replace(b *bucket, last *Node) *Node {
    count := len(b.entries)
    if count == 0 || b.entries[count-1].ID != last.ID {
        // The entry moved since it was picked; leave the bucket alone.
        return nil
    }
    if len(b.replacements) == 0 {
        // Nobody to promote: just drop the dead node.
        tab.deleteInBucket(b, last)
        return nil
    }
    pick := b.replacements[tab.rand.Intn(len(b.replacements))]
    b.replacements = deleteNode(b.replacements, pick)
    b.entries[count-1] = pick
    tab.removeIP(b, last.IP)
    return pick
}

综上,table主要实现了用于p2p的Kademlia协议。主要通过loop函数来进行k桶表的构造,大概的流程是:

  • 1.loadSeedNodes方法加载种子节点,通过这些节点来发现其他网络节点。

  • 2.loop循环收到刷新请求时,会调用lookup方法查找距离目标节点ID最近的一批节点(即邻居节点)

  • 3.按照Kademlia的核心思想,lookup以迭代方式(同时最多alpha个并发请求)调用findnode方法,依次询问当前已知的最近节点,直到这些最近节点都被询问过

  • 4.每收到一个节点的回复(reply),就调用push方法把其中未见过的节点加入nodesByDistance结构的数组;该数组按各节点到查找目标的距离由近到远排序,最多保留bucketSize个

  • 5.然后还会通过doRevalidate方法发出ping指令对搜索到的bucket中的节点进行验证,以此来剔除不在线的但已添加到entries列表的节点

上面涉及到的ping()方法和findnode()方法最终都是通过transport接口实现的。不难发现udp这个结构体正好实现了这个接口;此外,Kademlia协议本身就是使用UDP进行网络通信的。

// transport is implemented by the UDP transport.
// it is an interface so we can test without opening lots of UDP
// sockets and without generating a private key.
//
// Table reaches the network through it (tab.net.ping and tab.net.findnode).
type transport interface {
    // ping sends a ping and returns once it is answered or fails.
    ping(NodeID, *net.UDPAddr) error
    // findnode asks toid at addr for nodes close to target.
    findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error)
    // close presumably shuts the transport down (not shown here).
    close()
}

udp

进入udp.go,可以看到其中定义了4种数据报结构:

// RPC packet types
//
// encodePacket writes one of these values as the packet-type byte, and
// decodePacket dispatches on it. Their numeric values (1..4) are therefore
// part of the wire format and must not be reordered.
const (
    // ping asks whether a node is online.
    pingPacket = iota + 1 // zero is 'reserved'
    // pong is the reply to a ping.
    pongPacket
    // findnode requests the nodes closest to a target ID.
    findnodePacket
    // neighbors returns nodes in reply to findnode.
    neighborsPacket
)

// RPC request structures
//
// Each struct below is RLP-encoded as a packet payload (encodePacket) and
// decoded by decodePacket, so field order is part of the wire format.
// Expiration fields hold a Unix timestamp in seconds: senders use
// time.Now().Add(expiration).Unix(), and handlers reject packets for which
// expired(Expiration) is true.
type (
    // ping checks whether a node is alive. sendPing sets Version to 4, From
    // to our endpoint and To to the recipient's address.
    ping struct {
        Version    uint
        From, To   rpcEndpoint
        Expiration uint64
        // Ignore additional fields (for forward compatibility).
        Rest []rlp.RawValue `rlp:"tail"`
    }

    // pong is the reply to ping.
    pong struct {
        // This field should mirror the UDP envelope address
        // of the ping packet, which provides a way to discover the
        // external address (after NAT).
        To rpcEndpoint

        ReplyTok   []byte // This contains the hash of the ping packet.
        Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
        // Ignore additional fields (for forward compatibility).
        Rest []rlp.RawValue `rlp:"tail"`
    }

    // findnode is a query for nodes close to the given target.
    // The recipient hashes Target with Keccak256 before selecting nodes.
    findnode struct {
        Target     NodeID // doesn't need to be an actual public key
        Expiration uint64
        // Ignore additional fields (for forward compatibility).
        Rest []rlp.RawValue `rlp:"tail"`
    }

    // reply to findnode
    // One findnode may be answered by several neighbors packets, each
    // carrying at most maxNeighbors nodes.
    neighbors struct {
        Nodes      []rpcNode
        Expiration uint64
        // Ignore additional fields (for forward compatibility).
        Rest []rlp.RawValue `rlp:"tail"`
    }

    // rpcNode is the wire form of a node record inside neighbors.
    rpcNode struct {
        IP  net.IP // len 4 for IPv4 or 16 for IPv6
        UDP uint16 // for discovery protocol
        TCP uint16 // for RLPx protocol
        ID  NodeID
    }

    // rpcEndpoint is the wire form of a network address used in ping/pong.
    rpcEndpoint struct {
        IP  net.IP // len 4 for IPv4 or 16 for IPv6
        UDP uint16 // for discovery protocol
        TCP uint16 // for RLPx protocol
    }
)

接着来看一下udp的数据结构。

// conn lists the UDP socket methods the discovery transport needs.
// *net.UDPConn has exactly these methods, so it satisfies the interface.
// Presumably it exists so that tests can substitute a fake socket.
type conn interface {
    ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
    WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
    Close() error
    LocalAddr() net.Addr
}

// udp implements the RPC protocol.
type udp struct {
    // conn is the UDP socket packets are read from and written to.
    conn        conn
    // NOTE(review): presumably restricts which networks peers may come from — confirm.
    netrestrict *netutil.Netlist
    // priv is this node's private key; encodePacket signs outgoing packets with it.
    priv        *ecdsa.PrivateKey
    // ourEndpoint is advertised as the From field of outgoing pings (sendPing).
    ourEndpoint rpcEndpoint

    // addpending hands new pending-reply entries (created by (*udp).pending)
    // to the loop goroutine, which matches them against incoming replies.
    addpending chan *pending
    // gotreply carries received replies to be matched against the pending entries.
    gotreply   chan reply

    // closing signals shutdown; (*udp).pending reports errClosed once it is readable.
    closing chan struct{}
    nat     nat.Interface

    // Embedded so udp methods can use Table members directly (t.closest, t.mutex, t.db).
    *Table
}

// pending represents a pending reply.
//
// some implementations of the protocol wish to send more than one
// reply packet to findnode. in general, any neighbors packet cannot
// be matched up with a specific findnode packet.
//
// our implementation handles this by storing a callback function for
// each pending reply. incoming packets from a node are dispatched
// to all the callback functions for that node.
//
// Entries are created by (*udp).pending and sent to the loop via addpending.
type pending struct {
    // these fields must match in the reply.
    from  NodeID
    ptype byte

    // time when the request must complete
    // NOTE(review): (*udp).pending does not set this; presumably the loop
    // fills it in when it receives the entry — confirm.
    deadline time.Time

    // callback is called when a matching reply arrives. if it returns
    // true, the callback is removed from the pending reply queue.
    // if it returns false, the reply is considered incomplete and
    // the callback will be invoked again for the next matching reply.
    callback func(resp interface{}) (done bool)

    // errc receives nil when the callback indicates completion or an
    // error if no further reply is received within the timeout.
    errc chan<- error
}

// reply is a received response packet that the loop matches against the
// queued pending entries.
type reply struct {
    // from and ptype identify the sender and packet type and are compared
    // with pending.from and pending.ptype.
    from  NodeID
    ptype byte
    // data is the decoded packet (e.g. *pong, *neighbors); presumably this is
    // what pending callbacks receive as their argument.
    data  interface{}
    // loop indicates whether there was
    // a matching request by sending on this channel.
    matched chan<- bool
}

接着,我们看看在table里调用的ping()是怎么实现的。

// ping sends a ping to toid at toaddr and blocks until the outcome is known.
// It returns nil once the matching pong arrives; otherwise it returns the
// error delivered by sendPing.
func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
    errc := t.sendPing(toid, toaddr, nil)
    return <-errc
}

// sendPing sends a version-4 ping to toid at toaddr without blocking.
//
// The returned channel yields nil once a pong echoing this ping's hash
// arrives, or an error otherwise. An encoding failure is delivered on the
// channel immediately. If callback is non-nil, it runs when the matching
// pong is received.
func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan error {
    req := &ping{
        Version:    4,
        From:       t.ourEndpoint,
        To:         makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
        Expiration: uint64(time.Now().Add(expiration).Unix()),
    }
    packet, hash, err := encodePacket(t.priv, pingPacket, req)
    if err != nil {
        failed := make(chan error, 1)
        failed <- err
        return failed
    }

    // A pong answers this ping only if its ReplyTok echoes our packet hash.
    matchesPing := func(p interface{}) bool {
        if !bytes.Equal(p.(*pong).ReplyTok, hash) {
            return false
        }
        if callback != nil {
            callback()
        }
        return true
    }
    errc := t.pending(toid, pongPacket, matchesPing)
    t.write(toaddr, req.name(), packet)
    return errc
}

然后,顺便看下查找节点的消息发送机制。

// findnode sends a findnode request for target to toid at toaddr and waits
// for the neighbors replies. It returns the nodes collected from them.
//
// The reply callback stays registered until bucketSize node records (valid
// or not) have been received. A peer that returns fewer records therefore
// ends with whatever error the pending mechanism delivers (presumably a
// timeout — confirm). In that case the nodes received so far are still
// returned alongside the error.
func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
    // If we haven't seen a ping from the destination node for a while, it won't remember
    // our endpoint proof and reject findnode. Solicit a ping first.
    // The outcome is deliberately ignored: the findnode is attempted either way.
    if time.Since(t.db.lastPingReceived(toid)) > nodeDBNodeExpiration {
        t.ping(toid, toaddr)
        t.waitping(toid)
    }

    nodes := make([]*Node, 0, bucketSize)
    nreceived := 0
    errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
        resp := r.(*neighbors)
        for _, rn := range resp.Nodes {
            // Every record counts toward the limit, even ones rejected below.
            nreceived++
            n, err := t.nodeFromRPC(toaddr, rn)
            if err != nil {
                log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
                continue
            }
            nodes = append(nodes, n)
        }
        return nreceived >= bucketSize
    })
    t.send(toaddr, findnodePacket, &findnode{
        Target:     target,
        Expiration: uint64(time.Now().Add(expiration).Unix()),
    })

    // Receive first, then read nodes. The callback appends to nodes on the
    // loop goroutine, and the receive on errc orders those writes before our
    // read. In `return nodes, <-errc` the Go spec leaves unspecified whether
    // nodes is read before or after the receive.
    err := <-errc
    return nodes, err
}
...
// pending registers callback for replies of type ptype from node id.
//
// The entry is handed to the loop goroutine via t.addpending. The returned
// channel (buffer 1) receives the outcome. If t.closing becomes readable
// first, errClosed is delivered on it immediately instead.
func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
    result := make(chan error, 1)
    entry := &pending{
        from:     id,
        ptype:    ptype,
        callback: callback,
        errc:     result,
    }
    select {
    case <-t.closing:
        result <- errClosed
    case t.addpending <- entry:
        // The loop goroutine now owns entry.
    }
    return result
}
...
// send encodes and signs req as a packet of type ptype and writes it to toaddr.
// It returns the packet hash (nil if encoding fails) and any encode or write error.
func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
    packet, hash, err := encodePacket(t.priv, ptype, req)
    if err == nil {
        err = t.write(toaddr, req.name(), packet)
    }
    return hash, err
}

那么当一个节点收到这些消息时,又是怎么处理的呢?从代码可以看到,四种消息包各自对应一种处理方式,它们都实现了同一个接口packet,由它统一处理收到的消息包。

// packet is implemented by the four discovery messages (ping, pong, findnode,
// neighbors). decodePacket returns one of them; presumably the receive path
// then calls its handle method.
type packet interface {
    // handle processes the message received from fromID at address from.
    // mac is the packet hash; ping.handle echoes it as the pong's ReplyTok.
    handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
    // name returns a short label such as "PING/v4", passed to t.write on send.
    name() string
}
...
// handle processes an incoming ping from fromID at from.
//
// It answers with a pong echoing mac and passes the ping to handleReply for
// any pending request. It then records when fromID last pinged us.
//
// The sender is added to the table immediately if we received a pong from
// it recently enough. Otherwise we ping it first and add it only once the
// matching pong arrives. Expired pings are rejected with errExpired.
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    if expired(req.Expiration) {
        return errExpired
    }
    resp := &pong{
        To:         makeEndpoint(from, req.From.TCP),
        ReplyTok:   mac,
        Expiration: uint64(time.Now().Add(expiration).Unix()),
    }
    t.send(from, pongPacket, resp)
    t.handleReply(fromID, pingPacket, req)

    // Only add the node once we hold a recent pong from it, so that its
    // later findnode requests pass our bond check.
    sender := NewNode(fromID, from.IP, uint16(from.Port), req.From.TCP)
    lastPong := t.db.lastPongReceived(fromID)
    if time.Since(lastPong) <= nodeDBNodeExpiration {
        t.addThroughPing(sender)
    } else {
        t.sendPing(fromID, from, func() { t.addThroughPing(sender) })
    }
    t.db.updateLastPingReceived(fromID, time.Now())
    return nil
}

// name returns the label for ping packets (passed to t.write when sending).
func (req *ping) name() string {
    return "PING/v4"
}

// handle processes an incoming pong from fromID.
//
// Expired pongs are rejected with errExpired. Pongs that handleReply does
// not match to a pending ping are rejected with errUnsolicitedReply.
// Otherwise the time of fromID's latest pong is recorded.
func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    switch {
    case expired(req.Expiration):
        return errExpired
    case !t.handleReply(fromID, pongPacket, req):
        // Nobody was waiting for this pong.
        return errUnsolicitedReply
    }
    t.db.updateLastPongReceived(fromID, time.Now())
    return nil
}

// name returns the label for pong packets (passed to t.write when sending).
func (req *pong) name() string {
    return "PONG/v4"
}

// handle answers a findnode request from fromID.
//
// The request is ignored with an error if it has expired, or if fromID has
// no bond (no endpoint-proof pong) with us. Otherwise the closest known
// nodes are sent back in neighbors packets of at most maxNeighbors entries.
// At least one (possibly empty) packet is always sent.
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    if expired(req.Expiration) {
        return errExpired
    }
    if !t.db.hasBond(fromID) {
        // No endpoint proof pong exists, we don't process the packet. This prevents an
        // attack vector where the discovery protocol could be used to amplify traffic in a
        // DDOS attack. A malicious actor would send a findnode request with the IP address
        // and UDP port of the target as the source address. The recipient of the findnode
        // packet would then send a neighbors packet (which is a much bigger packet than
        // findnode) to the victim.
        return errUnknownNode
    }

    // Snapshot the closest known nodes while holding the table lock.
    target := crypto.Keccak256Hash(req.Target[:])
    t.mutex.Lock()
    closest := t.closest(target, bucketSize).entries
    t.mutex.Unlock()

    resp := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
    packets := 0
    for _, n := range closest {
        // Only include nodes that netutil.CheckRelayIP accepts for this requester.
        if netutil.CheckRelayIP(from.IP, n.IP) == nil {
            resp.Nodes = append(resp.Nodes, nodeToRPC(n))
        }
        // Flush a full chunk; at most maxNeighbors per packet keeps it below 1280 bytes.
        if len(resp.Nodes) == maxNeighbors {
            t.send(from, neighborsPacket, &resp)
            resp.Nodes = resp.Nodes[:0]
            packets++
        }
    }
    // Send the remainder, or an empty packet if nothing was sent at all.
    if packets == 0 || len(resp.Nodes) > 0 {
        t.send(from, neighborsPacket, &resp)
    }
    return nil
}

// name returns the label for findnode packets (passed to t.write when sending).
func (req *findnode) name() string {
    return "FINDNODE/v4"
}

// handle processes a neighbors packet from fromID.
//
// Expired packets are rejected with errExpired. Packets that handleReply
// does not match to a pending findnode are rejected with errUnsolicitedReply.
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
    switch {
    case expired(req.Expiration):
        return errExpired
    case !t.handleReply(fromID, neighborsPacket, req):
        return errUnsolicitedReply
    default:
        return nil
    }
}

另外,节点发现过程中发送的消息包都是先经过RLP编码、再用节点私钥签名后发送的(注意这里是RLP序列化格式,而不是基于TCP的RLPx传输协议)。

// encodePacket builds a signed discovery packet of type ptype carrying req.
//
// Resulting layout:
//
//   packet[:macSize]          hash = Keccak256(packet[macSize:])
//   packet[macSize:headSize]  signature by priv over Keccak256(packet[headSize:])
//   packet[headSize]          ptype
//   packet[headSize+1:]       RLP encoding of req
//
// It returns the packet and the hash stored in its first macSize bytes.
// If RLP encoding or signing fails, the error is logged and returned with
// nil slices.
//
// NOTE(review): this assumes headSpace is exactly headSize bytes long and
// that the signature fills packet[macSize:headSize] — confirm against the
// constant definitions.
func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) {
    b := new(bytes.Buffer)
    // Reserve room for the hash and signature; both are filled in below.
    b.Write(headSpace)
    b.WriteByte(ptype)
    if err := rlp.Encode(b, req); err != nil {
        log.Error("Can't encode discv4 packet", "err", err)
        return nil, nil, err
    }
    packet = b.Bytes()
    // Sign the type byte plus RLP payload.
    sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv)
    if err != nil {
        log.Error("Can't sign discv4 packet", "err", err)
        return nil, nil, err
    }
    copy(packet[macSize:], sig)
    // add the hash to the front. Note: this doesn't protect the
    // packet in any way. Our public key will be part of this hash in
    // The future.
    hash = crypto.Keccak256(packet[macSize:])
    copy(packet, hash)
    return packet, hash, nil
}
...
// decodePacket verifies and decodes a raw discovery packet. The expected
// layout is the one encodePacket produces: hash | signature | type byte |
// RLP payload.
//
// It returns the decoded message, the sender's NodeID (recovered from the
// signature) and the packet hash buf[:macSize]. Errors:
//   - errPacketTooSmall if buf cannot hold the header plus a type byte;
//   - errBadHash if the leading hash does not equal Keccak256(buf[macSize:]);
//   - the recoverNodeID error if the signature is unusable;
//   - an "unknown type" error for an unrecognized type byte;
//   - the RLP decoding error, if any. It is returned together with the
//     partially filled req, fromID and hash.
func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
    if len(buf) < headSize+1 {
        return nil, NodeID{}, nil, errPacketTooSmall
    }
    hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
    // Integrity check only: the hash is unkeyed, so it detects corruption,
    // not forgery. Authenticity comes from the signature recovery below.
    shouldhash := crypto.Keccak256(buf[macSize:])
    if !bytes.Equal(hash, shouldhash) {
        return nil, NodeID{}, nil, errBadHash
    }
    fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
    if err != nil {
        return nil, NodeID{}, hash, err
    }
    var req packet
    // sigdata[0] is the packet type; the RLP payload follows it.
    switch ptype := sigdata[0]; ptype {
    case pingPacket:
        req = new(ping)
    case pongPacket:
        req = new(pong)
    case findnodePacket:
        req = new(findnode)
    case neighborsPacket:
        req = new(neighbors)
    default:
        return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
    }
    s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
    err = s.Decode(req)
    return req, fromID, hash, err
}

今天基本将p2p中节点发现的源码看了,明天接着看剩下的源码。

更多以太坊源码解析请移驾全球最大同性交友网,觉得有用记得给个小star哦

.
.
.
.

互联网颠覆世界,区块链颠覆互联网!

--------------------------------------------------20181103 17:58

相关文章

网友评论

    本文标题:以太坊源码研读0xa1 p2p实现(上)

    本文链接:https://www.haomeiwen.com/subject/upwvtqtx.html