美文网首页
Go 数据库事务的源码分析

Go 数据库事务的源码分析

作者: CocoAdapter | 来源:发表于2019-07-29 22:26 被阅读0次

    假设 mobile_applications 表的字段只有两个 app_name, other_info,均为 varchar(256),先上一段简单的业务逻辑代码

    package main
    
    import (
        "database/sql"
        _ "github.com/go-sql-driver/mysql"
    )
    
    func main() {
        db, _ := sql.Open("mysql",
            "root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
    // tx1
        tx, _ := db.Begin()
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第一个应用", "第一个应用的信息")
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第二个应用", "第二个应用的信息")
        _ = tx.Commit()
    
    // tx2
        tx, _ = db.Begin()
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第三个应用", "第三个应用的信息")
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第四个应用", "第四个应用的信息")
        //_ = tx.Commit()
    
    // tx3
        tx, _ = db.Begin()
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第五个应用", "第五个应用的信息")
        _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
            "第六个应用", "第六个应用的信息")
        _ = tx.Rollback()
    }
    
    

    执行结果:

    mysql> select * from mobile_applications;
    +-----------------+--------------------------+------------------+---------------+
    | app_name        | other_info               | xxx_unrecognized | xxx_sizecache |
    +-----------------+--------------------------+------------------+---------------+
    | 第一个应用      | 第一个应用的信息         | NULL             |          NULL |
    | 第二个应用      | 第二个应用的信息         | NULL             |          NULL |
    +-----------------+--------------------------+------------------+---------------+
    2 rows in set (0.00 sec)
    

    显然,只有 tx1 最终落库,tx2 和 tx3 都没有落库。这个符合我们对事务的理解,即只有 commit 的操作才会最终落库。

    这里我们首先要理解,对于计算机而言,什么是事务?数据库系统概念里对“事务”的定义相信很多人耳熟能详,但是对于我们运行的程序而言,事务实际上分为两层,第一层是内存中的上下文,第二层是DBMS控制的我们常说的“事务”。底层事务实际上是通过上下文来定义和操作的,中间隔了层 driver,屏蔽了具体的细节。
    以 go 中的 事务为例,是通过一个 struct 来定义的。

    type Tx struct {
        db *DB
    
        // closemu prevents the transaction from closing while there
        // is an active query. It is held for read during queries
        // and exclusively during close.
        closemu sync.RWMutex
    
        // dc is owned exclusively until Commit or Rollback, at which point
        // it's returned with putConn.
        dc  *driverConn
        txi driver.Tx
    
        // releaseConn is called once the Tx is closed to release
        // any held driverConn back to the pool.
        releaseConn func(error)
    
        // done transitions from 0 to 1 exactly once, on Commit
        // or Rollback. once done, all operations fail with
        // ErrTxDone.
        // Use atomic operations on value when checking value.
        done int32
    
        // All Stmts prepared for this transaction. These will be closed after the
        // transaction has been committed or rolled back.
        stmts struct {
            sync.Mutex
            v []*Stmt
        }
    
        // cancel is called after done transitions from 0 to 1.
        cancel func()
    
        // ctx lives for the life of the transaction.
        ctx context.Context
    }
    

    其中核心是:

    • driverConn
      go 里面的数据库连接,封装了 driver 里的数据库连接。
    • driver.Tx
      定义了 commit 和 rollback 两个方法。

    一个事务,实际上就是所有 CRUD 操作都在同一个数据库连接里,调用 Begin 会通过该连接,经有 driver 执行特定 DBMS 的事务开启指令,比如 mysql 的 driver 就是

    func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
        if mc.closed.IsSet() {
            errLog.Print(ErrInvalidConn)
            return nil, driver.ErrBadConn
        }
        var q string
        if readOnly {
            q = "START TRANSACTION READ ONLY"
        } else {
            q = "START TRANSACTION"
        }
        err := mc.exec(q)
        if err == nil {
            return &mysqlTx{mc}, err
        }
        return nil, mc.markBadConn(err)
    }
    

    后续操作就都在一个事务里了。上层可以通过 Begin 方法返回的 Commit/Rollback 方法来提交或回滚这个事务。那么,go 是如何实现对数据库事务的支持的呢?我们从入口代码一步一步来看。

    db.Beigin

    driver.go 里定义了 ErrBadConn: driver 抛出的错误,表示底层连接不可用或已中断,上层应该重新用一个连接
    sql.go 里定义了两个常量,

    • cachedOrNewConn:
      如果连接池里有 idle 状态的连接,直接返回;如果连接池里的连接数已经达到 MaxOpenCons 定义的数量,则阻塞等待,直到有一个连接 idel;否则,创建新的连接,加入到连接池,然后返回。
    • alwaysNewConn:
      强制使用新的连接而不是从接池里里复用
    // Begin() 方法是带参数版本的一个默认版本
    func (db *DB) Begin() (*Tx, error) {
        return db.BeginTx(context.Background(), nil)
    }
    
    // 可以设置 Contxt 和 事务配置项
    func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
        var tx *Tx
        var err error
    // 如果遇到 ErrBadConn 默认会重试 2 次
        for i := 0; i < maxBadConnRetries; i++ {
            tx, err = db.begin(ctx, opts, cachedOrNewConn)
            if err != driver.ErrBadConn {
                break
            }
        }
        if err == driver.ErrBadConn {
            return db.begin(ctx, opts, alwaysNewConn)
        }
        return tx, err
    }
    
    func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
        dc, err := db.conn(ctx, strategy)
        if err != nil {
            return nil, err
        }
        return db.beginDC(ctx, dc, dc.releaseConn, opts)
    }
    

    外部调用最终进入私有方法 begin 里,begin 主要完成两个操作,一是获取一个数据库连接,二是创建事务的上下文,即开启事务。下面我们一一来看。

    db.conn

    这个方法很长,核心业务逻辑主要分为以下几个部分:

    • 检查
      首先检查 db 是否关闭了,再检查是否 context 过期了,若任一为是都会直接返回错误
    db.mu.Lock() // 临界区的结束点不同情况不相同
    if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
    }
    // Check if the context is expired.
    select {
    default:
      case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
    }
    
    lifetime := db.maxLifetime // 后面逻辑使用
    
    • cachedOrNewConn 且 存在 idle 的连接
    // db.freeConn 是一个 slice,存储了 idle 的连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
      conn := db.freeConn[0]
    // 移走第一个
      copy(db.freeConn, db.freeConn[1:])
      db.freeConn = db.freeConn[:numFree-1]
    
      conn.inUse = true
      db.mu.Unlock() // 释放锁了
    // 判断连接是否过期了
      if conn.expired(lifetime) {
        conn.Close()
        return nil, driver.ErrBadConn
      }
      // lastErr 字段的意义没看懂,好像在返回一个连接之前都要检查这个错误有没有设置
      // Lock around reading lastErr to ensure the session resetter finished.
      conn.Lock()
      err := conn.lastErr
      conn.Unlock()
      if err == driver.ErrBadConn {
        conn.Close()
        return nil, driver.ErrBadConn
      }
      return conn, nil
    }
    
    • 没有空闲连接了,或者强制使用新连接。
      • 连接池达到最大了
        连接池达到最大了,就必须阻塞,这里用了一个 channel,类似于 Java 的条件队列
    // Make the connRequest channel. It's buffered so that the
    // connectionOpener doesn't block while waiting for the req to be read.
    req := make(chan connRequest, 1)
    reqKey := db.nextRequestKeyLocked()
    db.connRequests[reqKey] = req
    db.waitCount++
    db.mu.Unlock()
    
    waitStart := time.Now()
    
    // Timeout the connection request with the context.
    select {
      // Context 结束了
      case <-ctx.Done():
        // Remove the connection request and ensure no value has been sent
        // on it after removing.
        db.mu.Lock()
        delete(db.connRequests, reqKey)
        db.mu.Unlock()
    
        atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
        
        select {
          default:
          // 已经发出去了,且创建成功了,那么这个连接就需要加入连接池
          // 但是不清楚为什么这里没有验证了
          case ret, ok := <-req:
            if ok && ret.conn != nil {
              db.putConn(ret.conn, ret.err, false)
            }
        }
        return nil, ctx.Err()
      // 收到返回
      case ret, ok := <-req:
        atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
        // channel 是被关闭了的
        if !ok {
          return nil, errDBClosed
        }
        // 超时了
        if ret.err == nil && ret.conn.expired(lifetime) {
          ret.conn.Close()
          return nil, driver.ErrBadConn
        }
        if ret.conn == nil {
          return nil, ret.err
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        ret.conn.Lock()
        err := ret.conn.lastErr
        ret.conn.Unlock()
        if err == driver.ErrBadConn {
          ret.conn.Close()
          return nil, driver.ErrBadConn
        }
        return ret.conn, ret.err
    }
    
    • 连接池还没有达到最大
      直接新建连接
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
    

    putConnDBLocked

    前面提到,当连接池已满且没有 idel 连接的时候,是通过注册了一个 channel 来异步接收 free 连接的通知的。维护所有 channel 的是

    connReqeusts map[uint64] chan connRequest
    
    type connRequest struct {
        conn *driverConn
        err  error
    }
    

    使用完一个连接后,需要“归还”连接给 DB。DB 的逻辑是:
    如果存在 connRequest,会将 dc 直接交给它
    否则,放入到连接池里,标记为 idel,并启动清理线程,关闭那些超时的连接

    func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
        if db.closed {
            return false
        }
        if db.maxOpen > 0 && db.numOpen > db.maxOpen {
            return false
        }
        if c := len(db.connRequests); c > 0 {
            var req chan connRequest
            var reqKey uint64
            for reqKey, req = range db.connRequests {
                break
            }
            delete(db.connRequests, reqKey) // Remove from pending requests.
            if err == nil {
                dc.inUse = true
            }
            req <- connRequest{
                conn: dc,
                err:  err,
            }
            return true
        } else if err == nil && !db.closed {
            if db.maxIdleConnsLocked() > len(db.freeConn) {
                db.freeConn = append(db.freeConn, dc)
                db.startCleanerLocked()
                return true
            }
            db.maxIdleClosed++
        }
        return false
    }
    

    db.beginDC

    前面的操作完成了连接的获取(创建/释放),下面就要启动一个事务。

    // beginDC starts a transaction. The provided dc must be valid and ready to use.
    func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
        var txi driver.Tx
        withLock(dc, func() { // 就是个包装方法,加锁,执行函数,放锁
            txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 调用 driver 开启事务
        })
        if err != nil {
            release(err)
            return nil, err
        }
            // 后面的主要是暴露上下文,注册资源清理回调
        // Schedule the transaction to rollback when the context is cancelled.
        // The cancel function in Tx will be called after done is set to true.
        ctx, cancel := context.WithCancel(ctx)
        tx = &Tx{
            db:          db,
            dc:          dc,
            releaseConn: release,
            txi:         txi,
            cancel:      cancel,
            ctx:         ctx,
        }
        go tx.awaitDone()
        return tx, nil
    }
    
    // awaitDone blocks until the context in Tx is canceled and rolls back
    // the transaction if it's not already done.
    func (tx *Tx) awaitDone() {
        // 如果在事务提交/回滚前,就结束阻塞,说明 Context结束了,那就要执行资源清理
        <-tx.ctx.Done()
    
        // 关必并从连接池里删除这个连接,来保证事务已经关闭、资源得到释放
        // 对于已经提交/回滚的事务,这个方法不会由任何影响
        tx.rollback(true)
    }
    
    

    相关文章

      网友评论

          本文标题:Go 数据库事务的源码分析

          本文链接:https://www.haomeiwen.com/subject/vluirctx.html