美文网首页区块链-HyperLedger Fabric
HyperLedgerFabric源码解读(4)-batcher

HyperLedgerFabric源码解读(4)-batcher

作者: 神奇的考拉 | 来源:发表于2018-11-20 16:07 被阅读1次

    关于signedGossipMessage的emit组件,以及batchingEmitterImpl

    // 提交签名gossip消息后的回调函数
    type emitBatchCallback func([]interface{})
    
    // batchingEmitter常用语gossip的提交和转发阶段
    //   消息被添加批处理发射器中,接着定期分批被转发T次,然后丢弃
    //   若是batchingEmitter中存放的message数达到了一定的容量,将会触发消息分派
    //batchingEmitter is used for the gossip push/forwarding phase.
    // Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded.
    // If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch
    type batchingEmitter interface {
        // 将一个消息添加到批次里
        // Add adds a message to be batched
        Add(interface{})
    
        // 停止组件
        // Stop stops the component
        Stop()
    
        // 待发送消息的数量
        // Size returns the amount of pending messages to be emitted
        Size() int
    }
    
    // newBatchingEmitter accepts the following parameters:
    // iterations: number of times each message is forwarded
    // burstSize: a threshold that triggers a forwarding because of message count
    // latency: the maximum delay that each message can be stored without being forwarded
    // cb: a callback that is called in order for the forwarding to take place
    func newBatchingEmitter(iterations,  // 每次各自消息被转发的次数
        burstSize int,                    // 每次触发消息转发的消息总量的阈值
        latency time.Duration,            // 每条消息被存放而不被转发的延迟周期
        cb emitBatchCallback) batchingEmitter { // 为了进行转发而调用的回调函数
        // 消息被转发的次数不能 < 0
        if iterations < 0 {
            panic(errors.Errorf("Got a negative iterations number"))
        }
    
        // batchingEmitterImpl batchingEmitter实现
        p := &batchingEmitterImpl{
            cb:         cb,             // 转发而进行的回调函数
            delay:      latency,        // batch中消息存放的有效期
            iterations: iterations,     // 每次转发消息时每条消息被转发的次数
            burstSize:  burstSize,      // 转发消息触发的消息容量阈值
            lock:       &sync.Mutex{},  //
            buff:       make([]*batchedMessage, 0),  // batchmessage缓存
            stopFlag:   int32(0),                    // batchingEmitter停止的标记
        }
    
        if iterations != 0 {                        // 定期指定消息Emit 启动一个goroutine执行
            go p.periodicEmit()
        }
    
        return p
    }
    
    // 周期性执行消息emit
    // 循环判断当前batchingEmitter是否停止
    // 首先延迟delay周期 保证batch中的消息已超出缓存在batch最大有效期
    // 获取到已超出缓存在batch最大有效期的消息执行提交同时保证执行过程在lock环境下
    func (p *batchingEmitterImpl) periodicEmit() {
        for !p.toDie() {
            time.Sleep(p.delay)
            p.lock.Lock()
            p.emit()
            p.lock.Unlock()
        }
    }
    
    // 提交消息到其他peer
    func (p *batchingEmitterImpl) emit() {
        // 保证提交消息时batchingEmitter仍可用
        if p.toDie() {
            return
        }
        if len(p.buff) == 0 {  // batch的buffer里面没有可提交的消息
            return
        }
        msgs2beEmitted := make([]interface{}, len(p.buff)) // 本地emit数据缓存
        for i, v := range p.buff {
            msgs2beEmitted[i] = v.data
        }
    
        p.cb(msgs2beEmitted)   // 执行回调函数
        p.decrementCounters()  // 减少已提交的消息量
    }
    
    // 减少batch中已提交的消息数  保证batch消息量都是未提交的
    func (p *batchingEmitterImpl) decrementCounters() {
        //
        n := len(p.buff)
        for i := 0; i < n; i++ {
            msg := p.buff[i]
            msg.iterationsLeft--
            if msg.iterationsLeft == 0 {
                p.buff = append(p.buff[:i], p.buff[i+1:]...)
                n--
                i--
            }
        }
    }
    
    // batchingEmitter是否可用标记
    func (p *batchingEmitterImpl) toDie() bool {
        return atomic.LoadInt32(&(p.stopFlag)) == int32(1)
    }
    
    // batchingEmitter实现
    type batchingEmitterImpl struct {
        iterations int
        burstSize  int
        delay      time.Duration
        cb         emitBatchCallback
        lock       *sync.Mutex
        buff       []*batchedMessage
        stopFlag   int32
    }
    
    // batch消息
    type batchedMessage struct {
        data           interface{}
        iterationsLeft int
    }
    
    // 停止
    func (p *batchingEmitterImpl) Stop() {
        atomic.StoreInt32(&(p.stopFlag), int32(1))
    }
    
    // batch中已缓存的消息总量
    func (p *batchingEmitterImpl) Size() int {
        p.lock.Lock()
        defer p.lock.Unlock()
        return len(p.buff)
    }
    
    // 添加消息到batch缓存中
    // 一旦当前batch的buffer中消息量已达到消息发送阈值  则执行emit
    func (p *batchingEmitterImpl) Add(message interface{}) {
        if p.iterations == 0 {
            return
        }
        p.lock.Lock()
        defer p.lock.Unlock()
    
        // 添加消息到batch的buffer中 等待触发发送阈值 执行emit
        p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations})
    
        if len(p.buff) >= p.burstSize {
            p.emit()
        }
    }
    

    相关文章

      网友评论

        本文标题:HyperLedgerFabric源码解读(4)-batcher

        本文链接:https://www.haomeiwen.com/subject/trjxqqtx.html