美文网首页
Go消费队列Queue并监听变化

Go消费队列Queue并监听变化

作者: 小鱼宠ZZ | 来源:发表于2021-08-23 10:25 被阅读0次

    温馨提示:觉得有帮助的话, 给我点赞哦~

    1. Go消费队列实现

    package libs
    
    import (
        "sync"
        "time"
    )
    
    // QueueItem 队列元素
    type QueueItem = interface{}
    
    // QueueConsumer 队列消费者
    type QueueConsumer func(QueueItem)
    
    type Queue struct {
        sync.RWMutex
        // 队列元素
        items []QueueItem
        // 当前并发数
        count int
        // 并发控制
        max int
        // 是否运行
        running bool
    }
    
    // Enqueue 入队
    func (q *Queue) Enqueue(v ...QueueItem) {
        q.Lock()
        q.items = append(q.items, v...)
        q.Unlock()
    }
    
    // Dequeue 出队
    func (q *Queue) Dequeue() QueueItem {
        q.Lock()
        defer q.Unlock()
    
        if len(q.items) == 0 {
            return nil
        }
    
        // 取出第一个元素
        v := q.items[0]
        // 调整队列
        q.items = q.items[1:]
    
        return v
    }
    
    // Size 队列大小
    func (q *Queue) Size() int {
        q.RLock()
        defer q.RUnlock()
    
        return len(q.items)
    }
    
    // Done 完成
    func (q *Queue) Done() {
        q.Lock()
        defer q.Unlock()
    
        if q.count > 0 {
            q.count--
        }
    }
    
    // Max 设置最大并发消费
    func (q *Queue) Max(max int) {
        if !q.running {
            q.max = max
        }
    }
    
    // Run 运行队列
    func (q *Queue) Run(consumer QueueConsumer) {
        if q.running {
            return
        }
    
        q.running = true
    
        ticker := time.NewTicker(time.Millisecond)
    
        // 监听队列
        for range ticker.C {
            q.worker(consumer)
        }
    }
    
    func (q *Queue) worker(consumer QueueConsumer) {
        q.RLock()
        // 剩余协程数
        num := q.max - q.count
        q.RUnlock()
    
        if num > 0 {
            for i := 0;i < num;i++ {
                if v := q.Dequeue(); v != nil {
                    q.Lock()
                    q.count++
                    q.Unlock()
    
                    go consumer(v)
                }
            }
        }
    }
    
    func NewQueue() *Queue {
        return &Queue{
            max: 2,
            running: false,
        }
    }
    

    2. 测试例子

    // queue_test.go
    func TestQueue(t *testing.T) {
        q := NewQueue()
    
        items := []QueueItem{1,2,3,4,5,6,7,8,9, 0}
    
           // 初始生成数据
        q.Enqueue(items...)
    
        ticker := time.NewTicker(time.Second)
    
        go func() {
                   // 模拟生成数据
            for range ticker.C {
                q.Enqueue(items...)
            }
        }()
    
           // 队列消费者
           consumer := func(item QueueItem) {
                  t.Logf("=>: %v", item)
    
                  // 注意: 消费完成要调用
                  q.Done()
        }
    
        q.Run(consumer)
    }
    

    温馨提示:觉得有帮助的话, 给我点赞哦~

    相关文章

      网友评论

          本文标题:Go消费队列Queue并监听变化

          本文链接:https://www.haomeiwen.com/subject/uuwkiltx.html