温馨提示:觉得有帮助的话, 给我点赞哦~
1. Go消费队列实现
package libs
import (
"sync"
"time"
)
// QueueItem 队列元素
type QueueItem = interface{}
// QueueConsumer 队列消费者
type QueueConsumer func(QueueItem)
type Queue struct {
sync.RWMutex
// 队列元素
items []QueueItem
// 当前并发数
count int
// 并发控制
max int
// 是否运行
running bool
}
// Enqueue 入队
func (q *Queue) Enqueue(v ...QueueItem) {
q.Lock()
q.items = append(q.items, v...)
q.Unlock()
}
// Dequeue 出队
func (q *Queue) Dequeue() QueueItem {
q.Lock()
defer q.Unlock()
if len(q.items) == 0 {
return nil
}
// 取出第一个元素
v := q.items[0]
// 调整队列
q.items = q.items[1:]
return v
}
// Size 队列大小
func (q *Queue) Size() int {
q.RLock()
defer q.RUnlock()
return len(q.items)
}
// Done 完成
func (q *Queue) Done() {
q.Lock()
defer q.Unlock()
if q.count > 0 {
q.count--
}
}
// Max 设置最大并发消费
func (q *Queue) Max(max int) {
if !q.running {
q.max = max
}
}
// Run 运行队列
func (q *Queue) Run(consumer QueueConsumer) {
if q.running {
return
}
q.running = true
ticker := time.NewTicker(time.Millisecond)
// 监听队列
for range ticker.C {
q.worker(consumer)
}
}
func (q *Queue) worker(consumer QueueConsumer) {
q.RLock()
// 剩余协程数
num := q.max - q.count
q.RUnlock()
if num > 0 {
for i := 0;i < num;i++ {
if v := q.Dequeue(); v != nil {
q.Lock()
q.count++
q.Unlock()
go consumer(v)
}
}
}
}
func NewQueue() *Queue {
return &Queue{
max: 2,
running: false,
}
}
2. 测试例子
// queue_test.go
func TestQueue(t *testing.T) {
q := NewQueue()
items := []QueueItem{1,2,3,4,5,6,7,8,9, 0}
// 初始生成数据
q.Enqueue(items...)
ticker := time.NewTicker(time.Second)
go func() {
// 模拟生成数据
for range ticker.C {
q.Enqueue(items...)
}
}()
// 队列消费者
consumer := func(item QueueItem) {
t.Logf("=>: %v", item)
// 注意: 消费完成要调用
q.Done()
}
q.Run(consumer)
}
温馨提示:觉得有帮助的话, 给我点赞哦~
网友评论