美文网首页
设计一个协程池

设计一个协程池

作者: 彳亍口巴 | 来源:发表于2023-06-16 14:18 被阅读0次

    具体的池子

    package main
    
    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )
    
    const (
        defaultCapacity = 10
        expireSecond    = 2
    )
    
    type Pool struct {
        capacity    int32       // 最大同时执行的协程
        running     int32       // 当前在执行的
        waiting     int32       // 当前等待的数量
        state       int32       // 当前池子的状态,如果为1,则关闭
        lock        sync.Locker // 自旋锁,实现了加锁和解锁方法
        workers     workerQueue // worker队列,实现了一个接口,不同实现方式代表不同的队列
        workerCache sync.Pool   // 存放每个创建出来的worker
        cond        *sync.Cond  // 当协程过多的时候需要阻塞,然后唤醒
    }
    
    func newPool(size int32) *Pool {
        if size <= 0 {
            size = defaultCapacity
        }
        p := &Pool{
            capacity: size,
            workers:  newWorkerStack(size),
            lock:     NewSpinLock(),
        }
        p.cond = sync.NewCond(p.lock)
        p.workerCache.New = func() interface{} {
            return &goWorker{
                pool: p,
                task: make(chan func(), 1),
            }
        }
        go p.purgeStableWorkers()
        return p
    }
    
    func (p *Pool) submit(task func()) error {
        if w := p.getWorker(); w != nil {
            w.inputFunc(task)
            fmt.Println("放入成功")
            return nil
        }
        fmt.Println("放入失败")
        return fmt.Errorf("放入失败")
    }
    
    func (p *Pool) getWorker() (w worker) {
        // 这里只有启动的时候会执行,后续基本不会执行,因为都是直接从队列中获取worker
        spawnWorker := func() {
            w = p.workerCache.Get().(*goWorker)
            w.run()
        }
        //
        p.lock.Lock()
        w = p.workers.detach()
        if w != nil {
            fmt.Println("获得worker")
            p.lock.Unlock()
            return w
        } else if p.running < p.capacity {
            fmt.Println("启动")
            p.lock.Unlock()
            spawnWorker()
        } else {
            if p.isClosed() {
                p.lock.Unlock()
                return
            }
            fmt.Println("开始重试")
        retry:
            // 阻塞和重试
            p.addWaiting(1)
            p.cond.Wait()
            p.addWaiting(-1)
    
            if p.isClosed() {
                p.lock.Unlock()
                return
            }
    
            if w = p.workers.detach(); w == nil {
                fmt.Println("没有获得任务")
                if p.free() {
                    p.lock.Unlock()
                    spawnWorker()
                    return
                }
                goto retry
            }
            fmt.Println("获得任务成功")
            p.lock.Unlock()
        }
        return
    }
    
    func (p *Pool) addRunning(delta int) {
        atomic.AddInt32(&p.running, int32(delta))
    }
    
    func (p *Pool) revertWorker(w *goWorker) bool {
        fmt.Println("恢复worker")
        if p.capacity < p.running || p.isClosed() {
            p.cond.Broadcast()
            fmt.Println("已关闭1")
            return false
        }
        p.lock.Lock()
        if p.isClosed() {
            fmt.Println("已关闭1")
            p.lock.Unlock()
            return false
        }
        w.lastUsed = time.Now().Unix()
        if err := p.workers.insert(w); err != nil {
            p.lock.Unlock()
            return false
        }
        p.cond.Signal()
        p.lock.Unlock()
        return true
    }
    
    // isClosed 是否关闭
    func (p *Pool) isClosed() bool {
        return atomic.LoadInt32(&p.state) == CLOSE
    }
    
    func (p *Pool) addWaiting(delta int) {
        atomic.AddInt32(&p.waiting, int32(delta))
    }
    
    // free 当前池子是否有空闲
    func (p *Pool) free() bool {
        return p.capacity-p.running > 0
    }
    
    func (p *Pool) purgeStableWorkers() {
        //ctx, _ := context.WithCancel(context.Background())
        ticker := time.NewTicker(time.Second * 2)
        defer func() {
            ticker.Stop()
        }()
        //
        for {
            select {
            case <-ticker.C:
    
            }
            fmt.Println("开始清楚")
            // 开始执行主逻辑
            var isDormant bool
            p.lock.Lock()
            stableWorker := p.workers.refresh(time.Now().Unix() - expireSecond)
            fmt.Printf("空闲数量:%d\n", len(stableWorker))
            n := p.running
            isDormant = n == 0 || len(stableWorker) == int(n)
            p.lock.Unlock()
            for i := range stableWorker {
                stableWorker[i].finish()
                stableWorker[i] = nil
            }
            // 全部空闲
            if isDormant && p.waiting > 0 {
                fmt.Println("全部空闲")
                p.cond.Broadcast()
            }
        }
    }
    
    

    自旋锁:对worker进行操作时需要上锁,因为切片不是线程安全的

    package main
    
    import (
        "runtime"
        "sync"
        "sync/atomic"
    )
    
    type spinLock uint32
    
    const maxBackoff = 16
    
    // Lock 加锁 使用CAS将0换成1 成功了就加锁成功,否则一直重试,重试次数从1增加到16
    func (sl *spinLock) Lock() {
        backOff := 1
        for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
            for i := 0; i < backOff; i++ {
                runtime.Gosched()
            }
            if backOff < maxBackoff {
                backOff <<= 1 // 左移1位
            }
        }
    }
    
    // Unlock 解锁,CAS将1改成0
    func (sl *spinLock) Unlock() {
        atomic.StoreUint32((*uint32)(sl), 0)
    }
    
    func NewSpinLock() sync.Locker {
        return new(spinLock)
    }
    
    

    具体的worker实现

    package main
    
    import (
        "fmt"
    )
    
    type goWorker struct {
        // pool who owns this worker.
        pool *Pool
    
        // task is a job should be done.
        task chan func()
    
        // lastUsed will be updated when putting a worker back into queue.
        lastUsed int64
    }
    
    func (w *goWorker) inputFunc(fn func()) {
        w.task <- fn
    }
    
    func (w *goWorker) run() {
        w.pool.addRunning(1)
        fmt.Printf("当前运行的go:%d\n", w.pool.running)
        go func() {
            defer func() {
                w.pool.addRunning(-1)
                w.pool.workerCache.Put(w)
                w.pool.cond.Signal() // 唤醒一个协程
                fmt.Println("任务结束----------------")
            }()
            //  在此遍历执行task
            for task := range w.task {
                if task == nil {
                    return
                }
                task()
                if ok := w.pool.revertWorker(w); !ok {
                    return
                }
                //time.Sleep(time.Second * 5)
            }
    
        }()
    }
    
    // finish 往task中放入一个nil,代表本次worker结束
    func (w *goWorker) finish() {
        w.task <- nil
    }
    
    func (w *goWorker) lastUsedTime() int64 {
        return w.lastUsed
    }
    
    

    worker接口

    package main
    
    // 工作队列interface
    
    type worker interface {
        run()    // 执行任务
        finish() // 强制当前worker完成任务
        lastUsedTime() int64
        inputFunc(func())
        //inputParam(interface{})
    }
    
    type workerQueue interface {
        len() int
        //isEmpty() bool
        insert(worker) error
        detach() worker
        refresh(duration int64) []worker // clean up the stale workers and return them
        //reset()
    }
    
    

    队列类型的worker

    package main
    
    import (
        "fmt"
    )
    
    type workerStack struct {
        items  []worker
        expiry []worker
    }
    
    func newWorkerStack(size int32) *workerStack {
        return &workerStack{
            items: make([]worker, 0, size),
        }
    }
    
    // insert 任务完成后,将worker放回队列中
    func (ws *workerStack) insert(w worker) error {
        ws.items = append(ws.items, w)
        fmt.Printf("当前insert数量:%d\n", len(ws.items))
        return nil
    }
    
    // detach 每次任务都要先从这里获取,获取不到的话就从池中获取或者循环从这里获取
    func (ws *workerStack) detach() worker {
        l := len(ws.items)
        if l <= 0 {
            return nil
        }
        w := ws.items[l-1]
        ws.items[l-1] = nil
        ws.items = ws.items[:l-1]
        return w
    }
    
    // refresh 将空闲worker回收
    func (ws *workerStack) refresh(expireTime int64) []worker {
        n := ws.len()
        if n <= 0 {
            return nil
        }
    
        index := ws.binarySearch(0, n-1, expireTime)
        ws.expiry = ws.expiry[:0] // 重置
        if index != -1 {
            ws.expiry = append(ws.expiry, ws.items[:index+1]...)
            m := copy(ws.items, ws.items[index+1:]) // 把未过期的worker拷贝到数组前面
            for i := m; i < n; i++ {
                ws.items[m] = nil
            }
            ws.items = ws.items[:m]
        }
        return ws.expiry
    }
    
    func (ws *workerStack) len() int {
        return len(ws.items)
    }
    
    func (ws *workerStack) binarySearch(left, right int, expireTime int64) int {
        fmt.Printf("过期时间:%d", expireTime)
        for left <= right {
            mid := (left + right) >> 1
            fmt.Println(ws.items[mid].lastUsedTime())
            if ws.items[mid].lastUsedTime() > expireTime {
                right = mid - 1
            } else {
                left = mid + 1
            }
        }
        fmt.Println(right)
        return right
    }
    
    

    相关文章

      网友评论

          本文标题:设计一个协程池

          本文链接:https://www.haomeiwen.com/subject/shurydtx.html