Go语言小技巧(4) - FIFO队列
队列是非常常见的数据结构,看下K8S是怎么实现的
详见client-go\tools\cache\fifo.go
struct
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
type FIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]interface{}
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
closed bool
closedLock sync.Mutex
}
数据结构和接口没什么好说的,都比较常见,重点关注Add和Pop。
实现
Add
func (f *FIFO) Add(obj interface{}) error {
id, err := f.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = obj
f.cond.Broadcast()
return nil
}
- 根据keyFunc计算key
- lock
- 如果key不存在,就加入queue切片
- items赋值
- goroutine通信,唤醒wait
- unlock
Pop
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, FIFOClosedError
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
Note: 如果fifo空,则Pop会block
- lock
- 如果没有值,就goroutine通信,wait等待有值,释放锁
- 取queue切片第一个key
- 重点,切片操作f.queue = f.queue[1:],缺点显然底层数组前端一直空闲,空间换时间。
- initialPopulationCount操作,暂时不知道用途
- 取出对应值,为什么取值会不ok?遗留问题。如果没有值,就继续循环,外面是个死循环。
- 删除对应值
- unlock
Note: 第四步用了非常讨巧的方法,浪费内存,可以优化
Delete
func (f *FIFO) Delete(obj interface{}) error {
id, err := f.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
delete(f.items, id)
return err
}
解谜了。Delete只删除items,而没有删除queue切片,因为删除切片时间复杂度比较高,这个设计nice!
总结
- 使用切片实现顺序性
- pop的时候,使用切片操作[1:]
- 删除的时候只删除items,不要删除切片
- pop找不到值的时候,继续下一个
网友评论