美文网首页
go 资源控制实现思路

go 资源控制实现思路

作者: Best博客 | 来源:发表于2021-01-28 15:08 被阅读0次

https://github.com/tal-tech/go-zero

倾向于控制调用资源次数的阀值

package syncx

import (
    "errors"

    "github.com/tal-tech/go-zero/core/lang"
)

// ErrLimitReturn indicates that the more than borrowed elements were returned.
var ErrLimitReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times")

// Limit controls the concurrent requests.
type Limit struct {
    pool chan lang.PlaceholderType
}

// NewLimit creates a Limit that can borrow n elements from it concurrently.
func NewLimit(n int) Limit {
    return Limit{
        pool: make(chan lang.PlaceholderType, n),
    }
}

// Borrow borrows an element from Limit in blocking mode.
func (l Limit) Borrow() {
    l.pool <- lang.Placeholder
}

// Return returns the borrowed resource, returns error only if returned more than borrowed.
func (l Limit) Return() error {
    select {
    case <-l.pool:
        return nil
    default:
        return ErrLimitReturn
    }
}

// TryBorrow tries to borrow an element from Limit, in non-blocking mode.
// If success, true returned, false for otherwise.
func (l Limit) TryBorrow() bool {
    select {
    case l.pool <- lang.Placeholder:
        return true
    default:
        return false
    }
}


//这个包简单的通过channel实现的,可以用于对资源使用的速率控制
//比如你需要尽快发送100万邮件,你只希望最多100个邮件在并发发送,避免把系统资源耗光了
func main() {
    //生成一个pool大小为100的池
    limit := NewLimit(100)

    //发送1000000万邮件
    for i := 0; i < 1000000; i++ {
        func() {

            //取池中取资源,如果没有资源了就阻塞
            limit.Borrow()

            go func() {
                //释放资源回池
                defer func() {
                    limit.Return()
                }()

                //你发邮件的逻辑

            }()

        }()
    }
    
}

资源池管理

package syncx

import (
    "sync"
    "time"

    "github.com/tal-tech/go-zero/core/timex"
)

type (
    PoolOption func(*Pool)

    node struct {
        item     interface{}
        next     *node
        lastUsed time.Duration
    }

    Pool struct {
        limit   int
        created int
        maxAge  time.Duration
        lock    sync.Locker
        cond    *sync.Cond
        head    *node
        create  func() interface{}
        destroy func(interface{})
    }
)

func NewPool(n int, create func() interface{}, destroy func(interface{}), opts ...PoolOption) *Pool {
    if n <= 0 {
        panic("pool size can't be negative or zero")
    }

    lock := new(sync.Mutex)
    pool := &Pool{
        limit:   n,
        lock:    lock,
        cond:    sync.NewCond(lock),
        create:  create,
        destroy: destroy,
    }

    for _, opt := range opts {
        opt(pool)
    }

    return pool
}

func (p *Pool) Get() interface{} {
    p.lock.Lock()
    defer p.lock.Unlock()

    for {
        if p.head != nil {
            head := p.head
            p.head = head.next
            if p.maxAge > 0 && head.lastUsed+p.maxAge < timex.Now() {
                p.created--
                p.destroy(head.item)
                continue
            } else {
                return head.item
            }
        }

        if p.created < p.limit {
            p.created++
            return p.create()
        }

        p.cond.Wait()
    }
}

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }

    p.lock.Lock()
    defer p.lock.Unlock()

    p.head = &node{
        item:     x,
        next:     p.head,
        lastUsed: timex.Now(),
    }
    p.cond.Signal()
}

func WithMaxAge(duration time.Duration) PoolOption {
    return func(pool *Pool) {
        pool.maxAge = duration
    }
}

// 测试池
func TestPoolGet(t *testing.T) {
    stack := NewPool(limit, create, destroy)
    ch := make(chan lang.PlaceholderType)

    for i := 0; i < limit; i++ {
        go func() {
            v := stack.Get()
            if v.(int) != 1 {
                t.Fatal("unmatch value")
            }
            ch <- lang.Placeholder
        }()

        select {
        case <-ch:
        case <-time.After(time.Second):
            t.Fail()
        }
    }
}
//上面用到的池管理创建函数
func create() interface{} {
    return 1
}
//上面用到的池管理删除资源,如果是mysql连接的话,这里就需要触发mysql的关闭连接
func destroy(_ interface{}) {
}

相关文章

网友评论

      本文标题:go 资源控制实现思路

      本文链接:https://www.haomeiwen.com/subject/otattltx.html