美文网首页
go 资源控制实现思路

go 资源控制实现思路

作者: Best博客 | 来源:发表于2021-01-28 15:08 被阅读0次

    https://github.com/tal-tech/go-zero

    倾向于控制调用资源次数的阀值

    package syncx
    
    import (
        "errors"
    
        "github.com/tal-tech/go-zero/core/lang"
    )
    
    // ErrLimitReturn indicates that the more than borrowed elements were returned.
    var ErrLimitReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times")
    
    // Limit controls the concurrent requests.
    type Limit struct {
        pool chan lang.PlaceholderType
    }
    
    // NewLimit creates a Limit that can borrow n elements from it concurrently.
    func NewLimit(n int) Limit {
        return Limit{
            pool: make(chan lang.PlaceholderType, n),
        }
    }
    
    // Borrow borrows an element from Limit in blocking mode.
    func (l Limit) Borrow() {
        l.pool <- lang.Placeholder
    }
    
    // Return returns the borrowed resource, returns error only if returned more than borrowed.
    func (l Limit) Return() error {
        select {
        case <-l.pool:
            return nil
        default:
            return ErrLimitReturn
        }
    }
    
    // TryBorrow tries to borrow an element from Limit, in non-blocking mode.
    // If success, true returned, false for otherwise.
    func (l Limit) TryBorrow() bool {
        select {
        case l.pool <- lang.Placeholder:
            return true
        default:
            return false
        }
    }
    
    
    //这个包简单的通过channel实现的,可以用于对资源使用的速率控制
    //比如你需要尽快发送100万邮件,你只希望最多100个邮件在并发发送,避免把系统资源耗光了
    func main() {
        //生成一个pool大小为100的池
        limit := NewLimit(100)
    
        //发送1000000万邮件
        for i := 0; i < 1000000; i++ {
            func() {
    
                //取池中取资源,如果没有资源了就阻塞
                limit.Borrow()
    
                go func() {
                    //释放资源回池
                    defer func() {
                        limit.Return()
                    }()
    
                    //你发邮件的逻辑
    
                }()
    
            }()
        }
        
    }
    
    

    资源池管理

    package syncx
    
    import (
        "sync"
        "time"
    
        "github.com/tal-tech/go-zero/core/timex"
    )
    
    type (
        PoolOption func(*Pool)
    
        node struct {
            item     interface{}
            next     *node
            lastUsed time.Duration
        }
    
        Pool struct {
            limit   int
            created int
            maxAge  time.Duration
            lock    sync.Locker
            cond    *sync.Cond
            head    *node
            create  func() interface{}
            destroy func(interface{})
        }
    )
    
    func NewPool(n int, create func() interface{}, destroy func(interface{}), opts ...PoolOption) *Pool {
        if n <= 0 {
            panic("pool size can't be negative or zero")
        }
    
        lock := new(sync.Mutex)
        pool := &Pool{
            limit:   n,
            lock:    lock,
            cond:    sync.NewCond(lock),
            create:  create,
            destroy: destroy,
        }
    
        for _, opt := range opts {
            opt(pool)
        }
    
        return pool
    }
    
    func (p *Pool) Get() interface{} {
        p.lock.Lock()
        defer p.lock.Unlock()
    
        for {
            if p.head != nil {
                head := p.head
                p.head = head.next
                if p.maxAge > 0 && head.lastUsed+p.maxAge < timex.Now() {
                    p.created--
                    p.destroy(head.item)
                    continue
                } else {
                    return head.item
                }
            }
    
            if p.created < p.limit {
                p.created++
                return p.create()
            }
    
            p.cond.Wait()
        }
    }
    
    func (p *Pool) Put(x interface{}) {
        if x == nil {
            return
        }
    
        p.lock.Lock()
        defer p.lock.Unlock()
    
        p.head = &node{
            item:     x,
            next:     p.head,
            lastUsed: timex.Now(),
        }
        p.cond.Signal()
    }
    
    func WithMaxAge(duration time.Duration) PoolOption {
        return func(pool *Pool) {
            pool.maxAge = duration
        }
    }
    
    // 测试池
    func TestPoolGet(t *testing.T) {
        stack := NewPool(limit, create, destroy)
        ch := make(chan lang.PlaceholderType)
    
        for i := 0; i < limit; i++ {
            go func() {
                v := stack.Get()
                if v.(int) != 1 {
                    t.Fatal("unmatch value")
                }
                ch <- lang.Placeholder
            }()
    
            select {
            case <-ch:
            case <-time.After(time.Second):
                t.Fail()
            }
        }
    }
    //上面用到的池管理创建函数
    func create() interface{} {
        return 1
    }
    //上面用到的池管理删除资源,如果是mysql连接的话,这里就需要触发mysql的关闭连接
    func destroy(_ interface{}) {
    }
    

    相关文章

      网友评论

          本文标题:go 资源控制实现思路

          本文链接:https://www.haomeiwen.com/subject/otattltx.html