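go go go: continuing to read the source
Last time we found that the queue we looked at was a lock-free queue with a single producer goroutine and a single consumer goroutine. Digging further through the go-diodes code, it turns out there is also a lock-free queue for many producers and a single consumer. Let's use the lunch break to study that one together.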
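The lock-free queue data structure
The struct is very compact:
- writeIndex uint64 marks the producers' position.
- readIndex uint64 marks the consumer's position.
- alerter Alerter is the callback invoked when data is dropped.
- buffer []unsafe.Pointer is a slice of pointers that can hold values of any type, a trick worth learning.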
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
	writeIndex uint64
	buffer     []unsafe.Pointer
	readIndex  uint64
	alerter    Alerter
}
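To make the []unsafe.Pointer trick concrete, here is a minimal sketch that is not taken from go-diodes: the point type and the roundTrip helper are made up for illustration. It stores pointers to two unrelated types in one slice and converts them back. The reader has to know the original type of each slot; in the diode every slot holds a *bucket, so the conversion target is always the same.

```go
package main

import (
	"fmt"
	"unsafe"
)

// point is a hypothetical type used only for this illustration.
type point struct{ x, y int }

// roundTrip stores pointers to a string and a point in the same slice and
// converts each one back to its original pointer type before dereferencing.
func roundTrip() (string, int) {
	slots := make([]unsafe.Pointer, 2)
	s := "hello"
	p := point{x: 1, y: 2}
	slots[0] = unsafe.Pointer(&s)
	slots[1] = unsafe.Pointer(&p)
	// The caller must remember which type lives in which slot.
	return *(*string)(slots[0]), (*point)(slots[1]).y
}

func main() {
	s, y := roundTrip()
	fmt.Println(s, y) // hello 2
}
```

The price of this flexibility is that the compiler no longer checks the type, so converting a slot to the wrong pointer type is undefined behavior.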
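The initialization function: NewManyToOne(size int, alerter Alerter)
- The block starting with if alerter == nil only supplies a default no-op handler when the caller passes nil.
- d := &ManyToOne{ is a composite literal combined with the address operator, Go's shorthand for allocating a struct, initializing it, and taking a pointer to it in one expression.
- d.writeIndex = ^d.writeIndex flips every bit of d.writeIndex. A uint64 in Go is 8 bytes and the field starts at zero, so the result is 0xFFFFFFFFFFFFFFFF. The first atomic.AddUint64(&d.writeIndex, 1) in Set then wraps around to 0, which is why the first write lands at index 0.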
// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
// is optimized for many writers (on go-routines B-n) and a single reader
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
	if alerter == nil {
		alerter = AlertFunc(func(int) {})
	}
	d := &ManyToOne{
		buffer:  make([]unsafe.Pointer, size),
		alerter: alerter,
	}
	// Start write index at the value before 0
	// to allow the first write to use AddUint64
	// and still have a beginning index of 0
	d.writeIndex = ^d.writeIndex
	return d
}
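The effect of starting the write index at "the value before 0" can be checked in isolation. The sketch below is standalone and does not use go-diodes; firstIndexes is a hypothetical helper. It flips the bits of a zero uint64 and then performs the same atomic.AddUint64 call that Set uses.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// firstIndexes flips all bits of a zero counter, then increments it twice
// atomically, returning the starting value and the two increment results.
func firstIndexes() (start, first, second uint64) {
	var w uint64
	w = ^w // all 64 bits set
	start = w
	first = atomic.AddUint64(&w, 1)  // wraps around to 0
	second = atomic.AddUint64(&w, 1) // 1
	return start, first, second
}

func main() {
	start, first, second := firstIndexes()
	fmt.Println(start == math.MaxUint64, first, second) // true 0 1
}
```

Unsigned overflow is well defined in Go, so the wrap from the maximum value to 0 is safe to rely on.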
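Write: the Set function (safe to call from multiple goroutines)
The trick behind the lock-free queue
How can the queue be thread safe without using sync.Mutex? The answer is atomic operations. The functions in Go's "sync/atomic" package, such as atomic add, load, and compare-and-swap, are implemented with the atomic instructions of the target platform (arm, x86, amd64, and so on).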
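As a small illustration of why an atomic add is enough to hand out slots, the following sketch lets several goroutines claim tickets from a shared counter without any mutex. It is not go-diodes code; claimSlots and its parameters are made up for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// claimSlots starts `writers` goroutines that each claim `perWriter` tickets
// from one shared counter. It returns the final counter value and how many
// tickets were handed out exactly once.
func claimSlots(writers, perWriter int) (total uint64, unique int) {
	var counter uint64
	seen := make([]uint32, writers*perWriter)
	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				// AddUint64 returns the new value, so each caller
				// observes a different result.
				ticket := atomic.AddUint64(&counter, 1) - 1
				atomic.AddUint32(&seen[ticket], 1)
			}
		}()
	}
	wg.Wait()
	for _, n := range seen {
		if n == 1 {
			unique++
		}
	}
	return atomic.LoadUint64(&counter), unique
}

func main() {
	total, unique := claimSlots(8, 1000)
	fmt.Println(total, unique) // 8000 8000
}
```

Every goroutine receives a distinct ticket, which is the property Set relies on when it turns the ticket into a buffer index.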
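Walking through the details
Reading the values
- writeIndex := atomic.AddUint64(&d.writeIndex, 1) atomically increments the write index and returns the new value, so every writer gets its own sequence number.
- idx := writeIndex % uint64(len(d.buffer)) computes the slot the writer wants to use.
- old := atomic.LoadPointer(&d.buffer[idx]) is an atomic load, the equivalent of old := d.buffer[idx] in a lock-based version.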
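The check
The block starting with if old != nil && is a safety check. The consumer sets a slot to nil after reading it, so a non-nil old means the value in that slot has not been consumed yet. The writer only retries when that bucket's seq is greater than writeIndex-uint64(len(d.buffer)), which means a writer from a later lap has already filled the slot. Otherwise it goes on to overwrite the unread value, and CompareAndSwapPointer installs the new bucket only if the slot still holds old.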
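The seq comparison described above can be tried out on its own. This is a sketch with hypothetical helper names (threshold, mustRetry) that mirrors only the arithmetic of the condition, not the full Set function. Note that the subtraction is unsigned, so during the first lap, when writeIndex is smaller than the buffer size, the threshold wraps around to a huge value.

```go
package main

import "fmt"

// threshold is the seq a bucket from exactly one lap earlier would carry.
// The subtraction is unsigned and wraps when writeIndex < size.
func threshold(writeIndex, size uint64) uint64 {
	return writeIndex - size
}

// mustRetry mirrors the comparison in Set: the writer backs off only when
// the bucket already in the slot is newer than the previous lap's bucket.
func mustRetry(oldSeq, writeIndex, size uint64) bool {
	return oldSeq > threshold(writeIndex, size)
}

func main() {
	const size = 4
	// Slot holds the unread bucket from one lap earlier: overwrite it.
	fmt.Println(mustRetry(1, 5, size)) // false
	// Slot already holds a bucket from a later lap: retry with a new index.
	fmt.Println(mustRetry(9, 5, size)) // true
	// First lap: 2 - 4 wraps around instead of going negative.
	fmt.Println(threshold(2, size)) // 18446744073709551614
}
```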
// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
	for {
		writeIndex := atomic.AddUint64(&d.writeIndex, 1)
		idx := writeIndex % uint64(len(d.buffer))
		old := atomic.LoadPointer(&d.buffer[idx])
		if old != nil &&
			(*bucket)(old) != nil &&
			(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
			log.Println("Diode set collision: consider using a larger diode")
			continue
		}
		newBucket := &bucket{
			data: data,
			seq:  writeIndex,
		}
		if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
			log.Println("Diode set collision: consider using a larger diode")
			continue
		}
		return
	}
}
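The last step of Set relies on CompareAndSwapPointer failing when another writer changed the slot in the meantime. The following standalone sketch shows that behavior in its simplest form: several goroutines try to install their own bucket into one empty slot, all expecting nil as the old value. The bucket type here is a reduced stand-in and race is a hypothetical helper, so treat this as an illustration of the primitive rather than of the library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// bucket is a reduced stand-in for the diode's bucket type.
type bucket struct{ seq uint64 }

// race lets n goroutines compete to fill one empty slot. It returns how many
// compare-and-swap calls succeeded and the seq of the bucket that was stored.
func race(n int) (wins int32, winner uint64) {
	var slot unsafe.Pointer
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(seq uint64) {
			defer wg.Done()
			b := &bucket{seq: seq}
			// Succeeds only while the slot still holds the expected nil.
			if atomic.CompareAndSwapPointer(&slot, nil, unsafe.Pointer(b)) {
				atomic.AddInt32(&wins, 1)
			}
		}(uint64(i))
	}
	wg.Wait()
	return wins, (*bucket)(atomic.LoadPointer(&slot)).seq
}

func main() {
	wins, winner := race(16)
	fmt.Println(wins, winner < 16) // 1 true
}
```

Exactly one goroutine wins regardless of scheduling. In Set the losers do not give up: they loop, claim a fresh writeIndex, and try again.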
Read: the TryNext function
// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
	// Read a value from the ring buffer based on the readIndex.
	idx := d.readIndex % uint64(len(d.buffer))
	result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
	// When the result is nil that means the writer has not had the
	// opportunity to write a value into the diode. This value must be ignored
	// and the read head must not increment.
	if result == nil {
		return nil, false
	}
	// When the seq value is less than the current read index that means a
	// value was read from idx that was previously written but has since
	// been dropped. This value must be ignored and the read head must not
	// increment.
	//
	// The simulation for this scenario assumes the fast forward occurred as
	// detailed below.
	//
	// 5. The reader reads again getting seq 5. It then reads again expecting
	//    seq 6 but gets seq 2. This is a read of a stale value that was
	//    effectively "dropped" so the read fails and the read head stays put.
	//    `| 4 | 5 | 2 | 3 |` r: 6, w: 6
	//
	if result.seq < d.readIndex {
		return nil, false
	}
	// When the seq value is greater than the current read index that means a
	// value was read from idx that overwrote the value that was expected to
	// be at this idx. This happens when the writer has lapped the reader. The
	// reader needs to catch up to the writer so it moves its read head to
	// the new seq, effectively dropping the messages that were not read in
	// between the two values.
	//
	// Here is a simulation of this scenario:
	//
	// 1. Both the read and write heads start at 0.
	//    `| nil | nil | nil | nil |` r: 0, w: 0
	// 2. The writer fills the buffer.
	//    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
	// 3. The writer laps the read head.
	//    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
	// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
	//    this forces the reader to fast forward to 5.
	//    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
	//
	if result.seq > d.readIndex {
		dropped := result.seq - d.readIndex
		d.readIndex = result.seq
		d.alerter.Alert(int(dropped))
	}
	// Only increment read index if a regular read occurred (where seq was
	// equal to readIndex) or a value was read that caused a fast forward
	// (where seq was greater than readIndex).
	//
	d.readIndex++
	return result.data, true
}
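The lapped-reader scenario from the comments can be replayed with a simplified, single-goroutine model. This sketch is not the library code: it drops the atomics, uses a plain []*bucket, counts dropped messages in a field instead of calling an Alerter, and the names ring, newRing, set, tryNext, and demo are all made up. It assumes a buffer of 4 slots and 6 writes before the first read.

```go
package main

import "fmt"

type bucket struct {
	data string
	seq  uint64
}

// ring is a single-goroutine model of the diode's read logic.
type ring struct {
	buf       []*bucket
	nextSeq   uint64 // seq the next write will use
	readIndex uint64
	dropped   int
}

func newRing(size int) *ring { return &ring{buf: make([]*bucket, size)} }

// set overwrites the slot for the next seq without any collision handling.
func (r *ring) set(data string) {
	r.buf[r.nextSeq%uint64(len(r.buf))] = &bucket{data: data, seq: r.nextSeq}
	r.nextSeq++
}

// tryNext follows the same three cases as TryNext: empty, stale, or readable.
func (r *ring) tryNext() (string, bool) {
	idx := r.readIndex % uint64(len(r.buf))
	b := r.buf[idx]
	r.buf[idx] = nil
	if b == nil || b.seq < r.readIndex {
		return "", false // nothing written yet, or a stale value
	}
	if b.seq > r.readIndex {
		// The writer lapped the reader: fast forward and count the loss.
		r.dropped += int(b.seq - r.readIndex)
		r.readIndex = b.seq
	}
	r.readIndex++
	return b.data, true
}

// demo writes m0..m5 into a 4-slot ring, then reads until a read fails.
func demo() (got []string, dropped int, readIndex uint64) {
	r := newRing(4)
	for i := 0; i < 6; i++ {
		r.set(fmt.Sprintf("m%d", i))
	}
	for {
		v, ok := r.tryNext()
		if !ok {
			break
		}
		got = append(got, v)
	}
	return got, r.dropped, r.readIndex
}

func main() {
	got, dropped, readIndex := demo()
	fmt.Println(got, dropped, readIndex) // [m4 m5] 4 6
}
```

The reader receives only m4 and m5, reports 4 dropped messages (m0 through m3), and then stops at read index 6 when it meets the stale seq 2.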
TODO
To be expanded.