美文网首页
go-diodes源码阅读--多生产者,单消费者无锁队列(2)

go-diodes源码阅读--多生产者,单消费者无锁队列(2)

作者: guonaihong | 来源:发表于2020-02-18 13:21 被阅读0次

go go go继续看源码

上一次看了最后发现是单go程生产,单go程消费的无锁队列。在go-diodes的代码里面翻了翻,竟然还有多生产者,单消费者的无锁队列,后面我们一块,趁着吃午饭的时间,继续研究下。

无锁队列数据结构

  • 很精简,其中writeIndex uint64 标记生产都的位置
  • readIndex uint64 标记消费都的位置
  • alerter Alerter 丢数据调用的回调函数
  • buffer []unsafe.Pointer 存放任意类型的指针slice,这点学习下
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
    writeIndex uint64
    buffer     []unsafe.Pointer
    readIndex  uint64
    alerter    Alerter
}

初始化函数,NewManyToOne(size int, alerter Alerter)

  • if alerter == nil 开头代码块,只是给异常处理函数默认值。
  • d := &ManyToOne{ 这里是go里面声明和初始化指针变量的一个语法糖。
  • d.writeIndex = ^d.writeIndex 把d.witeIndex每个bit位置为1,go里面的uint64是8字节,那d.writeIndex的值就是0xFFFFFFFFFFFFFFFF。
// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
// is optimzed for many writers (on go-routines B-n) and a single reader
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
    if alerter == nil {
        alerter = AlertFunc(func(int) {})
    }

    d := &ManyToOne{
        buffer:  make([]unsafe.Pointer, size),
        alerter: alerter,
    }

    // Start write index at the value before 0
    // to allow the first write to use AddUint64
    // and still have a beginning index of 0
    d.writeIndex = ^d.writeIndex
    return d
}

Write: .Set函数(可以在多个go程里面安全使用)

无锁队列里面的黑科技

怎么在不使用sync.Mutex的提前下,如何做到线程安全。答案就是atomic,go里面 "sync/atomic"包下面的函数编译时会转成对应平台(arm/x86/amd64等)的汇编,比如原子加,减等等。

分析细节

取值

  • writeIndex := atomic.AddUint64(&d.writeIndex, 1) 是原子自加函数,
  • idx := writeIndex % uint64(len(d.buffer))计算想写入的下标。
  • old := atomic.LoadPointer(&d.buffer[idx])原子赋值语句,等同加锁版本的 old := &d.buffer[idex])

判断

if old != nil && 打头是一些安全验证的代码,消费者会置空取过的值,这时候还取到值,说明还没有被消费过。

// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
    for {
        writeIndex := atomic.AddUint64(&d.writeIndex, 1)
        idx := writeIndex % uint64(len(d.buffer))
        old := atomic.LoadPointer(&d.buffer[idx])

        if old != nil &&
            (*bucket)(old) != nil &&
            (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
            log.Println("Diode set collision: consider using a larger diode")
            continue
        }

        newBucket := &bucket{
            data: data,
            seq:  writeIndex,
        }

        if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
            log.Println("Diode set collision: consider using a larger diode")
            continue
        }

        return
    }
}

Read: TryNext函数

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
    // Read a value from the ring buffer based on the readIndex.
    idx := d.readIndex % uint64(len(d.buffer))
    result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

    // When the result is nil that means the writer has not had the
    // opportunity to write a value into the diode. This value must be ignored
    // and the read head must not increment.
    if result == nil {
        return nil, false
    }

    // When the seq value is less than the current read index that means a
    // value was read from idx that was previously written but has since has
    // been dropped. This value must be ignored and the read head must not
    // increment.
    //
    // The simulation for this scenario assumes the fast forward occurred as
    // detailed below.
    //
    // 5. The reader reads again getting seq 5. It then reads again expecting
    //    seq 6 but gets seq 2. This is a read of a stale value that was
    //    effectively "dropped" so the read fails and the read head stays put.
    //    `| 4 | 5 | 2 | 3 |` r: 7, w: 6
    //
    if result.seq < d.readIndex {
        return nil, false
    }

    // When the seq value is greater than the current read index that means a
    // value was read from idx that overwrote the value that was expected to
    // be at this idx. This happens when the writer has lapped the reader. The
    // reader needs to catch up to the writer so it moves its write head to
    // the new seq, effectively dropping the messages that were not read in
    // between the two values.
    //
    // Here is a simulation of this scenario:
    //
    // 1. Both the read and write heads start at 0.
    //    `| nil | nil | nil | nil |` r: 0, w: 0
    // 2. The writer fills the buffer.
    //    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
    // 3. The writer laps the read head.
    //    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
    // 4. The reader reads the first value, expecting a seq of 0 but reads 4,
    //    this forces the reader to fast forward to 5.
    //    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
    //
    if result.seq > d.readIndex {
        dropped := result.seq - d.readIndex
        d.readIndex = result.seq
        d.alerter.Alert(int(dropped))
    }

    // Only increment read index if a regular read occurred (where seq was
    // equal to readIndex) or a value was read that caused a fast forward
    // (where seq was greater than readIndex).
    //
    d.readIndex++
    return result.data, true
}

TODO

继续完善

相关文章

网友评论

      本文标题:go-diodes源码阅读--多生产者,单消费者无锁队列(2)

      本文链接:https://www.haomeiwen.com/subject/nakgfhtx.html