美文网首页
以太坊源码(2)——rlpx协议的握手以及对分帧加解密传送

以太坊源码(2)——rlpx协议的握手以及对分帧加解密传送

作者: Jarvist | 来源:发表于2019-04-16 22:05 被阅读0次

ethereum协议

2、P2P网络启动

(3)Server.go setupDiscovery()

初始化发现协议。Discover


func (srv *Server) setupDiscovery() error {
    if srv.NoDiscovery && !srv.DiscoveryV5 {
        return nil
    }

    addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
    if err != nil {
        return err
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return err
    }
    realaddr := conn.LocalAddr().(*net.UDPAddr)
    srv.log.Debug("UDP listener up", "addr", realaddr)
    if srv.NAT != nil {
        if !realaddr.IP.IsLoopback() {
            go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, qidongrealaddr.Port, "ethereum discovery")
        }
    }
    srv.localnode.SetFallbackUDP(realaddr.Port)

    // Discovery V4
    var unhandled chan discover.ReadPacket
    var sconn *sharedUDPConn
    if !srv.NoDiscovery {
        if srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }
        cfg := discover.Config{
            PrivateKey:  srv.PrivateKey,
            NetRestrict: srv.NetRestrict,
            Bootnodes:   srv.BootstrapNodes,
            Unhandled:   unhandled,
        }
        ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
        if err != nil {
            return err
        }
        srv.ntab = ntab
    }
    // Discovery V5
    if srv.DiscoveryV5 {
        var ntab *discv5.Network
        var err error
        if sconn != nil {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, "", srv.NetRestrict)
        } else {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, "", srv.NetRestrict)
        }
        if err != nil {
            return err
        }
        if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
            return err
        }
        srv.DiscV5 = ntab
    }
    return nil
}

基于UDP。

func (srv *Server) setupListening() error {
    // Launch the TCP listener.
    listener, err := net.Listen("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    laddr := listener.Addr().(*net.TCPAddr)
    srv.ListenAddr = laddr.String()
    srv.listener = listener
    srv.localnode.Set(enr.TCP(laddr.Port))

    srv.loopWG.Add(1)
    go srv.listenLoop()

    // Map the TCP listening port if NAT is configured.
    if !laddr.IP.IsLoopback() && srv.NAT != nil {
        srv.loopWG.Add(1)
        go func() {
            nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
            srv.loopWG.Done()
        }()
    }
    return nil
}

setuplistening 创建TCP监听。srv.listenLoop 协程循环监听socket,等待slots。直到Accept,调用setupConnect()。


// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
    defer srv.loopWG.Done()
    srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())

    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        slots <- struct{}{}
    }

    for {
        // Wait for a handshake slot before accepting.
        <-slots

        var (
            fd  net.Conn
            err error
        )
        for {
            fd, err = srv.listener.Accept()
            if netutil.IsTemporaryError(err) {
                srv.log.Debug("Temporary read error", "err", err)
                continue
            } else if err != nil {
                srv.log.Debug("Read error", "err", err)
                return
            }
            break
        }

        // Reject connections that do not match NetRestrict.
        if srv.NetRestrict != nil {
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
                srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
                fd.Close()
                slots <- struct{}{}
                continue
            }
        }

        var ip net.IP
        if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
            ip = tcp.IP
        }
        fd = newMeteredConn(fd, true, ip)
        srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        go func() {
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{}
        }()
    }
}

看setupConn()。正常连接使用该函数建立p2p链路。
首先要获取dial节点的public key,也就是nodeID。


func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
    // Prevent leftover pending conns from entering the handshake.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }
    // If dialing, figure out the remote public key.
    var dialPubkey *ecdsa.PublicKey
    if dialDest != nil {
        dialPubkey = new(ecdsa.PublicKey)
        if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
            return errors.New("dial destination doesn't have a secp256k1 public key")
        }
    }

然后进行加密握手。

remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)

协议握手

// Run the protocol handshake
    phs, err := c.doProtoHandshake(srv.ourHandshake)
    if err != nil {
        clog.Trace("Failed proto handshake", "err", err)
        return err
    }

这里卡住了好久。。因为Ctrl+leftclick跟不到doEncHandshake和doProtoHandshake方法。最后只能全局查找,发现原来实现在p2p/rlpx.go中。可以看到初始化了encHandshake类型的变量,然后通过makeAuthMsg创建了authMsg。然后产生authPacket,传输给远端,获取authRespPacket

func (t *rlpx) doEncHandshake(prv *ecdsa.PrivateKey, dial *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
    var (
        sec secrets
        err error
    )
    if dial == nil {
        sec, err = receiverEncHandshake(t.fd, prv)
    } else {
        sec, err = initiatorEncHandshake(t.fd, prv, dial)
    }
    if err != nil {
        return nil, err
    }
    t.wmu.Lock()
    t.rw = newRLPXFrameRW(t.fd, sec)
    t.wmu.Unlock()
    return sec.Remote.ExportECDSA(), nil
}

主动拨号状态的,发起initiatorEncHandshake,否则是监听时接收的receiverEncHandshake。再跟一步看initiatorEncHandshake和receiverEncHandshake。

func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s secrets, err error) {
    //定义h为encHandshake 导入ECDSA PUBLIC KEY
    h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
    //creates the initiator handshake message.
    authMsg, err := h.makeAuthMsg(prv)
    if err != nil {
        return s, err
    }
    //rlp编码,加密也在里面进行
    authPacket, err := sealEIP8(authMsg, h)
    if err != nil {
        return s, err
    }
    if _, err = conn.Write(authPacket); err != nil {
        return s, err
    }
    //RLPX V4 handshake 回应
    authRespMsg := new(authRespV4)

    //处理接收的消息,读取authResponsePacket 解码和解密。
    authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
    if err != nil {
        return s, err
    }
    //获取握手过程中的Nonce和随机PublicKey
    if err := h.handleAuthResp(authRespMsg); err != nil {
        return s, err
    }
    //将authPacket和authRespPacket打包成共享的secret
    return h.secrets(authPacket, authRespPacket)
}

下面详细看上面的几个函数。

// makeAuthMsg creates the initiator handshake message.
func (h *encHandshake) makeAuthMsg(prv *ecdsa.PrivateKey) (*authMsgV4, error) {
    // Generate random initiator nonce.
    //为加密握手生成一个Nonce
    h.initNonce = make([]byte, shaLen)
    _, err := rand.Read(h.initNonce)
    if err != nil {
        return nil, err
    }
    // Generate random keypair to for ECDH.
    //获取随机私钥
    h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
    if err != nil {
        return nil, err
    }

    // Sign known message: static-shared-secret ^ nonce
    //生成token ,就是共享秘钥
    token, err := h.staticSharedSecret(prv)
    if err != nil {
        return nil, err
    }
    //异或
    signed := xor(token, h.initNonce)
    //随机私钥签名
    signature, err := crypto.Sign(signed, h.randomPrivKey.ExportECDSA())
    if err != nil {
        return nil, err
    }

    msg := new(authMsgV4)
    //msg包含对Nonce的签名、发起者的公钥、以及发起者的Nonce
    copy(msg.Signature[:], signature)
    copy(msg.InitiatorPubkey[:], crypto.FromECDSAPub(&prv.PublicKey)[1:])
    copy(msg.Nonce[:], h.initNonce)
    msg.Version = 4
    //返回authMsgV4对象
    return msg, nil
}


func sealEIP8(msg interface{}, h *encHandshake) ([]byte, error) {
    //对msg进行编码和添加前缀,具体前缀是啥没细看。
    buf := new(bytes.Buffer)
    if err := rlp.Encode(buf, msg); err != nil {
        return nil, err
    }
    // pad with random amount of data. the amount needs to be at least 100 bytes to make
    // the message distinguishable from pre-EIP-8 handshakes.
    pad := padSpace[:mrand.Intn(len(padSpace)-100)+100]
    buf.Write(pad)
    prefix := make([]byte, 2)
    binary.BigEndian.PutUint16(prefix, uint16(buf.Len()+eciesOverhead))
    //用远端公钥ecies加密。
    enc, err := ecies.Encrypt(rand.Reader, h.remote, buf.Bytes(), nil, prefix)
    return append(prefix, enc...), err
}

func readHandshakeMsg(msg plainDecoder, plainSize int, prv *ecdsa.PrivateKey, r io.Reader) ([]byte, error) {
    buf := make([]byte, plainSize)
    if _, err := io.ReadFull(r, buf); err != nil {
        return buf, err
    }
    // Attempt decoding pre-EIP-8 "plain" format.
    //私钥解密,尝试用EIP8以前的格式
    key := ecies.ImportECDSA(prv)
    if dec, err := key.Decrypt(buf, nil, nil); err == nil {
        msg.decodePlain(dec)
        return buf, nil
    }
    // Could be EIP-8 format, try that.
    //EIP8后,使用了RLP编解码,需要处理前缀等等。
    prefix := buf[:2]
    size := binary.BigEndian.Uint16(prefix)
    if size < uint16(plainSize) {
        return buf, fmt.Errorf("size underflow, need at least %d bytes", plainSize)
    }
    buf = append(buf, make([]byte, size-uint16(plainSize)+2)...)
    if _, err := io.ReadFull(r, buf[plainSize:]); err != nil {
        return buf, err
    }
    //同样的私钥解码
    dec, err := key.Decrypt(buf[2:], nil, prefix)
    if err != nil {
        return buf, err
    }
    // Can't use rlp.DecodeBytes here because it rejects
    // trailing data (forward-compatibility).
    s := rlp.NewStream(bytes.NewReader(dec), 0)
    //返回的buf是authRespPacket -- buffer类型 ,s.Decode将buffer Decode,然后返回的是authRespV4对象。
    return buf, s.Decode(msg)
}

func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) {
    //从AuthMsg对象中取出Nonce和RandomPublicKey
    h.respNonce = msg.Nonce[:]
    h.remoteRandomPub, err = importPublicKey(msg.RandomPubkey[:])
    return err
}

//打包成加密握手的共享secret
func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
    //瞬时秘钥,只在当前链接中有效。满足前向安全。
    ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen)
    if err != nil {
        return secrets{}, err
    }

    // derive base secrets from ephemeral key agreement
    //对前面的秘钥,还有两个随机数进行hash,生成一个aes秘钥和MAC
    sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce))
    aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret)
    s := secrets{
        Remote: h.remote,
        AES:    aesSecret,
        MAC:    crypto.Keccak256(ecdheSecret, aesSecret),
    }

    // setup sha3 instances for the MACs
    //算两个mac,作为EMac和InMac,用于后续RLPX帧的MAC验证。
    mac1 := sha3.NewLegacyKeccak256()
    mac1.Write(xor(s.MAC, h.respNonce))
    mac1.Write(auth)
    mac2 := sha3.NewLegacyKeccak256()
    mac2.Write(xor(s.MAC, h.initNonce))
    mac2.Write(authResp)
    if h.initiator {
        s.EgressMAC, s.IngressMAC = mac1, mac2
    } else {
        s.EgressMAC, s.IngressMAC = mac2, mac1
    }
    //返回s
    return s, nil
}

再看接受加密请求的处理函数receiverEncHandshake。


// prv is the local client's private key.
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s secrets, err error) {
    authMsg := new(authMsgV4)
    //收到请求,先调用readHandshakeMsg进行解密解码
    authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
    if err != nil {
        return s, err
    }
    h := new(encHandshake)
    //处理发过来的请求
    if err := h.handleAuthMsg(authMsg, prv); err != nil {
        return s, err
    }
    //发起请求应答
    authRespMsg, err := h.makeAuthResp()
    if err != nil {
        return s, err
    }
    //编码成packet
    var authRespPacket []byte
    if authMsg.gotPlain {
        authRespPacket, err = authRespMsg.sealPlain(h)
    } else {
        authRespPacket, err = sealEIP8(authRespMsg, h)
    }
    if err != nil {
        return s, err
    }
    //写入传输
    if _, err = conn.Write(authRespPacket); err != nil {
        return s, err
    }
    //握手包打包
    return h.secrets(authPacket, authRespPacket)
}

对发来的请求进行处理,获取发起者的nonce以及随机公钥。


func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
    // Import the remote identity.
    rpub, err := importPublicKey(msg.InitiatorPubkey[:])
    if err != nil {
        return err
    }
    //这里的initNonce就是发起者发过来的Nonce
    h.initNonce = msg.Nonce[:]
    //发起者公钥
    h.remote = rpub

 ,
    // Generate random keypair for ECDH.
    // If a private key is already set, use it instead of generating one (for testing).
    if h.randomPrivKey == nil {
        //   //生成自己的随机私钥
        h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
        if err != nil {
            return err
        }
    }
    //通过ECDH算出共享秘钥token
    // Check the signature.
    token, err := h.staticSharedSecret(prv)
    if err != nil {
        return err
    }
    //异或,从明文以及签名推出发起者的随机公钥。
    signedMsg := xor(token, h.initNonce)
    remoteRandomPub, err := secp256k1.RecoverPubkey(signedMsg, msg.Signature[:])
    if err != nil {
        return err
    }
    //发起者随机公钥放入握手中
    h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
    return nil
}

准备应答请求的makeAuthResp,生成被动方的nonce,将自己的Nonce以及公钥都放入Msg中。


func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) {
    // Generate random nonce.
    h.respNonce = make([]byte, shaLen)
    if _, err = rand.Read(h.respNonce); err != nil {
        return nil, err
    }
    //authResponse 包括,接受者自己生成的Nonce,自己的随机公钥
    msg = new(authRespV4)
    copy(msg.Nonce[:], h.respNonce)
    copy(msg.RandomPubkey[:], exportPubkey(&h.randomPrivKey.PublicKey))
    msg.Version = 4
    return msg, nil
}

至此,加密握手完毕。
开始分帧传输。主要是通过WriteMsg和ReadMsg两个函数实现,给上层的p2p提供支持。

帧结构如下:

frame = header || header-mac || frame-data || frame-mac
header = frame-size || header-data || padding
frame-size = size of frame excluding padding, integer < 2**24, big endian
header-data = rlp.list(protocol-type[, context-id])
protocol-type = integer < 2**16, big endian
context-id = integer < 2**16, big endian
padding = zero-fill to 16-byte boundary
frame-content = any binary data

header-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ header-ciphertext).digest
frame-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ left16(egress-mac.update(frame-ciphertext).digest))
egress-mac = keccak256 state, continuously updated with egress bytes
ingress-mac = keccak256 state, continuously updated with ingress bytes

left16(x) is the first 16 bytes of x
|| is concatenate
^ is xor

新建的帧函数

func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
    //HMAC的秘钥
    macc, err := aes.NewCipher(s.MAC)
    if err != nil {
        panic("invalid MAC secret: " + err.Error())
    }
    //aes 秘钥
    encc, err := aes.NewCipher(s.AES)
    if err != nil {
        panic("invalid AES secret: " + err.Error())
    }
    // we use an all-zeroes IV for AES because the key used
    // for encryption is ephemeral.
    //零向量
    iv := make([]byte, encc.BlockSize())
    return &rlpxFrameRW{
        conn:       conn,
        enc:        cipher.NewCTR(encc, iv),
        dec:        cipher.NewCTR(encc, iv),
        macCipher:  macc,
        egressMAC:  s.EgressMAC,
        ingressMAC: s.IngressMAC,
    }
}

WriteMsg看一下


func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
    //将msg的code(应该指状态码)RLP编码成byte流
    ptype, _ := rlp.EncodeToBytes(msg.Code)

    // if snappy is enabled, compress message now
    //snappy 好像是个数据流压缩方式
    if rw.snappy {
        if msg.Size > maxUint24 {
            return errPlainMessageTooLarge
        }
        payload, _ := ioutil.ReadAll(msg.Payload)
        payload = snappy.Encode(nil, payload)

        msg.Payload = bytes.NewReader(payload)
        msg.Size = uint32(len(payload))
    }
    // write header
    //帧的header 32byte
    headbuf := make([]byte, 32)
    //帧大小,包括payload大小以及Code RLP编码后的byte[]长度
    fsize := uint32(len(ptype)) + msg.Size
    if fsize > maxUint24 {
        return errors.New("message size overflows uint24")
    }
    putInt24(fsize, headbuf) // TODO: check overflow
    copy(headbuf[3:], zeroHeader)
    //头16个byte,AES256 CTR模式 和AES秘钥加密
    rw.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted

    // write header MAC
    //更新header MAC 送入conn
    copy(headbuf[16:], updateMAC(rw.egressMAC, rw.macCipher, headbuf[:16]))
    if _, err := rw.conn.Write(headbuf); err != nil {
        return err
    }

    // write encrypted frame, updating the egress MAC hash with the data written to conn.
    // rw.enc是AES秘钥,tee好像是一个StreamWriter,将msg写成byte类型, 它调用XORKeyStream来处理通过的每个数据片段,读进来应该就是
    tee := cipher.StreamWriter{S: rw.enc, W: io.MultiWriter(rw.conn, rw.egressMAC)}
    //这里通过tee.Write 调用XORKeyStream与秘钥进行XOR运算,将Msg.code进行加密
    //
    if _, err := tee.Write(ptype); err != nil {
        return err
    }
    //将payload复制到tee的Writer中,但其实因为tee的Writer实现了自己的Write函数,所以也调用了XORKeyStream进行加密
    if _, err := io.Copy(tee, msg.Payload); err != nil {
        return err
    }
    //补零补足
    if padding := fsize % 16; padding > 0 {
        if _, err := tee.Write(zero16[:16-padding]); err != nil {
            return err
        }
    }

    // write frame MAC. egress MAC hash is up to date because
    // frame content was written to it as well.
    //更新frameMAC,因为frame内容中也包括了framemac所以egressMAC会不停变化?
    fmacseed := rw.egressMAC.Sum(nil)
    //func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte 
    //mac是指frame mac
    mac := updateMAC(rw.egressMAC, rw.macCipher, fmacseed)
    _, err := rw.conn.Write(mac)
    return err
}

然后是协议握手。 商量好运行的协议的version啊名称啊等等


func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
    // Writing our handshake happens concurrently, we prefer
    // returning the handshake read error. If the remote side
    // disconnects us early with a valid reason, we should return it
    // as the error so it can be tracked elsewhere.
    werr := make(chan error, 1)
    go func() { werr <- Send(t.rw, handshakeMsg, our) }()
    if their, err = readProtocolHandshake(t.rw); err != nil {
        <-werr // make sure the write terminates too
        return nil, err
    }
    if err := <-werr; err != nil {
        return nil, fmt.Errorf("write error: %v", err)
    }
    // If the protocol version supports Snappy encoding, upgrade immediately
    t.rw.snappy = their.Version >= snappyProtocolVersion

    return their, nil
}

rlpx加解密篇完毕!!!!

相关文章

网友评论

      本文标题:以太坊源码(2)——rlpx协议的握手以及对分帧加解密传送

      本文链接:https://www.haomeiwen.com/subject/duxwwqtx.html