目录
学习目的
主要通过学习该库来加深对连接池的印象
前文复习
在前文gorm源码的走读中我们了解到:
- gorm实际不负责处理数据库的真正交互(如curd语言),而是通过外部驱动注入处理函数
- 以在gorm的注入驱动mysql的查询函数Query为例,内部的gorm.DB.Statement的ConnPool才是实际的sql查询类执行者
rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)
- 在一般模式下,gorm.DB.Statement.ConnPool是golang的基础库database/sql库下的DB类型
那因此今天就来走读这个连接池database/sql.DB的源码
实例代码如下
package main
import (
"context"
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, err := sql.Open("mysql", "user:password@(ip:port)/database")
if err != nil {
panic(err)
}
ctx := context.Background()
row := db.QueryRowContext(ctx, "SELECT * FROM t limit 1")
if row.Err() != nil {
return
}
type user struct {
Id string
}
var u user
if err = row.Scan(&u); err != nil {
return
}
}
1.核心DB类
sql.Open返回的DB类就是非常核心类,对应为数据库的具象化实例
type DB struct {
// 所有 goroutine 阻塞等待数据库连接的总等待时长
waitDuration int64
// 指定数据库驱动用于生成连接的连接器
connector driver.Connector
// 已关闭的连接总数
numClosed uint64
// 互斥锁保证 db 实例并发安全
mu sync.Mutex
// 可用的数据库连接,及本质意义上的连接池. 其中连接按照创建/归还时间正序排列
freeConn []*driverConn
// 存储了所有用于唤醒阻塞等待连接的 goroutine 的 channel
connRequests map[uint64]chan connRequest
// 维护了一个全局递增的计数器,作为 connRequests 中的 key
nextRequest uint64
// 已开启使用或等待使用的连接数量
numOpen int
// 用于向 opener 传递创建连接信号的 chan
openerCh chan struct{}
// 标识数据库是否已关闭
closed bool
// 在关闭数据库前,进行依赖梳理
dep map[finalCloser]depSet
// 最大空闲连接数. 若设置为 0,则取默认值 2;若设置为负值,则取 0,代表不启用连接池
maxIdleCount int
// 最多可以打开的连接数. 若设为非正值,则代表不作限制
maxOpen int
// 一个连接最多可以使用多长时间
maxLifetime time.Duration
// 一个空闲连接最多可以存在多长时间
maxIdleTime time.Duration
// 用于向 cleaner 传递清理连接信号的 chan
cleanerCh chan struct{
// 有多少 goroutine 在阻塞等待连接
waitCount int64
// 总共有多少空闲连接被关闭了
maxIdleClosed int64
// 所有因为 maxIdleTime 被关闭的连接的总闲置时长
maxIdleTimeClosed int64
// 所有因为 maxLifetime 被关闭的连接的总生存时长
maxLifetimeClosed int64
// 用于终止 opener 的控制器
stop func()
}
- connector: 用于创建数据库的连接的抽象连接器,由第三方数据库提供具体实现(上文实例就是通过mysql库配合init函数注入)
type Connector interface {
// 获取一个数据库连接
Connect(context.Context) (Conn, error)
// 获取数据库驱动
Driver() Driver
}
- connRequests:唤醒通道集合,和阻塞等待连接的协程是一对一的关系,这里是参考了go的channel的send非阻塞发送方法,当有等待协程时,新来的连接直接给它
- openerCh:创建连接信号通道. 用于向连接创建协程 opener goroutine 发送信号
- stop:连接创建协程 opener goroutine 的终止器,用于停止该协程
- freeConn :数据库连接池,缓存可用的连接(学习过tcp三次握手就知道建立连接的代价非常大)
type driverConn struct {
// 该连接所属的 db 实例
db *DB
// 该连接被创建出来的时间
createdAt time.Time
// 连接粒度的互斥锁
sync.Mutex
// 真实的数据库连接. 由第三方驱动实现
ci driver.Conn
// 连接使用前,是否需要对会话进行重置
needReset bool
// 连接是否处于关闭流程
closed bool
// 连接是否已最终关闭
finalClosed bool
// 该连接下所有的 statement
openStmt map[*driverStmt]bool
// 该连接是否正在被使用
inUse bool
// 该连接被放回连接池的时间
returnedAt time.Time
// 连接被放回连接池时的回调函数
onPut []func()
// 连接是否已关闭,作用和 closed 相同
dbmuClosed bool
}
type driverStmt struct {
// 锁
sync.Locker
// 真正的 statement,由第三方数据库驱动实现
si driver.Stmt
// statement 是否已关闭
closed bool
// statement 关闭操作返回的错误
closeErr error
}
1.1 创建DB类
即Open方法,实际是openDB的再封装
type Driver interface {
// 开启一个新的数据库连接
Open(name string) (Conn, error)
}
// 创建数据库
func Open(driverName, dataSourceName string) (*DB, error) {
// 首先根据驱动类型获取数据库驱动
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
// 若驱动实现了对应的连接器 connector,则获取之并进行 db 实例创建
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
// 默认使用 dsn 数据库连接器,进行 db 创建
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
- 首先检查对应的驱动是否已经注册
- 调用openDB方法创建db实例
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
return db
}
- 创建db实例
- 启动一个协程,当连接池资源不足时补充创建连接
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
每当接收到来自 openerCh 的信号后,会调用 openNewConnection 方法进行连接的补充创建, openNewConnection会调用第三方驱动的connect方法创建新的数据库连接,然后补充到DB.freeConn中
func (db *DB) openNewConnection(ctx context.Context) {
ci, err := db.connector.Connect(ctx)
dc := &driverConn{
db: db,
createdAt: nowFunc(),
returnedAt: nowFunc(),
ci: ci,
}
// 将连接添加到连接池中
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
至此Open方法结束:
- 先检测是否注册第三方驱动
- 再初始化连接池,启动连接创建协程
- 连接创建协程在收到OpenCh信号后通过第三方驱动函数来创建连接
1.2查询流程Query
在调用 db.QueryContext 方法时,会通过 for 循环建立有限的请求重试机制. 这是因为在请求过程中,可能会因为连接过期而导致发生偶发性的 ErrBadConn 错误,针对这种错误,可以采用重试的方式来提高请求的成功率.
type Rows interface {
// 返回所有列名
Columns() []string
// 关闭 rows 迭代器
Close() error
// 遍历
Next(dest []Value) error
}
const maxBadConnRetries = 2
// 执行查询类 sql
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
var rows *Rows
var err error
var isBadConn bool
// 最多可以因为 BadConn 类型的错误重试两次
for i := 0; i < maxBadConnRetries; i++ {
// 执行 sql,此时采用的是 连接池有缓存连接优先复用 的策略
rows, err = db.query(ctx, query, args, cachedOrNewConn)
// 属于 badConn 类型的错误可以重试
isBadConn = errors.Is(err, driver.ErrBadConn)
if !isBadConn {
break
}
}
// 重试了两轮 badConn 错误后,第三轮会采用
if isBadConn {
return db.query(ctx, query, args, alwaysNewConn)
}
return rows, err
}
注意1.20使用db.retry函数对其进行包裹,但底层逻辑不变还是循环二次
1.20如下
err = db.retry(func(strategy connReuseStrategy) error {
rows, err = db.query(ctx, query, args, strategy)
return err
})
const maxBadConnRetries = 2
func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
for i := int64(0); i < maxBadConnRetries; i++ {
err := fn(cachedOrNewConn)
// retry if err is driver.ErrBadConn
if err == nil || !errors.Is(err, driver.ErrBadConn) {
return err
}
}
return fn(alwaysNewConn)
}
query方法
func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
query方法在获取连接后,执行queryDC完成sql语句执行,queryDC负责:
- 通过连接sql.driverConn将sql预处理成statement上下文
- 执行请求返回结果
- 将连接放回连接池,如果满或连接过期则关闭连接
因为DB主要还是一个连接池,所以重点关注归还连接方式
归还连接函数为releaseConn
func (dc *driverConn) releaseConn(err error) {
dc.db.putConn(dc, err, true)
}
在putConn中,先判断连接是否失效,失效则直接关闭,然后加锁
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if !errors.Is(err, driver.ErrBadConn) {
if !dc.validateConnection(resetSession) {
err = driver.ErrBadConn
}
}
db.mu.Lock()
如果放入的连接已经超时,则将错误类型置为失效,这里相当于进行了二次检查
if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
db.maxLifetimeClosed++
err = driver.ErrBadConn
}
然后执行放回连接池时的回调函数,之后再进行一次错误判断
dc.inUse = false
dc.returnedAt = nowFunc()
for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil
if errors.Is(err, driver.ErrBadConn) {
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
啥都没问题就放回连接池并释放锁
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
putConnDBLocked逻辑很简单,当DB.connRequests有等待协程时直接把连接给他,否则放回连接池(放入前进行一次空闲连接数上限判断,超过则直接释放),大致代码如下
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if c := len(db.connRequests); c > 0 {
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
1.3获取数据库连接
回头看上文query提到的DB.conn,这实际上是从连接池获取连接,具体逻辑为:
- 如果开启了连接池策略且连接池有空闲连接则直接获取
- 如果连接数上限则挂起当前协程,并将其添加到DB.connRequests中
- 未达上限就调用第三方驱动创建新连接
具体代码如下
检查超时和数据库关闭
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
// 倘若数据库已关闭,返回错误
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
如果开启了连接池策略,则优先获取尾部连接
last := len(db.freeConn) - 1
if strategy == cachedOrNewConn && last >= 0 {
// Reuse the lowest idle time connection so we can close
// connections which remain idle as soon as possible.
conn := db.freeConn[last]
db.freeConn = db.freeConn[:last]
conn.inUse = true
当获取连接达到最大生存时间,则直接返回ErrBadConn,还记得吗?QueryContext遇到该错误会进行最多两次的重试
if conn.expired(lifetime) {
db.maxLifetimeClosed++
db.mu.Unlock()
conn.Close()
return nil, driver.ErrBadConn
}
db.mu.Unlock()
当连接数到上限则挂起
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
db.connRequests[reqKey] = req
未命中则创建新连接
dc := &driverConn{
db: db,
createdAt: nowFunc(),
returnedAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc,nil
1.4连接的销毁
除了上文提到的归还连接时会主动销毁,获取连接时主动销毁,实际上还有一个cleaner协程也会进行主动销毁,由startCleanerLocked 方法尝试执行 cleaner 的创建:
func (db *DB) startCleanerLocked() {
if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
go db.connectionCleaner(db.shortestIdleTimeLocked())
}
}
db.CleanerCH==nil的判断保证了该协程的全局唯一性,该函数的调用时机有三处:
- 用户设置连接最大生存时间时(剩余两处代码不贴了)SetConnMaxLifetime
func (db *DB) SetConnMaxLifetime(d time.Duration) {
if d < 0 {
d = 0
}
db.mu.Lock()
// Wake cleaner up when lifetime is shortened.
if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
select {
case db.cleanerCh <- struct{}{}:
default:
}
}
db.maxLifetime = d
db.startCleanerLocked()
db.mu.Unlock()
}
- 用户设置连接最大空闲时长时:SetConnMaxIdleTime
- 有连接被归还回连接池时:putConnDBLocked
这里主要分析cleaner协程
整体是通过for和select方式来实现常驻,还创建了一个定时器来触发任务执行
func (db *DB) connectionCleaner(d time.Duration) {
for {
select {
case <-t.C:
case <-db.cleanerCh: // maxLifetime was changed or db was closed.
}
具体执行任务:
1.判断db是否关闭和连接数是否为0,是则退出
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
- 调用connectionCleanerRunLocked 对连接池中过期的连接进行清理
d, closing := db.connectionCleanerRunLocked(d)
db.mu.Unlock()
for _, c := range closing {
c.Close()
}
在 connectionCleanerRunLocked 方法中,会分别将达到 maxIdleTime 和 maxLifeTime 的连接从连接池 freeConn 中清除,并把这部分连接返回给上游进行批量关闭操作:
func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
if db.maxIdleTime > 0 {
for i := last; i >= 0; i-- {
c := db.freeConn[i]
if c.returnedAt.Before(idleSince) {
i++
closing = db.freeConn[:i:i]
db.freeConn = db.freeConn[i:]
idleClosing = int64(len(closing))
db.maxIdleTimeClosed += idleClosing
break
}
}
if db.maxLifetime > 0 {....}
网友评论