美文网首页
go协程池&&信号量

go协程池&&信号量

作者: DragonRat | 来源:发表于2022-06-27 11:05 被阅读0次
    package gpool
    
    type GJobFunc func(arg ...interface{})
    
    type GJob struct {
        f   GJobFunc
        arg []interface{}
    }
    
    type GWorker struct {
        g    *GPool
        jobC chan *GJob
        done chan interface{}
    }
    
    type GPool struct {
        workerList []*GWorker
        workers    chan *GWorker
        jobs       chan *GJob
        done       chan interface{}
    }
    
    func NewGPool(workerMaxNum, jobMaxNum int) *GPool {
        g := &GPool{
            workerList: make([]*GWorker, workerMaxNum),
            workers:    make(chan *GWorker, workerMaxNum),
            jobs:       make(chan *GJob, jobMaxNum),
            done:       make(chan interface{}, 1),
        }
    
        for i := 0; i < workerMaxNum; i++ {
            w := &GWorker{
                g:    g,
                jobC: make(chan *GJob, 1),
                done: make(chan interface{}, 1),
            }
            g.workerList[i] = w
            go w.run()
        }
    
        go g.dispatch()
    
        return g
    }
    
    func (g *GPool) Free() {
        go func() {
            g.done <- struct{}{}
            for _, w := range g.workerList {
                w.stop()
            }
        }()
    }
    
    func (g *GPool) AddJob(f GJobFunc, arg ...interface{}) {
        g.jobs <- &GJob{
            f:   f,
            arg: arg,
        }
    }
    
    func (g *GPool) dispatch() {
        for {
            select {
            case job := <-g.jobs:
                g.getWorker().putJob(job)
            case <-g.done:
                return
            }
        }
    }
    
    func (g *GPool) putWorker(w *GWorker) {
        g.workers <- w
    }
    
    func (g *GPool) getWorker() *GWorker {
        return <-g.workers
    }
    
    func (w *GWorker) run() {
        for {
            w.g.putWorker(w)
            select {
            case job := <-w.jobC:
                job.f(job.arg...)
            case <-w.done:
                return
            }
    
        }
    }
    
    func (w *GWorker) stop() {
        w.done <- struct{}{}
    }
    
    func (w *GWorker) putJob(job *GJob) {
        w.jobC <- job
    }
    
    package test_gpool
    
    import (
        "fmt"
        "gpool"
        "math/rand"
        "sync"
        "time"
    )
    
    func TestGPool() {
        wg := &sync.WaitGroup{}
        wg.Add(30)
    
        g := gpool.NewGPool(10, 100)
        defer g.Free()
    
        for i := 0; i < 30; i++ {
            now := time.Now()
            g.AddJob(TestTask, now, wg, i, i*30)
        }
    
        wg.Wait()
    }
    
    func TestTask(arg ...interface{}) {
        fmt.Println(time.Now().Format("[2006-01-02 15:04:05.000000]"), arg)
        time.Sleep(time.Duration(rand.Intn(10))*time.Second)
        wg := arg[1].(*sync.WaitGroup)
        wg.Done()
    }
    
    package semaphore
    
    import "time"
    
    type Semaphore struct {
        channel chan interface{}
    }
    
    /* 创建信号量 */
    func NewSemaphore(permits int) *Semaphore {
        return &Semaphore{channel: make(chan interface{}, permits)}
    }
    
    /* 获取许可 */
    func (s *Semaphore) Acquire() {
        s.channel <- struct{}{}
    }
    
    /* 释放许可 */
    func (s *Semaphore) Release() {
        <-s.channel
    }
    
    /* 尝试获取许可 */
    func (s *Semaphore) TryAcquire() bool {
        select {
        case s.channel <- struct{}{}:
            return true
        default:
            return false
        }
    }
    
    /* 尝试指定时间内获取许可 */
    func (s *Semaphore) TryAcquireOnTime(timeout time.Duration) bool {
        for {
            select {
            case s.channel <- struct{}{}:
                return true
            case <-time.After(timeout):
                return false
            }
        }
    }
    
    /* 当前可用的许可数 */
    func (s *Semaphore) AvailablePermits() int {
        return cap(s.channel) - len(s.channel)
    }
    

    相关文章

      网友评论

          本文标题:go协程池&&信号量

          本文链接:https://www.haomeiwen.com/subject/gfbnvrtx.html