美文网首页
设计一个协程池

设计一个协程池

作者: 彳亍口巴 | 来源:发表于2023-06-16 14:18 被阅读0次

具体的池子

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

const (
    defaultCapacity = 10
    expireSecond    = 2
)

type Pool struct {
    capacity    int32       // 最大同时执行的协程
    running     int32       // 当前在执行的
    waiting     int32       // 当前等待的数量
    state       int32       // 当前池子的状态,如果为1,则关闭
    lock        sync.Locker // 自旋锁,实现了加锁和解锁方法
    workers     workerQueue // worker队列,实现了一个接口,不同实现方式代表不同的队列
    workerCache sync.Pool   // 存放每个创建出来的worker
    cond        *sync.Cond  // 当协程过多的时候需要阻塞,然后唤醒
}

func newPool(size int32) *Pool {
    if size <= 0 {
        size = defaultCapacity
    }
    p := &Pool{
        capacity: size,
        workers:  newWorkerStack(size),
        lock:     NewSpinLock(),
    }
    p.cond = sync.NewCond(p.lock)
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), 1),
        }
    }
    go p.purgeStableWorkers()
    return p
}

func (p *Pool) submit(task func()) error {
    if w := p.getWorker(); w != nil {
        w.inputFunc(task)
        fmt.Println("放入成功")
        return nil
    }
    fmt.Println("放入失败")
    return fmt.Errorf("放入失败")
}

func (p *Pool) getWorker() (w worker) {
    // 这里只有启动的时候会执行,后续基本不会执行,因为都是直接从队列中获取worker
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }
    //
    p.lock.Lock()
    w = p.workers.detach()
    if w != nil {
        fmt.Println("获得worker")
        p.lock.Unlock()
        return w
    } else if p.running < p.capacity {
        fmt.Println("启动")
        p.lock.Unlock()
        spawnWorker()
    } else {
        if p.isClosed() {
            p.lock.Unlock()
            return
        }
        fmt.Println("开始重试")
    retry:
        // 阻塞和重试
        p.addWaiting(1)
        p.cond.Wait()
        p.addWaiting(-1)

        if p.isClosed() {
            p.lock.Unlock()
            return
        }

        if w = p.workers.detach(); w == nil {
            fmt.Println("没有获得任务")
            if p.free() {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }
        fmt.Println("获得任务成功")
        p.lock.Unlock()
    }
    return
}

func (p *Pool) addRunning(delta int) {
    atomic.AddInt32(&p.running, int32(delta))
}

func (p *Pool) revertWorker(w *goWorker) bool {
    fmt.Println("恢复worker")
    if p.capacity < p.running || p.isClosed() {
        p.cond.Broadcast()
        fmt.Println("已关闭1")
        return false
    }
    p.lock.Lock()
    if p.isClosed() {
        fmt.Println("已关闭1")
        p.lock.Unlock()
        return false
    }
    w.lastUsed = time.Now().Unix()
    if err := p.workers.insert(w); err != nil {
        p.lock.Unlock()
        return false
    }
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

// isClosed 是否关闭
func (p *Pool) isClosed() bool {
    return atomic.LoadInt32(&p.state) == CLOSE
}

func (p *Pool) addWaiting(delta int) {
    atomic.AddInt32(&p.waiting, int32(delta))
}

// free 当前池子是否有空闲
func (p *Pool) free() bool {
    return p.capacity-p.running > 0
}

func (p *Pool) purgeStableWorkers() {
    //ctx, _ := context.WithCancel(context.Background())
    ticker := time.NewTicker(time.Second * 2)
    defer func() {
        ticker.Stop()
    }()
    //
    for {
        select {
        case <-ticker.C:

        }
        fmt.Println("开始清楚")
        // 开始执行主逻辑
        var isDormant bool
        p.lock.Lock()
        stableWorker := p.workers.refresh(time.Now().Unix() - expireSecond)
        fmt.Printf("空闲数量:%d\n", len(stableWorker))
        n := p.running
        isDormant = n == 0 || len(stableWorker) == int(n)
        p.lock.Unlock()
        for i := range stableWorker {
            stableWorker[i].finish()
            stableWorker[i] = nil
        }
        // 全部空闲
        if isDormant && p.waiting > 0 {
            fmt.Println("全部空闲")
            p.cond.Broadcast()
        }
    }
}

自旋锁:对worker进行操作时需要上锁,因为切片不是线程安全的

package main

import (
    "runtime"
    "sync"
    "sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

// Lock 加锁 使用CAS将0换成1 成功了就加锁成功,否则一直重试,重试次数从1增加到16
func (sl *spinLock) Lock() {
    backOff := 1
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        for i := 0; i < backOff; i++ {
            runtime.Gosched()
        }
        if backOff < maxBackoff {
            backOff <<= 1 // 左移1位
        }
    }
}

// Unlock 解锁,CAS将1改成0
func (sl *spinLock) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}

func NewSpinLock() sync.Locker {
    return new(spinLock)
}

具体的worker实现

package main

import (
    "fmt"
)

type goWorker struct {
    // pool who owns this worker.
    pool *Pool

    // task is a job should be done.
    task chan func()

    // lastUsed will be updated when putting a worker back into queue.
    lastUsed int64
}

func (w *goWorker) inputFunc(fn func()) {
    w.task <- fn
}

func (w *goWorker) run() {
    w.pool.addRunning(1)
    fmt.Printf("当前运行的go:%d\n", w.pool.running)
    go func() {
        defer func() {
            w.pool.addRunning(-1)
            w.pool.workerCache.Put(w)
            w.pool.cond.Signal() // 唤醒一个协程
            fmt.Println("任务结束----------------")
        }()
        //  在此遍历执行task
        for task := range w.task {
            if task == nil {
                return
            }
            task()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
            //time.Sleep(time.Second * 5)
        }

    }()
}

// finish 往task中放入一个nil,代表本次worker结束
func (w *goWorker) finish() {
    w.task <- nil
}

func (w *goWorker) lastUsedTime() int64 {
    return w.lastUsed
}

worker接口

package main

// 工作队列interface

type worker interface {
    run()    // 执行任务
    finish() // 强制当前worker完成任务
    lastUsedTime() int64
    inputFunc(func())
    //inputParam(interface{})
}

type workerQueue interface {
    len() int
    //isEmpty() bool
    insert(worker) error
    detach() worker
    refresh(duration int64) []worker // clean up the stale workers and return them
    //reset()
}

队列类型的worker

package main

import (
    "fmt"
)

type workerStack struct {
    items  []worker
    expiry []worker
}

func newWorkerStack(size int32) *workerStack {
    return &workerStack{
        items: make([]worker, 0, size),
    }
}

// insert 任务完成后,将worker放回队列中
func (ws *workerStack) insert(w worker) error {
    ws.items = append(ws.items, w)
    fmt.Printf("当前insert数量:%d\n", len(ws.items))
    return nil
}

// detach 每次任务都要先从这里获取,获取不到的话就从池中获取或者循环从这里获取
func (ws *workerStack) detach() worker {
    l := len(ws.items)
    if l <= 0 {
        return nil
    }
    w := ws.items[l-1]
    ws.items[l-1] = nil
    ws.items = ws.items[:l-1]
    return w
}

// refresh 将空闲worker回收
func (ws *workerStack) refresh(expireTime int64) []worker {
    n := ws.len()
    if n <= 0 {
        return nil
    }

    index := ws.binarySearch(0, n-1, expireTime)
    ws.expiry = ws.expiry[:0] // 重置
    if index != -1 {
        ws.expiry = append(ws.expiry, ws.items[:index+1]...)
        m := copy(ws.items, ws.items[index+1:]) // 把未过期的worker拷贝到数组前面
        for i := m; i < n; i++ {
            ws.items[m] = nil
        }
        ws.items = ws.items[:m]
    }
    return ws.expiry
}

func (ws *workerStack) len() int {
    return len(ws.items)
}

func (ws *workerStack) binarySearch(left, right int, expireTime int64) int {
    fmt.Printf("过期时间:%d", expireTime)
    for left <= right {
        mid := (left + right) >> 1
        fmt.Println(ws.items[mid].lastUsedTime())
        if ws.items[mid].lastUsedTime() > expireTime {
            right = mid - 1
        } else {
            left = mid + 1
        }
    }
    fmt.Println(right)
    return right
}

相关文章

  • Go语言——goroutine并发模型

    Go语言——goroutine并发模型 参考: Goroutine并发调度模型深度解析&手撸一个协程池 Golan...

  • Unity协程基础用法

    //通过StartCoroutine()开始一个协程 //通过StopCoroutine();关闭一个协程 ...

  • 一学就会的协程使用——基础篇(一)协程启动

    1. 启动一个协程 来,来启动第一个协程吧: 就这么简单,就可以在任意一个地方启动一个协程,而且这个协程必然会执行...

  • golang实现协程池

    golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩...

  • 协程技术

    1、协程(Coroutine):比线程的单位更小,在一个线程中可以开启多个协程,利用多个协程实现并发。 2、协程跟...

  • 通道--golang

    通道(channels) 是连接多个协程的管道,可以从一个协程将值发送到通道,然后在另一个协程中接收,由关键字ch...

  • 用多个协程顺序打印1-100

    这个题目要求有10个协程,每个协程打印0-9作为后缀的数字,总共打印100个

  • kotlin & Coroutine

    协程是采用就是并发的设计模式,这句话的大多数环境下是没有问题。但是,如果某个协程满足以下几点,那它里面的子协程将会...

  • 【golang 学习总结1】轻松开启100个并发

    代码 代码解释: 使用go 开启一个协程,一共开启了100个协程 为什么使用 time.Sleep因为 main函...

  • 协程和线程

    协程应该是一个更小的执行单元,一个线程可以拥有多个协程,但同时只能执行一个协程。 协程拥有自己的寄存器上下文和栈空...

网友评论

      本文标题:设计一个协程池

      本文链接:https://www.haomeiwen.com/subject/shurydtx.html