美文网首页
go源码走读-database/sql

go源码走读-database/sql

作者: 温岭夹糕 | 来源:发表于2024-07-27 23:19 被阅读0次

目录

链接地址

学习目的

主要通过学习该库来加深对连接池的印象

前文复习

在前文gorm源码的走读中我们了解到:

  1. gorm实际不负责处理数据库的真正交互(如curd语言),而是通过外部驱动注入处理函数
  2. 以在gorm的注入驱动mysql的查询函数Query为例,内部的gorm.DB.Statement的ConnPool才是实际的sql查询类执行者
 rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)
  1. 在一般模式下,gorm.DB.Statement.ConnPool是golang的基础库database/sql库下的DB类型

那因此今天就来走读这个连接池database/sql.DB的源码
实例代码如下

package main

import (
    "context"
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, err := sql.Open("mysql", "user:password@(ip:port)/database")
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    row := db.QueryRowContext(ctx, "SELECT * FROM t limit 1")
    if row.Err() != nil {
        return
    }
    type user struct {
        Id string
    }
    var u user
    if err = row.Scan(&u); err != nil {
        return
    }
}

1.核心DB类

sql.Open返回的DB类就是非常核心类,对应为数据库的具象化实例

type DB struct {
    // 所有 goroutine 阻塞等待数据库连接的总等待时长
    waitDuration int64 
    
    // 指定数据库驱动用于生成连接的连接器
    connector driver.Connector
    
    // 已关闭的连接总数
    numClosed uint64


    // 互斥锁保证 db 实例并发安全
    mu           sync.Mutex    
    
    // 可用的数据库连接,及本质意义上的连接池. 其中连接按照创建/归还时间正序排列
    freeConn     []*driverConn 
    
    // 存储了所有用于唤醒阻塞等待连接的 goroutine 的 channel
    connRequests map[uint64]chan connRequest
    
    // 维护了一个全局递增的计数器,作为 connRequests 中的 key
    nextRequest  uint64 
    
    // 已开启使用或等待使用的连接数量
    numOpen      int   
    
    // 用于向 opener 传递创建连接信号的 chan
    openerCh          chan struct{}
    
    // 标识数据库是否已关闭  
    closed            bool
    
    // 在关闭数据库前,进行依赖梳理
    dep               map[finalCloser]depSet
    
    // 最大空闲连接数. 若设置为 0,则取默认值 2;若设置为负值,则取 0,代表不启用连接池
    maxIdleCount      int       
              
    // 最多可以打开的连接数. 若设为非正值,则代表不作限制
    maxOpen           int      
               
    // 一个连接最多可以使用多长时间
    maxLifetime       time.Duration          
    
    // 一个空闲连接最多可以存在多长时间
    maxIdleTime       time.Duration          
    
    // 用于向 cleaner 传递清理连接信号的 chan
    cleanerCh         chan struct{
      
    // 有多少 goroutine 在阻塞等待连接
    waitCount         int64 
    
    // 总共有多少空闲连接被关闭了 
    maxIdleClosed     int64 
    
    // 所有因为 maxIdleTime 被关闭的连接的总闲置时长
    maxIdleTimeClosed int64 
    
    // 所有因为 maxLifetime 被关闭的连接的总生存时长
    maxLifetimeClosed int64


    // 用于终止 opener 的控制器
    stop func() 
}
  • connector: 用于创建数据库的连接的抽象连接器,由第三方数据库提供具体实现(上文实例就是通过mysql库配合init函数注入)
type Connector interface {
    // 获取一个数据库连接
    Connect(context.Context) (Conn, error)


    // 获取数据库驱动
    Driver() Driver
}
  • connRequests:唤醒通道集合,和阻塞等待连接的协程是一对一的关系,这里是参考了go的channel的send非阻塞发送方法,当有等待协程时,新来的连接直接给它
  • openerCh:创建连接信号通道. 用于向连接创建协程 opener goroutine 发送信号
  • stop:连接创建协程 opener goroutine 的终止器,用于停止该协程
  • freeConn :数据库连接池,缓存可用的连接(学习过tcp三次握手就知道建立连接的代价非常大)
type driverConn struct {
    // 该连接所属的 db 实例
    db        *DB
    // 该连接被创建出来的时间
    createdAt time.Time


    // 连接粒度的互斥锁
    sync.Mutex  
    
    // 真实的数据库连接. 由第三方驱动实现
    ci          driver.Conn
    
    // 连接使用前,是否需要对会话进行重置
    needReset   bool 
    
    // 连接是否处于关闭流程
    closed      bool
    // 连接是否已最终关闭
    finalClosed bool 
    
    // 该连接下所有的 statement
    openStmt    map[*driverStmt]bool


    // 该连接是否正在被使用
    inUse      bool
    
    // 该连接被放回连接池的时间
    returnedAt time.Time 
    
    // 连接被放回连接池时的回调函数
    onPut      []func() 
    
    // 连接是否已关闭,作用和 closed 相同
    dbmuClosed bool     
}

type driverStmt struct {
    // 锁
    sync.Locker 
    // 真正的 statement,由第三方数据库驱动实现
    si          driver.Stmt
    // statement 是否已关闭
    closed      bool
    // statement 关闭操作返回的错误
    closeErr    error 
}

1.1 创建DB类

即Open方法,实际是openDB的再封装

type Driver interface {
    // 开启一个新的数据库连接
    Open(name string) (Conn, error)
}
// 创建数据库
func Open(driverName, dataSourceName string) (*DB, error) {
    // 首先根据驱动类型获取数据库驱动
    driversMu.RLock()
    driveri, ok := drivers[driverName]
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }


    // 若驱动实现了对应的连接器 connector,则获取之并进行 db 实例创建
    if driverCtx, ok := driveri.(driver.DriverContext); ok {
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil
    }


    // 默认使用 dsn 数据库连接器,进行 db 创建
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
  1. 首先检查对应的驱动是否已经注册
  2. 调用openDB方法创建db实例
func OpenDB(c driver.Connector) *DB {
    ctx, cancel := context.WithCancel(context.Background())
    db := &DB{
        connector:    c,
        openerCh:     make(chan struct{}, connectionRequestQueueSize),
        lastPut:      make(map[*driverConn]string),
        connRequests: make(map[uint64]chan connRequest),
        stop:         cancel,
    }

    go db.connectionOpener(ctx)

    return db
}
  1. 创建db实例
  2. 启动一个协程,当连接池资源不足时补充创建连接
func (db *DB) connectionOpener(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-db.openerCh:
            db.openNewConnection(ctx)
        }
    }
}

每当接收到来自 openerCh 的信号后,会调用 openNewConnection 方法进行连接的补充创建, openNewConnection会调用第三方驱动的connect方法创建新的数据库连接,然后补充到DB.freeConn中

func (db *DB) openNewConnection(ctx context.Context) {
    ci, err := db.connector.Connect(ctx)
    dc := &driverConn{
        db:         db,
        createdAt:  nowFunc(),
        returnedAt: nowFunc(),
        ci:         ci,
    }
    
    // 将连接添加到连接池中
    if db.putConnDBLocked(dc, err) {
        db.addDepLocked(dc, dc)
    } else {
        db.numOpen--
        ci.Close()
    }
}

至此Open方法结束:

  • 先检测是否注册第三方驱动
  • 再初始化连接池,启动连接创建协程
  • 连接创建协程在收到OpenCh信号后通过第三方驱动函数来创建连接

1.2查询流程Query

在调用 db.QueryContext 方法时,会通过 for 循环建立有限的请求重试机制. 这是因为在请求过程中,可能会因为连接过期而导致发生偶发性的 ErrBadConn 错误,针对这种错误,可以采用重试的方式来提高请求的成功率.

type Rows interface {
    // 返回所有列名
    Columns() []string


    // 关闭 rows 迭代器
    Close() error


    // 遍历
    Next(dest []Value) error
}
const maxBadConnRetries = 2


// 执行查询类 sql 
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
    var rows *Rows
    var err error
    var isBadConn bool
    
    // 最多可以因为 BadConn 类型的错误重试两次
    for i := 0; i < maxBadConnRetries; i++ {
        // 执行 sql,此时采用的是 连接池有缓存连接优先复用 的策略
        rows, err = db.query(ctx, query, args, cachedOrNewConn)
        // 属于 badConn 类型的错误可以重试
        isBadConn = errors.Is(err, driver.ErrBadConn)
        if !isBadConn {
            break
        }
    }
    
    // 重试了两轮 badConn 错误后,第三轮会采用
    if isBadConn {
        return db.query(ctx, query, args, alwaysNewConn)
    }
    return rows, err
}

注意1.20使用db.retry函数对其进行包裹,但底层逻辑不变还是循环二次
1.20如下

    err = db.retry(func(strategy connReuseStrategy) error {
        rows, err = db.query(ctx, query, args, strategy)
        return err
    })

const maxBadConnRetries = 2

func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
    for i := int64(0); i < maxBadConnRetries; i++ {
        err := fn(cachedOrNewConn)
        // retry if err is driver.ErrBadConn
        if err == nil || !errors.Is(err, driver.ErrBadConn) {
            return err
        }
    }

    return fn(alwaysNewConn)
}

query方法

func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
    dc, err := db.conn(ctx, strategy)
    if err != nil {
        return nil, err
    }

    return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

query方法在获取连接后,执行queryDC完成sql语句执行,queryDC负责:

  1. 通过连接sql.driverConn将sql预处理成statement上下文
  2. 执行请求返回结果
  3. 将连接放回连接池,如果满或连接过期则关闭连接
    因为DB主要还是一个连接池,所以重点关注归还连接方式
    归还连接函数为releaseConn
func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err, true)
}

在putConn中,先判断连接是否失效,失效则直接关闭,然后加锁

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    if !errors.Is(err, driver.ErrBadConn) {
        if !dc.validateConnection(resetSession) {
            err = driver.ErrBadConn
        }
    }
    db.mu.Lock()

如果放入的连接已经超时,则将错误类型置为失效,这里相当于进行了二次检查

    if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
        db.maxLifetimeClosed++
        err = driver.ErrBadConn
    }

然后执行放回连接池时的回调函数,之后再进行一次错误判断

    dc.inUse = false
    dc.returnedAt = nowFunc()
    
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil

    if errors.Is(err, driver.ErrBadConn) {        
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }

啥都没问题就放回连接池并释放锁

    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

putConnDBLocked逻辑很简单,当DB.connRequests有等待协程时直接把连接给他,否则放回连接池(放入前进行一次空闲连接数上限判断,超过则直接释放),大致代码如下

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    if c := len(db.connRequests); c > 0 {
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
    return false

1.3获取数据库连接

回头看上文query提到的DB.conn,这实际上是从连接池获取连接,具体逻辑为:

  1. 如果开启了连接池策略且连接池有空闲连接则直接获取
  2. 如果连接数上限则挂起当前协程,并将其添加到DB.connRequests中
  3. 未达上限就调用第三方驱动创建新连接
    具体代码如下
    检查超时和数据库关闭
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    // 倘若数据库已关闭,返回错误
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    
    // Check if the context is expired.
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }

如果开启了连接池策略,则优先获取尾部连接

    last := len(db.freeConn) - 1
    if strategy == cachedOrNewConn && last >= 0 {
        // Reuse the lowest idle time connection so we can close
        // connections which remain idle as soon as possible.
        conn := db.freeConn[last]
        db.freeConn = db.freeConn[:last]
        conn.inUse = true

当获取连接达到最大生存时间,则直接返回ErrBadConn,还记得吗?QueryContext遇到该错误会进行最多两次的重试

        if conn.expired(lifetime) {
            db.maxLifetimeClosed++
            db.mu.Unlock()
            conn.Close()
            return nil, driver.ErrBadConn
        }
        db.mu.Unlock()

当连接数到上限则挂起

  if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        db.connRequests[reqKey] = req

未命中则创建新连接

    dc := &driverConn{
        db:         db,
        createdAt:  nowFunc(),
        returnedAt: nowFunc(),
        ci:         ci,
        inUse:      true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc,nil

1.4连接的销毁

除了上文提到的归还连接时会主动销毁,获取连接时主动销毁,实际上还有一个cleaner协程也会进行主动销毁,由startCleanerLocked 方法尝试执行 cleaner 的创建:

func (db *DB) startCleanerLocked() {
    if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
        db.cleanerCh = make(chan struct{}, 1)
        go db.connectionCleaner(db.shortestIdleTimeLocked())
    }
}

db.CleanerCH==nil的判断保证了该协程的全局唯一性,该函数的调用时机有三处:

  1. 用户设置连接最大生存时间时(剩余两处代码不贴了)SetConnMaxLifetime
func (db *DB) SetConnMaxLifetime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    // Wake cleaner up when lifetime is shortened.
    if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxLifetime = d
    db.startCleanerLocked()
    db.mu.Unlock()
}
  1. 用户设置连接最大空闲时长时:SetConnMaxIdleTime
  2. 有连接被归还回连接池时:putConnDBLocked

这里主要分析cleaner协程
整体是通过for和select方式来实现常驻,还创建了一个定时器来触发任务执行

func (db *DB) connectionCleaner(d time.Duration) {
    for {
        select {
        case <-t.C:
        case <-db.cleanerCh: // maxLifetime was changed or db was closed.
        }

具体执行任务:
1.判断db是否关闭和连接数是否为0,是则退出

        if db.closed || db.numOpen == 0 || d <= 0 {
            db.cleanerCh = nil
            db.mu.Unlock()
            return
        }
  1. 调用connectionCleanerRunLocked 对连接池中过期的连接进行清理
        d, closing := db.connectionCleanerRunLocked(d)
        db.mu.Unlock()
        for _, c := range closing {
            c.Close()
        }

在 connectionCleanerRunLocked 方法中,会分别将达到 maxIdleTime 和 maxLifeTime 的连接从连接池 freeConn 中清除,并把这部分连接返回给上游进行批量关闭操作:

func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
    if db.maxIdleTime > 0 {
        for i := last; i >= 0; i-- {
            c := db.freeConn[i]
            if c.returnedAt.Before(idleSince) {
                i++
                closing = db.freeConn[:i:i]
                db.freeConn = db.freeConn[i:]
                idleClosing = int64(len(closing))
                db.maxIdleTimeClosed += idleClosing
                break
            }
        }
    if db.maxLifetime > 0 {....}

相关文章

网友评论

      本文标题:go源码走读-database/sql

      本文链接:https://www.haomeiwen.com/subject/sxybhjtx.html