ethereum协议
2、P2P网络启动
(3)Server.go setupDiscovery()
初始化发现协议。Discover
func (srv *Server) setupDiscovery() error {
if srv.NoDiscovery && !srv.DiscoveryV5 {
return nil
}
addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
realaddr := conn.LocalAddr().(*net.UDPAddr)
srv.log.Debug("UDP listener up", "addr", realaddr)
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, qidongrealaddr.Port, "ethereum discovery")
}
}
srv.localnode.SetFallbackUDP(realaddr.Port)
// Discovery V4
var unhandled chan discover.ReadPacket
var sconn *sharedUDPConn
if !srv.NoDiscovery {
if srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
// Discovery V5
if srv.DiscoveryV5 {
var ntab *discv5.Network
var err error
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, "", srv.NetRestrict)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, "", srv.NetRestrict)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}
return nil
}
基于UDP。
func (srv *Server) setupListening() error {
// Launch the TCP listener.
listener, err := net.Listen("tcp", srv.ListenAddr)
if err != nil {
return err
}
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
srv.listener = listener
srv.localnode.Set(enr.TCP(laddr.Port))
srv.loopWG.Add(1)
go srv.listenLoop()
// Map the TCP listening port if NAT is configured.
if !laddr.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
return nil
}
setuplistening 创建TCP监听。srv.listenLoop 协程循环监听socket,等待slots。直到Accept,调用setupConnect()。
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
for {
// Wait for a handshake slot before accepting.
<-slots
var (
fd net.Conn
err error
)
for {
fd, err = srv.listener.Accept()
if netutil.IsTemporaryError(err) {
srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
srv.log.Debug("Read error", "err", err)
return
}
break
}
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
}
}
var ip net.IP
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
ip = tcp.IP
}
fd = newMeteredConn(fd, true, ip)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
看setupConn()。正常连接使用该函数建立p2p链路。
首先要获取dial节点的public key,也就是nodeID。
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
if !running {
return errServerStopped
}
// If dialing, figure out the remote public key.
var dialPubkey *ecdsa.PublicKey
if dialDest != nil {
dialPubkey = new(ecdsa.PublicKey)
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
return errors.New("dial destination doesn't have a secp256k1 public key")
}
}
然后进行加密握手。
remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)
协议握手
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed proto handshake", "err", err)
return err
}
这里卡住了好久。。因为Ctrl+leftclick跟不到doEncHandshake和doProtoHandshake方法。最后只能全局查找,发现原来实现在p2p/rlpx.go中。可以看到初始化了encHandshake类型的变量,然后通过makeAuthMsg创建了authMsg。然后产生authPacket,传输给远端,获取authRespPacket
func (t *rlpx) doEncHandshake(prv *ecdsa.PrivateKey, dial *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
var (
sec secrets
err error
)
if dial == nil {
sec, err = receiverEncHandshake(t.fd, prv)
} else {
sec, err = initiatorEncHandshake(t.fd, prv, dial)
}
if err != nil {
return nil, err
}
t.wmu.Lock()
t.rw = newRLPXFrameRW(t.fd, sec)
t.wmu.Unlock()
return sec.Remote.ExportECDSA(), nil
}
主动拨号状态的,发起initiatorEncHandshake,否则是监听时接收的receiverEncHandshake。再跟一步看initiatorEncHandshake和receiverEncHandshake。
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s secrets, err error) {
//定义h为encHandshake 导入ECDSA PUBLIC KEY
h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
//creates the initiator handshake message.
authMsg, err := h.makeAuthMsg(prv)
if err != nil {
return s, err
}
//rlp编码,加密也在里面进行
authPacket, err := sealEIP8(authMsg, h)
if err != nil {
return s, err
}
if _, err = conn.Write(authPacket); err != nil {
return s, err
}
//RLPX V4 handshake 回应
authRespMsg := new(authRespV4)
//处理接收的消息,读取authResponsePacket 解码和解密。
authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
if err != nil {
return s, err
}
//获取握手过程中的Nonce和随机PublicKey
if err := h.handleAuthResp(authRespMsg); err != nil {
return s, err
}
//将authPacket和authRespPacket打包成共享的secret
return h.secrets(authPacket, authRespPacket)
}
下面详细看上面的几个函数。
// makeAuthMsg creates the initiator handshake message.
func (h *encHandshake) makeAuthMsg(prv *ecdsa.PrivateKey) (*authMsgV4, error) {
// Generate random initiator nonce.
//为加密握手生成一个Nonce
h.initNonce = make([]byte, shaLen)
_, err := rand.Read(h.initNonce)
if err != nil {
return nil, err
}
// Generate random keypair to for ECDH.
//获取随机私钥
h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
if err != nil {
return nil, err
}
// Sign known message: static-shared-secret ^ nonce
//生成token ,就是共享秘钥
token, err := h.staticSharedSecret(prv)
if err != nil {
return nil, err
}
//异或
signed := xor(token, h.initNonce)
//随机私钥签名
signature, err := crypto.Sign(signed, h.randomPrivKey.ExportECDSA())
if err != nil {
return nil, err
}
msg := new(authMsgV4)
//msg包含对Nonce的签名、发起者的公钥、以及发起者的Nonce
copy(msg.Signature[:], signature)
copy(msg.InitiatorPubkey[:], crypto.FromECDSAPub(&prv.PublicKey)[1:])
copy(msg.Nonce[:], h.initNonce)
msg.Version = 4
//返回authMsgV4对象
return msg, nil
}
func sealEIP8(msg interface{}, h *encHandshake) ([]byte, error) {
//对msg进行编码和添加前缀,具体前缀是啥没细看。
buf := new(bytes.Buffer)
if err := rlp.Encode(buf, msg); err != nil {
return nil, err
}
// pad with random amount of data. the amount needs to be at least 100 bytes to make
// the message distinguishable from pre-EIP-8 handshakes.
pad := padSpace[:mrand.Intn(len(padSpace)-100)+100]
buf.Write(pad)
prefix := make([]byte, 2)
binary.BigEndian.PutUint16(prefix, uint16(buf.Len()+eciesOverhead))
//用远端公钥ecies加密。
enc, err := ecies.Encrypt(rand.Reader, h.remote, buf.Bytes(), nil, prefix)
return append(prefix, enc...), err
}
func readHandshakeMsg(msg plainDecoder, plainSize int, prv *ecdsa.PrivateKey, r io.Reader) ([]byte, error) {
buf := make([]byte, plainSize)
if _, err := io.ReadFull(r, buf); err != nil {
return buf, err
}
// Attempt decoding pre-EIP-8 "plain" format.
//私钥解密,尝试用EIP8以前的格式
key := ecies.ImportECDSA(prv)
if dec, err := key.Decrypt(buf, nil, nil); err == nil {
msg.decodePlain(dec)
return buf, nil
}
// Could be EIP-8 format, try that.
//EIP8后,使用了RLP编解码,需要处理前缀等等。
prefix := buf[:2]
size := binary.BigEndian.Uint16(prefix)
if size < uint16(plainSize) {
return buf, fmt.Errorf("size underflow, need at least %d bytes", plainSize)
}
buf = append(buf, make([]byte, size-uint16(plainSize)+2)...)
if _, err := io.ReadFull(r, buf[plainSize:]); err != nil {
return buf, err
}
//同样的私钥解码
dec, err := key.Decrypt(buf[2:], nil, prefix)
if err != nil {
return buf, err
}
// Can't use rlp.DecodeBytes here because it rejects
// trailing data (forward-compatibility).
s := rlp.NewStream(bytes.NewReader(dec), 0)
//返回的buf是authRespPacket -- buffer类型 ,s.Decode将buffer Decode,然后返回的是authRespV4对象。
return buf, s.Decode(msg)
}
func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) {
//从AuthMsg对象中取出Nonce和RandomPublicKey
h.respNonce = msg.Nonce[:]
h.remoteRandomPub, err = importPublicKey(msg.RandomPubkey[:])
return err
}
//打包成加密握手的共享secret
func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
//瞬时秘钥,只在当前链接中有效。满足前向安全。
ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen)
if err != nil {
return secrets{}, err
}
// derive base secrets from ephemeral key agreement
//对前面的秘钥,还有两个随机数进行hash,生成一个aes秘钥和MAC
sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce))
aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret)
s := secrets{
Remote: h.remote,
AES: aesSecret,
MAC: crypto.Keccak256(ecdheSecret, aesSecret),
}
// setup sha3 instances for the MACs
//算两个mac,作为EMac和InMac,用于后续RLPX帧的MAC验证。
mac1 := sha3.NewLegacyKeccak256()
mac1.Write(xor(s.MAC, h.respNonce))
mac1.Write(auth)
mac2 := sha3.NewLegacyKeccak256()
mac2.Write(xor(s.MAC, h.initNonce))
mac2.Write(authResp)
if h.initiator {
s.EgressMAC, s.IngressMAC = mac1, mac2
} else {
s.EgressMAC, s.IngressMAC = mac2, mac1
}
//返回s
return s, nil
}
再看接受加密请求的处理函数receiverEncHandshake。
// prv is the local client's private key.
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s secrets, err error) {
authMsg := new(authMsgV4)
//收到请求,先调用readHandshakeMsg进行解密解码
authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
if err != nil {
return s, err
}
h := new(encHandshake)
//处理发过来的请求
if err := h.handleAuthMsg(authMsg, prv); err != nil {
return s, err
}
//发起请求应答
authRespMsg, err := h.makeAuthResp()
if err != nil {
return s, err
}
//编码成packet
var authRespPacket []byte
if authMsg.gotPlain {
authRespPacket, err = authRespMsg.sealPlain(h)
} else {
authRespPacket, err = sealEIP8(authRespMsg, h)
}
if err != nil {
return s, err
}
//写入传输
if _, err = conn.Write(authRespPacket); err != nil {
return s, err
}
//握手包打包
return h.secrets(authPacket, authRespPacket)
}
对发来的请求进行处理,获取发起者的nonce以及随机公钥。
func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
// Import the remote identity.
rpub, err := importPublicKey(msg.InitiatorPubkey[:])
if err != nil {
return err
}
//这里的initNonce就是发起者发过来的Nonce
h.initNonce = msg.Nonce[:]
//发起者公钥
h.remote = rpub
,
// Generate random keypair for ECDH.
// If a private key is already set, use it instead of generating one (for testing).
if h.randomPrivKey == nil {
// //生成自己的随机私钥
h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
if err != nil {
return err
}
}
//通过ECDH算出共享秘钥token
// Check the signature.
token, err := h.staticSharedSecret(prv)
if err != nil {
return err
}
//异或,从明文以及签名推出发起者的随机公钥。
signedMsg := xor(token, h.initNonce)
remoteRandomPub, err := secp256k1.RecoverPubkey(signedMsg, msg.Signature[:])
if err != nil {
return err
}
//发起者随机公钥放入握手中
h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
return nil
}
准备应答请求的makeAuthResp,生成被动方的nonce,将自己的Nonce以及公钥都放入Msg中。
func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) {
// Generate random nonce.
h.respNonce = make([]byte, shaLen)
if _, err = rand.Read(h.respNonce); err != nil {
return nil, err
}
//authResponse 包括,接受者自己生成的Nonce,自己的随机公钥
msg = new(authRespV4)
copy(msg.Nonce[:], h.respNonce)
copy(msg.RandomPubkey[:], exportPubkey(&h.randomPrivKey.PublicKey))
msg.Version = 4
return msg, nil
}
至此,加密握手完毕。
开始分帧传输。主要是通过WriteMsg和ReadMsg两个函数实现,给上层的p2p提供支持。
帧结构如下:
frame = header || header-mac || frame-data || frame-mac
header = frame-size || header-data || padding
frame-size = size of frame excluding padding, integer < 2**24, big endian
header-data = rlp.list(protocol-type[, context-id])
protocol-type = integer < 2**16, big endian
context-id = integer < 2**16, big endian
padding = zero-fill to 16-byte boundary
frame-content = any binary data
header-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ header-ciphertext).digest
frame-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ left16(egress-mac.update(frame-ciphertext).digest))
egress-mac = keccak256 state, continuously updated with egress bytes
ingress-mac = keccak256 state, continuously updated with ingress bytes
left16(x) is the first 16 bytes of x
|| is concatenate
^ is xor
新建的帧函数
func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
//HMAC的秘钥
macc, err := aes.NewCipher(s.MAC)
if err != nil {
panic("invalid MAC secret: " + err.Error())
}
//aes 秘钥
encc, err := aes.NewCipher(s.AES)
if err != nil {
panic("invalid AES secret: " + err.Error())
}
// we use an all-zeroes IV for AES because the key used
// for encryption is ephemeral.
//零向量
iv := make([]byte, encc.BlockSize())
return &rlpxFrameRW{
conn: conn,
enc: cipher.NewCTR(encc, iv),
dec: cipher.NewCTR(encc, iv),
macCipher: macc,
egressMAC: s.EgressMAC,
ingressMAC: s.IngressMAC,
}
}
WriteMsg看一下
func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
//将msg的code(应该指状态码)RLP编码成byte流
ptype, _ := rlp.EncodeToBytes(msg.Code)
// if snappy is enabled, compress message now
//snappy 好像是个数据流压缩方式
if rw.snappy {
if msg.Size > maxUint24 {
return errPlainMessageTooLarge
}
payload, _ := ioutil.ReadAll(msg.Payload)
payload = snappy.Encode(nil, payload)
msg.Payload = bytes.NewReader(payload)
msg.Size = uint32(len(payload))
}
// write header
//帧的header 32byte
headbuf := make([]byte, 32)
//帧大小,包括payload大小以及Code RLP编码后的byte[]长度
fsize := uint32(len(ptype)) + msg.Size
if fsize > maxUint24 {
return errors.New("message size overflows uint24")
}
putInt24(fsize, headbuf) // TODO: check overflow
copy(headbuf[3:], zeroHeader)
//头16个byte,AES256 CTR模式 和AES秘钥加密
rw.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted
// write header MAC
//更新header MAC 送入conn
copy(headbuf[16:], updateMAC(rw.egressMAC, rw.macCipher, headbuf[:16]))
if _, err := rw.conn.Write(headbuf); err != nil {
return err
}
// write encrypted frame, updating the egress MAC hash with the data written to conn.
// rw.enc是AES秘钥,tee好像是一个StreamWriter,将msg写成byte类型, 它调用XORKeyStream来处理通过的每个数据片段,读进来应该就是
tee := cipher.StreamWriter{S: rw.enc, W: io.MultiWriter(rw.conn, rw.egressMAC)}
//这里通过tee.Write 调用XORKeyStream与秘钥进行XOR运算,将Msg.code进行加密
//
if _, err := tee.Write(ptype); err != nil {
return err
}
//将payload复制到tee的Writer中,但其实因为tee的Writer实现了自己的Write函数,所以也调用了XORKeyStream进行加密
if _, err := io.Copy(tee, msg.Payload); err != nil {
return err
}
//补零补足
if padding := fsize % 16; padding > 0 {
if _, err := tee.Write(zero16[:16-padding]); err != nil {
return err
}
}
// write frame MAC. egress MAC hash is up to date because
// frame content was written to it as well.
//更新frameMAC,因为frame内容中也包括了framemac所以egressMAC会不停变化?
fmacseed := rw.egressMAC.Sum(nil)
//func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte
//mac是指frame mac
mac := updateMAC(rw.egressMAC, rw.macCipher, fmacseed)
_, err := rw.conn.Write(mac)
return err
}
然后是协议握手。 商量好运行的协议的version啊名称啊等等
func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
// Writing our handshake happens concurrently, we prefer
// returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it
// as the error so it can be tracked elsewhere.
werr := make(chan error, 1)
go func() { werr <- Send(t.rw, handshakeMsg, our) }()
if their, err = readProtocolHandshake(t.rw); err != nil {
<-werr // make sure the write terminates too
return nil, err
}
if err := <-werr; err != nil {
return nil, fmt.Errorf("write error: %v", err)
}
// If the protocol version supports Snappy encoding, upgrade immediately
t.rw.snappy = their.Version >= snappyProtocolVersion
return their, nil
}
rlpx加解密篇完毕!!!!
网友评论