关于signedGossipMessage的emit组件,以及batchingEmitterImpl
// 提交签名gossip消息后的回调函数
type emitBatchCallback func([]interface{})
// batchingEmitter常用语gossip的提交和转发阶段
// 消息被添加批处理发射器中,接着定期分批被转发T次,然后丢弃
// 若是batchingEmitter中存放的message数达到了一定的容量,将会触发消息分派
//batchingEmitter is used for the gossip push/forwarding phase.
// Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded.
// If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch
type batchingEmitter interface {
// 将一个消息添加到批次里
// Add adds a message to be batched
Add(interface{})
// 停止组件
// Stop stops the component
Stop()
// 待发送消息的数量
// Size returns the amount of pending messages to be emitted
Size() int
}
// newBatchingEmitter accepts the following parameters:
// iterations: number of times each message is forwarded
// burstSize: a threshold that triggers a forwarding because of message count
// latency: the maximum delay that each message can be stored without being forwarded
// cb: a callback that is called in order for the forwarding to take place
func newBatchingEmitter(iterations, // 每次各自消息被转发的次数
burstSize int, // 每次触发消息转发的消息总量的阈值
latency time.Duration, // 每条消息被存放而不被转发的延迟周期
cb emitBatchCallback) batchingEmitter { // 为了进行转发而调用的回调函数
// 消息被转发的次数不能 < 0
if iterations < 0 {
panic(errors.Errorf("Got a negative iterations number"))
}
// batchingEmitterImpl batchingEmitter实现
p := &batchingEmitterImpl{
cb: cb, // 转发而进行的回调函数
delay: latency, // batch中消息存放的有效期
iterations: iterations, // 每次转发消息时每条消息被转发的次数
burstSize: burstSize, // 转发消息触发的消息容量阈值
lock: &sync.Mutex{}, //
buff: make([]*batchedMessage, 0), // batchmessage缓存
stopFlag: int32(0), // batchingEmitter停止的标记
}
if iterations != 0 { // 定期指定消息Emit 启动一个goroutine执行
go p.periodicEmit()
}
return p
}
// 周期性执行消息emit
// 循环判断当前batchingEmitter是否停止
// 首先延迟delay周期 保证batch中的消息已超出缓存在batch最大有效期
// 获取到已超出缓存在batch最大有效期的消息执行提交同时保证执行过程在lock环境下
func (p *batchingEmitterImpl) periodicEmit() {
for !p.toDie() {
time.Sleep(p.delay)
p.lock.Lock()
p.emit()
p.lock.Unlock()
}
}
// 提交消息到其他peer
func (p *batchingEmitterImpl) emit() {
// 保证提交消息时batchingEmitter仍可用
if p.toDie() {
return
}
if len(p.buff) == 0 { // batch的buffer里面没有可提交的消息
return
}
msgs2beEmitted := make([]interface{}, len(p.buff)) // 本地emit数据缓存
for i, v := range p.buff {
msgs2beEmitted[i] = v.data
}
p.cb(msgs2beEmitted) // 执行回调函数
p.decrementCounters() // 减少已提交的消息量
}
// 减少batch中已提交的消息数 保证batch消息量都是未提交的
func (p *batchingEmitterImpl) decrementCounters() {
//
n := len(p.buff)
for i := 0; i < n; i++ {
msg := p.buff[i]
msg.iterationsLeft--
if msg.iterationsLeft == 0 {
p.buff = append(p.buff[:i], p.buff[i+1:]...)
n--
i--
}
}
}
// batchingEmitter是否可用标记
func (p *batchingEmitterImpl) toDie() bool {
return atomic.LoadInt32(&(p.stopFlag)) == int32(1)
}
// batchingEmitter实现
type batchingEmitterImpl struct {
iterations int
burstSize int
delay time.Duration
cb emitBatchCallback
lock *sync.Mutex
buff []*batchedMessage
stopFlag int32
}
// batch消息
type batchedMessage struct {
data interface{}
iterationsLeft int
}
// 停止
func (p *batchingEmitterImpl) Stop() {
atomic.StoreInt32(&(p.stopFlag), int32(1))
}
// batch中已缓存的消息总量
func (p *batchingEmitterImpl) Size() int {
p.lock.Lock()
defer p.lock.Unlock()
return len(p.buff)
}
// 添加消息到batch缓存中
// 一旦当前batch的buffer中消息量已达到消息发送阈值 则执行emit
func (p *batchingEmitterImpl) Add(message interface{}) {
if p.iterations == 0 {
return
}
p.lock.Lock()
defer p.lock.Unlock()
// 添加消息到batch的buffer中 等待触发发送阈值 执行emit
p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations})
if len(p.buff) >= p.burstSize {
p.emit()
}
}
网友评论