美文网首页
Golang连接池

Golang连接池

作者: ES_KYW | 来源:发表于2020-10-09 07:54 被阅读0次
    package controllers
    
    import (
        "io"
        "sync"
        "time"
        "errors"
        "fmt"
    )
    
    var (
        ErrInvalidConfig = errors.New("invalid pool config")//error().Error()//errors.New("invalid pool config")
        ErrPoolClosed    = errors.New("pool closed")
    )
    
    // 1、如果连接时数量已经满了,等待,等待一段时间后重新获取连接,尝试几次后返回连接失败错误,需要开启线程维护超时连接
    // 2、资源释放,每次连接完后,释放连接,回归到连接池,连接池中空闲连接设置最大时间,空闲时间过长,断开连接,保留最新连接数
    type PoolInterface interface {
        Acquire() (io.Closer, error) // 获取资源
        Release(io.Closer) error     // 释放资源
        Close(io.Closer) error       // 关闭资源
        Shutdown() error             // 关闭池
    }
    
    type factory func()(io.Closer, error)
    
    type GenericPool struct {
        s sync.Mutex // 互斥锁
        Pool chan io.Closer  // 缓存channel
        maxOpen int   // 最大连接数
        minOpen int   // 最小连接数
        NumOpen int   // 当前连接数
        closed bool   // 池是否关闭
        maxIdle int  // 最大空闲数
        maxLifetime time.Duration // 最大生命周期
        factory factory  // 创建连接
    }
    
    
    
    func NewGenericPool(maxIdle,minOpen int, maxOpen int, maxLifetime time.Duration, factory factory)(*GenericPool, error) {
        if maxOpen <= 0 || minOpen > maxOpen {
            return nil, ErrInvalidConfig
        }
        p := &GenericPool{
            sync.Mutex{},
            make(chan io.Closer,maxOpen),
            maxOpen,
            minOpen,
            0,
            false,
            maxIdle,
            maxLifetime,
            factory,
        }
        for i:=0;i < minOpen;i++ {
            closer,err := factory()
            if err != nil {
                continue
            }
            p.NumOpen++
            p.Pool<-closer
            
        }
        p.closeIdleConn()
        return p,nil
    }
    
    // 尝试连接,3次连接失败返回错误
    func (p *GenericPool)TryConnect()(io.Closer,error) {
        //NewGenericPool(0,0,0,0,factory)
        var closer io.Closer
        for i:=0;i<3;i++ {
            var err error
            closer,err = p.Acquire()
            if err != nil {
                if i >= 3 {
                    fmt.Println("尝试连接失败")
                    return nil,errors.New("连接失败")
                }
                time.Sleep(1)
            }
            return closer,nil
        }
        return closer,nil
    }
    
    // 获取连接
    func (p *GenericPool)Acquire()(io.Closer,error) {
        if p.closed {
            return nil,ErrPoolClosed
        }
    
        for {
    
            closer,err := p.getOrCreate()
            if err != nil {
                // 没有获取到连接该如何处理,返回失败,然后等待几秒后重新Acquire,如果多次Acquire已经失败直接返回报错信息
                fmt.Println("超过最大数,创建失败")
                return nil,err
            }
    
            //todo maxlifetime处理
    
            return closer,nil
        }
    }
    
    func (p *GenericPool)getOrCreate()(io.Closer,error)  {
        // 监听channel相关的IO操作
        select{
        case closer:= <- p.Pool:// 如果通道中有数据直接返回,没有数据下次访问时,如果有资源释放依旧会获取到
            p.NumOpen ++
            return closer,nil
        default:
            //fmt.Println("获取池中失败")
        }
    
        p.s.Lock()
    
        // 如果池中当前数量大于等于最大资源,应该直接返回获取不到的,不明白为何还有返回,即使返回,为何是不先判断当前是否空闲状态
        if p.NumOpen >= p.maxOpen {
    
            //closer :=<-p.Pool
            p.s.Unlock()
            //fmt.Println("当前连接数大于最大连接数")
            return nil,errors.New("当前连接数大于最大连接数")
    
        }
        // 创建新连接
        closer,err := p.factory()
        if err != nil{
            p.s.Unlock()
            return nil,err
        }
        p.NumOpen ++
        p.s.Unlock()
        fmt.Println("创建新连接")
        return closer,nil
    }
    
    // 释放单个资源到连接池
    func (p *GenericPool)Release(closer io.Closer)error  {
        if p.closed {
            return ErrPoolClosed
        }
        p.s.Lock()
        p.Pool <- closer
        p.s.Unlock()
        return nil
    }
    
    // 关闭池中长期不使用的连接
    func (p *GenericPool)closeIdleConn()  {
        // 开启子线程监控空闲连接
        go func() {
            for {
                time.Sleep(time.Duration(p.maxLifetime)*time.Second)
                if len(p.Pool) > 1 && len(p.Pool) > p.minOpen {
                    fmt.Println("3244:",len(p.Pool))
                        for i:=0;i<len(p.Pool)-p.minOpen;i++  {
                            closer := <- p.Pool
                            fmt.Println("清除超时空闲:",i,closer)
                            break
                        }
    
                }
            }
        }()
    }
    
    // 检测Pool里面空闲的close,取出其中部分数据关闭,关闭单个资源
    func (p *GenericPool)Close(closer io.Closer)error  {
        p.s.Lock()
        p.Pool<-closer
        p.NumOpen --
        p.s.Unlock()
        return nil
    }
    
    // 关闭连接池 释放所有资源
    func (p *GenericPool)ShutDown()error{
        //fmt.Println('2')
        if p.closed {
            return ErrPoolClosed
        }
        close(p.Pool)
    
        for closer := range p.Pool{
            fmt.Println("1")
                p.NumOpen --
                closer.Close()
                // _,ok := <-closer
        }
    
        p.closed = true
        //p.s.Unlock()
        return nil
    }
    
    
    func main() {
    
        GPool,err := controllers.NewGenericPool(10,2,10,5, func() (io.Closer, error){
            //a := io.Closer()
            return nil,nil
        })
        if err != nil {
            fmt.Println("创建失败")
        }
        fmt.Println("创建成功:",len(GPool.Pool))
    
        //for i:=0;i<3 ;i++  {
        closer,err :=GPool.TryConnect()
            GPool.TryConnect()
            GPool.TryConnect()
        GPool.TryConnect()
        GPool.TryConnect()
        GPool.TryConnect()
        closer,err = GPool.TryConnect()
        closer,err = GPool.TryConnect()
        closer,err = GPool.TryConnect()
        closer,err = GPool.TryConnect()
        //GPool.TryConnect()
        //GPool.TryConnect()
        //GPool.TryConnect()
        //GPool.TryConnect()
        //GPool.TryConnect()
        //}
        fmt.Println("当前空闲:",len(GPool.Pool),"累积创建:",GPool.NumOpen)
        time.Sleep(3)
        if err != nil {
            fmt.Println("获取失败")
        }
        //GPool.ShutDown()
        GPool.Close(closer)
        GPool.Close(closer)
        GPool.Close(closer)
        GPool.Close(closer)
        GPool.Close(closer)
        GPool.Close(closer)
        //GPool.Close(closer)
        fmt.Println("当前空闲1:",len(GPool.Pool),"当前在用1:",GPool.NumOpen)
        //GPool.TryConnect()
        fmt.Println("当前空闲2:",len(GPool.Pool),"当前在用2:",GPool.NumOpen)
        
        conn,err := GPool.TryConnect()
        if err != nil {
            if err != nil {
                fmt.Println("获取失败")
            }
        }
        fmt.Printf("%T",conn)
        time.Sleep(time.Duration(1000)*time.Second)
        return
    }
    

    相关文章

      网友评论

          本文标题:Golang连接池

          本文链接:https://www.haomeiwen.com/subject/njbeuktx.html