美文网首页区块链系统教程我爱编程
Ethereum源码阅读笔记-whisper

Ethereum源码阅读笔记-whisper

作者: 豆瓣奶茶 | 来源:发表于2018-06-14 13:25 被阅读14次

    go-ethereum/whisper

    先从whipserv6看起。

    Whisper定义

    Whisper代表了在以太坊网络中的一个隐秘(dark)通信接口,使用的是以太坊自有的P2P通信层。所谓dark,意思是没有可靠的方法可以来追踪数据包(见于Specs

    // whisper/whisperv6/whisper.go
    type Whisper struct {
        // 协议描述和参数
        protocol p2p.Protocol
        // 订阅功能安装的消息过滤器
        filters  *Filters
        // 私钥存储
        privateKeys map[string]*ecdsa.PrivateKey
        // 对称密钥存储
        symKeys     map[string][]byte
        // 和密钥存储相关的互斥锁           
        keyMu       sync.RWMutex
        // 互斥地同步消息和过期池
        poolMu      sync.RWMutex
        // 当前由此节点跟踪的信封(envelopes)池
        envelopes   map[common.Hash]*Envelope
        // 消息过期池
        expirations map[uint32]*set.SetNonTS
        
        // 互斥地同步活跃节点集合
        peerMu sync.RWMutex
        // 当前活跃节点的集合
        peers  map[*Peer]struct{}
        // 普通whisper消息的消息队列
        messageQueue chan *Envelope
        // 点对点消息的消息队列(不会再被做任何转发的消息)
        p2pMsgQueue  chan *Envelope
        // 用于温和的退出的channel
        quit         chan struct{}
        // 存储了配置项的设置信息,以便可以动态修改。
        settings syncmap.Map
        // 允许处理whisper相关消息的最大时长(以秒为单位)
        syncAllowance int
        // 指示这个节点是否是纯粹的轻客户端(不转发任何消息)
        lightClient bool
        // 用于保护stats
        statsMu sync.Mutex
        // whisper节点的统计信息
        stats   Statistics
        // MailServer接口
        mailServer MailServer
    }
    

    Whisper节点的配置Config

    由于新建一个Whisper实例,需要接收Config参数,所以先来看看Config的定义,见于config.go文件中。

    // whisper/whisperv6/config.go
    // Config描述了一个whisper节点的配置状态
    type Config struct {
        // 最大消息长度
        MaxMessageSize     uint32  `toml:",omitempty"`
        // 接受工作量证明的最小值
        MinimumAcceptedPOW float64 `toml:",omitempty"`
    }
    

    这里面提到了一个MinimumAcceptedPOW,是关于工作量证明的,我在Proof of Work找到了这个设计的初衷。

    之所以用到POW,主要是为了防止垃圾消息,也同样是为了缓解网络的压力。计算POW的成本可以理解为“你想让网络将你的消息存储一段时间(即TTL时间段),因此需要分配资源,那么你就需要为此支付价格”。所需的POW应该与消息大小和TTL成正比。

    代码中还提供了一个默认配置样例DefaultConfig

    // whisper/whisperv6/config.go
    var DefaultConfig = Config{
        MaxMessageSize:     DefaultMaxMessageSize, // uint32(1024 * 1024)
        MinimumAcceptedPOW: DefaultMinimumPoW, // 0.2
    }
    

    新建一个Whisper客户端

    搞明白了Config,开始新建一个Whisper客户端,whisper.go中的New()方法会新建一个Whisper客户端,该客户端可以在以太坊P2P网络中进行通信。

    // whisper/whisperv6/whisper.go
    func New(cfg *Config) *Whisper {
        // 如果传入的cfg是nil,则使用上面提到的DefaultConfig
        if cfg == nil {
            cfg = &DefaultConfig
        }
        // 初始化Whisper结构体
        whisper := &Whisper{
            privateKeys:   make(map[string]*ecdsa.PrivateKey),
            symKeys:       make(map[string][]byte),
            envelopes:     make(map[common.Hash]*Envelope),
            expirations:   make(map[uint32]*set.SetNonTS),
            peers:         make(map[*Peer]struct{}),
            messageQueue:  make(chan *Envelope, messageQueueLimit),
            p2pMsgQueue:   make(chan *Envelope, messageQueueLimit),
            quit:          make(chan struct{}),
            // 最长处理时间默认为10s
            syncAllowance: DefaultSyncAllowance,
        }
        // 使用NewFilters为这个whisper新建过滤器
        whisper.filters = NewFilters(whisper)
        
        // 将<minPowIdx, cfg.MinimumAcceptedPOW>放入settings内存中
        whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
        // 将<maxMsgSizeIdx, cfg.MaxMessageSize>放入settings内存中
        whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
        // 指示消息队列是否溢出
        whisper.settings.Store(overflowIdx, false)
        // p2p whisper子协议规则
        whisper.protocol = p2p.Protocol{
            Name:    ProtocolName, // "shh"
            Version: uint(ProtocolVersion), // uint(uint64(6))
            Length:  NumberOfMessageCodes, // 128
            // 启动运行
            Run:     whisper.HandlePeer,
            // 节点信息
            NodeInfo: func() interface{} {
                return map[string]interface{}{
                    "version":        ProtocolVersionStr, // "6.0"
                    // 读取whisper.settings[maxMsgSizeIdx]
                    "maxMessageSize": whisper.MaxMessageSize(),
                    // 读取whisper.settings[minPowIdx]
                    // 如果为空或出错,则返回DefaultMinimumPoW,即0.2
                    "minimumPoW":     whisper.MinPow(),
                }
            },
        }
        return whisper
    }
    

    总体来说,New()主要是创建了一个空的whisper{}结构体,并尽力初始化,比较重要的地方,就是初始化whisper.protocol子协议字段,这里面涉及到很多魔数。

    关于版本号6wiki上有这么一段话:

    Tor系统有一个协议,可以在两个节点之间建立连接,但它们互相并不知道对方在哪里,这是用于隐藏服务的Rendezvous协议。隐藏的服务(相当于TCP/IP中一个监听端口的服务)会选择随机数量的”介绍人”节点(我想,这个随机数量一般都是6)。为了做到这一点,它会和每一个介绍人都建立标准的3跳链(3-hop chain),当一个用户想和一个隐藏服务建立连接时,它们就会传播一个请求去连接与特定公钥相关的隐藏服务。

    关于消息代号数量128p2p.Protocol结构体中的Lenght字段表示某一子协议中消息代号的总数量,而在whisper/whisperv6/doc.go的常量表中可以看到根据EIP-627提案,whisper协议的消息代号共有128,故为128

    HandlePeer:发现新节点后做什么

    在新建whisper客户端时,whisper.protocol结构体用HandlerPeer()填充了Run字段,当和一个节点协商该协议时,会在一个新的goroutine上调用Run代表的方法。在whisper这里,当whisper子协议连接被协商的时候,下面的P2P层会调用HandlePeer

    // whisper/whisperv6/whisper.go
    func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
        // 新建一个whisper peer实例,接下来开始追踪它
        whisperPeer := newPeer(whisper, peer, rw)
        
        // 同步的将这个peer实例添加到追踪列表中
        whisper.peerMu.Lock()
        whisper.peers[whisperPeer] = struct{}{}
        whisper.peerMu.Unlock()
        // 当退出时,将该peer从追踪列表中删除
        defer func() {
            whisper.peerMu.Lock()
            delete(whisper.peers, whisperPeer)
            whisper.peerMu.Unlock()
        }()
        // 握手,状态更新
        if err := whisperPeer.handshake(); err != nil {
            return err
        }
        whisperPeer.start() // 启动通信
        defer whisperPeer.stop() // 退出时停止通信
        return whisper.runMessageLoop(whisperPeer, rw) // 开始消息循环处理
    }
    

    这里主要是在发现一个新的节点时,whisper协议所做的操作,大部分操作都是和Peer有关。下面就来看看whisperPeer

    whisper中的Peer

    peer.go中有关于Peer的定义,Peer代表了在whisper协议中的一个节点连接。

    // whisper/whisperv6/peer.go
    type Peer struct {
        // 本地whisper节点
        host *Whisper 
        // 远程whisper节点
        peer *p2p.Peer
        // 消息读写句柄
        ws   p2p.MsgReadWriter
        // 远程节点是否可信
        trusted        bool
        // 远程节点要求的POW值
        powRequirement float64
        // 布隆过滤器锁
        bloomMu        sync.Mutex
        // 布隆过滤器
        bloomFilter    []byte
        // 是否是全节点过滤器
        fullNode       bool
        // 存储该节点已知的消息,以避免浪费带宽
        known *set.Set // Messages already known by the peer to avoid wasting bandwidth
        // 优雅的退出连接
        quit chan struct{}
    }
    

    新建一个远程节点的本地代理Peer

    // whisper/whisperv6/peer.go
    // 新建一个whisper协议下的peer对象,但这个方法并不会去握手。
    func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
        return &Peer{
            host:           host,
            peer:           remote,
            ws:             rw,
            trusted:        false, // 默认不可信
            powRequirement: 0.0, // 初始化为0
            known:          set.New(),
            quit:           make(chan struct{}),
            bloomFilter:    MakeFullNodeBloom(), //创建一个长度为64的布隆过滤器,并且所有位初始化为0xFF
            fullNode:       true, // 已经初始化为全节点过滤器
        }
    }
    

    和远程节点握手:handshake()

    handshake()向远程节点发送协议初始化状态信息,同时也会验证远程节点的状态。状态消息有以下几点要素:

    消息代号为statusCode,即0
    消息的payload是一个列表:[whisper协议版本号(即6), 本地节点需要的最小POW值, 本地节点感兴趣的消息过滤器]
    只有版本号是强制验证的,后续2个参数都是可选的,即本地节点要求的最小POW和本地节点感兴趣的消息过滤器

    // whisper/whisperv6/peer.go
    func (peer *Peer) handshake() error {
        
        // 异步地发送握手状态消息
        errc := make(chan error, 1) // error channel
        go func() {
            pow := peer.host.MinPow() // 获取本地节点(自己)的POW最小需求值
            powConverted := math.Float64bits(pow) // 将float64转化为uint64
            // BloomFilter()为本地节点所有感兴趣的话题返回一个集成的布隆过滤器
            // 要求远程节点只能发送被通告的布隆过滤器中匹配的消息
            // 如果不匹配,则被认为是垃圾消息,并将与该远程节点断开连接。
            bloom := peer.host.BloomFilter()
            // 通过读写句柄向远程节点发送状态消息(状态码为0,表示这是一个状态消息)
            // 这个消息包含了[whisper的协议版本号,POW值,消息相关的布隆过滤器]
            errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
        }()
        
        // 获取远程节点状态数据,并且验证协议是否匹配
        packet, err := peer.ws.ReadMsg() // 从读写句柄中读取远程消息
        if err != nil {
            return err
        }
        // 正常情况下,第一个数据包的代号应该是状态码,否则就报错
        if packet.Code != statusCode {
            return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
        }
        // 数据包是rlp序列化格式,需要进行解码
        s := rlp.NewStream(packet.Payload, uint64(packet.Size))
        _, err = s.List()
        if err != nil {
            return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
        }
        // 读取前8个字节,作为协议版本号
        //(上面也看到了,状态消息的格式为一个列表:[whisper的协议版本号,POW值,消息相关的布隆过滤器])
        peerVersion, err := s.Uint()
        if err != nil {
            return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
        }
        // 如果获取到的协议版本号不是uint64(6),那么说明协议不匹配。
        if peerVersion != ProtocolVersion {
            return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
        }
        // 只有版本号是强制要求的,后续的参数都是可选的
        powRaw, err := s.Uint() // 继续读取8个字节,作为POW值
        // 如果没出错,则验证pow和bloomfilter;如果出错,也没什么,继续往下走,即后续参数是可选的
        if err == nil {
            pow := math.Float64frombits(powRaw) // uint64 -> float64
            // pow无穷大,不是数字,小于0,都会报错
            if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
                return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
            }
            // 用pow更新peer.powRequirement
            peer.powRequirement = pow
            var bloom []byte
            err = s.Decode(&bloom) //将剩余的部分全部解码,并存至bloom[]中
            if err == nil {
                sz := len(bloom)
                // 初始化时,布隆过滤器的长度都是BloomFilterSize,即64,否则就验证报错。
                if sz != BloomFilterSize && sz != 0 {
                    return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
                }
                // 用解码后的bloom更新peer.bloomFilter,以及peer.fullNode
                peer.setBloomFilter(bloom)
            }
        }
        
        // 阻塞等待通道是否返回错误,如果出错,则报错。
        if err := <-errc; err != nil {
            return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
        }
        return nil
    }
    

    开始通信:start()

    start()初始化

    原文链接:http://www.huamo.online/2018/04/11/Ethereum%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-whisper/

    相关文章

      网友评论

        本文标题:Ethereum源码阅读笔记-whisper

        本文链接:https://www.haomeiwen.com/subject/gyqyeftx.html