美文网首页
golang标准库database连接池实现

golang标准库database连接池实现

作者: 香菜__ | 来源:发表于2021-03-02 11:11 被阅读0次

所有代码片段都做了省略,只保留了与连接池相关的实现;如需查看完整代码,请参考官方标准库 database/sql/sql.go。
DB 和 连接对象 结构体

// DB carries the connection-pool state. Only the pool-related fields are
// listed here; the remaining fields have been left out of this excerpt.
type DB struct {
    mu       sync.Mutex    // guards every field declared below
    freeConn []*driverConn // idle connections waiting to be reused
    numOpen  int           // count of connections that are open or currently being opened
    closed   bool          // once set, conn refuses to hand out connections

    maxIdle     int           // 0 selects defaultMaxIdleConns; a negative value means 0
    maxOpen     int           // values <= 0 mean there is no limit
    maxLifetime time.Duration // longest period a connection may keep being reused
}

// driverConn is the pooled connection object.
//
// It wraps a driver.Conn together with a mutex that is to be held during
// every call into the Conn, including calls on interfaces obtained through
// that Conn (Tx, Stmt, Result, Rows).
type driverConn struct {
    db        *DB       // pool that owns this connection
    createdAt time.Time // set when the connection is created

    // The embedded mutex guards the fields in this group.
    sync.Mutex
    ci          driver.Conn
    closed      bool
    finalClosed bool // set once ci.Close has been called
    openStmt    map[*driverStmt]bool
    lastErr     error // outcome recorded by the session resetter

    // The fields in this group are guarded by db.mu instead.
    inUse      bool
    onPut      []func() // callbacks run, with db.mu held, when the conn is next returned
    dbmuClosed bool     // mirrors closed but lives under db.mu, for removeClosedStmtLocked
}

freeConn 是用来维护连接池的 slice,保存当前空闲的连接
numOpen 表示已打开以及正在打开中的连接数
maxIdle 最大空闲连接数(0 表示使用默认值 defaultMaxIdleConns,负数表示 0)
maxOpen 最大打开连接数(小于等于 0 表示不限制)
maxLifetime 连接可被重复使用的最长时间

获取数据库连接

// conn returns a newly-opened or cached *driverConn.
//
// When strategy is cachedOrNewConn and the free list is non-empty, the
// connection at the head of db.freeConn is removed from the pool and
// returned. If conn.expired reports it as expired for the configured
// maxLifetime, or its lastErr is driver.ErrBadConn, the connection is closed
// and driver.ErrBadConn is returned instead.
//
// Otherwise a new connection is opened through db.connector and wrapped in a
// fresh driverConn that is already marked in use.
//
// It returns errDBClosed when the DB has been closed, and the connector's
// error when opening a new connection fails.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // Snapshot the lifetime limit while db.mu is still held; it is consulted
    // below after the lock has been released.
    lifetime := db.maxLifetime

    // Prefer an idle connection from the pool when one is available.
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        // Take the first entry and shift the rest down by one.
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // Nothing usable in the pool: open a new connection. The counter is
    // bumped before dialing and the lock is released for the duration of
    // the Connect call.
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}

释放连接对象(归还到连接池)

// releaseConn hands dc back to the DB it belongs to by calling putConn with
// the given error and with session reset requested.
func (dc *driverConn) releaseConn(err error) {
    owner := dc.db
    const resetSession = true
    owner.putConn(dc, err, resetSession)
}

// putConn adds a connection to the db's free pool.
//
// err is optionally the last error that occurred on this connection. When it
// is driver.ErrBadConn the connection is closed instead of being pooled.
//
// resetSession requests a session reset before the connection is reused. It
// only takes effect when the DB is still open and the underlying driver.Conn
// implements driver.SessionResetter; in that case dc stays locked until the
// reset has been handed to db.resetterCh, or, if that channel would block,
// until dc has been marked bad.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    db.mu.Lock()
    // The connection is no longer checked out. conn and putConnDBLocked both
    // set inUse to true when they hand a connection out, so it has to be
    // cleared here before the connection can go back to the pool.
    dc.inUse = false

    // Run the callbacks registered to fire when the connection is returned;
    // they are documented on driverConn.onPut as running with db.mu held.
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil

    if err == driver.ErrBadConn {
        // Never pool a connection the driver has reported as bad. As in
        // conn's error path, give db.maybeOpenNewConnections (defined
        // elsewhere) a chance to run, then close the connection with db.mu
        // released.
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }

    if db.closed {
        // Connections do not need to be reset if they will be closed.
        // Prevents writing to resetterCh after the DB has closed.
        resetSession = false
    }
    if resetSession {
        if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
            // Lock the driverConn here so it isn't released until
            // the connection is reset.
            // The lock must be taken before the connection is put into
            // the pool to prevent it from being taken out before it is reset.
            dc.Lock()
        }
    }
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

    if !added {
        // The pool did not accept the connection: release the lock taken
        // above (if any) and close it.
        if resetSession {
            dc.Unlock()
        }
        dc.Close()
        return
    }
    if !resetSession {
        return
    }
    select {
    default:
        // If the resetterCh is blocking then mark the connection
        // as bad and continue on.
        dc.lastErr = driver.ErrBadConn
        dc.Unlock()
    case db.resetterCh <- dc:
    }
}

将连接对象放入连接池

// putConnDBLocked either hands dc to a pending connRequest or parks it in the
// idle pool, and reports whether one of those two things happened.
//
// The outcome is decided in this order:
//   - false if the DB is closed, or if maxOpen is positive and numOpen
//     exceeds it;
//   - if at least one connRequest is pending, one of them is removed from
//     db.connRequests and sent dc together with err (dc is marked in use
//     when err == nil), and true is returned;
//   - if err == nil and the idle limit has room, dc is appended to
//     db.freeConn, the cleaner is started, and true is returned;
//   - if err == nil but the idle limit is reached, db.maxIdleClosed is
//     incremented and false is returned;
//   - false in every remaining case.
//
// If err == nil, dc must not be nil. The "Locked" suffix suggests the caller
// is expected to hold db.mu; putConn calls it that way.
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    overLimit := db.maxOpen > 0 && db.numOpen > db.maxOpen
    if db.closed || overLimit {
        return false
    }

    if len(db.connRequests) > 0 {
        // Grab whichever pending request the map iteration yields first.
        var (
            key    uint64
            waiter chan connRequest
        )
        for key, waiter = range db.connRequests {
            break
        }
        delete(db.connRequests, key) // no longer pending
        if err == nil {
            dc.inUse = true
        }
        waiter <- connRequest{conn: dc, err: err}
        return true
    }

    if err != nil {
        return false
    }

    // No waiter: keep the connection idle if the limit allows it.
    idleLimit := db.maxIdleConnsLocked()
    if idleLimit <= len(db.freeConn) {
        db.maxIdleClosed++
        return false
    }
    db.freeConn = append(db.freeConn, dc)
    db.startCleanerLocked()
    return true
}

相关文章

网友评论

      本文标题:golang标准库database连接池实现

      本文链接:https://www.haomeiwen.com/subject/riutqltx.html