美文网首页
Go 数据库事务的源码分析

Go 数据库事务的源码分析

作者: CocoAdapter | 来源:发表于2019-07-29 22:26 被阅读0次

假设 mobile_applications 表的字段只有两个 app_name, other_info,均为 varchar(256),先上一段简单的业务逻辑代码

package main

import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, _ := sql.Open("mysql",
        "root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
// tx1
    tx, _ := db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第一个应用", "第一个应用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第二个应用", "第二个应用的信息")
    _ = tx.Commit()

// tx2
    tx, _ = db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第三个应用", "第三个应用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第四个应用", "第四个应用的信息")
    //_ = tx.Commit()

// tx3
    tx, _ = db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第五个应用", "第五个应用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第六个应用", "第六个应用的信息")
    _ = tx.Rollback()
}

执行结果:

mysql> select * from mobile_applications;
+-----------------+--------------------------+------------------+---------------+
| app_name        | other_info               | xxx_unrecognized | xxx_sizecache |
+-----------------+--------------------------+------------------+---------------+
| 第一个应用      | 第一个应用的信息         | NULL             |          NULL |
| 第二个应用      | 第二个应用的信息         | NULL             |          NULL |
+-----------------+--------------------------+------------------+---------------+
2 rows in set (0.00 sec)

显然,只有 tx1 最终落库,tx2 和 tx3 都没有落库。这个符合我们对事务的理解,即只有 commit 的操作才会最终落库。

这里我们首先要理解,对于计算机而言,什么是事务?数据库系统概念里对“事务”的定义相信很多人耳熟能详,但是对于我们运行的程序而言,事务实际上分为两层,第一层是内存中的上下文,第二层是DBMS控制的我们常说的“事务”。底层事务实际上是通过上下文来定义和操作的,中间隔了层 driver,屏蔽了具体的细节。
以 go 中的 事务为例,是通过一个 struct 来定义的。

type Tx struct {
    db *DB

    // closemu prevents the transaction from closing while there
    // is an active query. It is held for read during queries
    // and exclusively during close.
    closemu sync.RWMutex

    // dc is owned exclusively until Commit or Rollback, at which point
    // it's returned with putConn.
    dc  *driverConn
    txi driver.Tx

    // releaseConn is called once the Tx is closed to release
    // any held driverConn back to the pool.
    releaseConn func(error)

    // done transitions from 0 to 1 exactly once, on Commit
    // or Rollback. once done, all operations fail with
    // ErrTxDone.
    // Use atomic operations on value when checking value.
    done int32

    // All Stmts prepared for this transaction. These will be closed after the
    // transaction has been committed or rolled back.
    stmts struct {
        sync.Mutex
        v []*Stmt
    }

    // cancel is called after done transitions from 0 to 1.
    cancel func()

    // ctx lives for the life of the transaction.
    ctx context.Context
}

其中核心是:

  • driverConn
    go 里面的数据库连接,封装了 driver 里的数据库连接。
  • driver.Tx
    定义了 commit 和 rollback 两个方法。

一个事务,实际上就是所有 CRUD 操作都在同一个数据库连接里,调用 Begin 会通过该连接,经有 driver 执行特定 DBMS 的事务开启指令,比如 mysql 的 driver 就是

func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
    if mc.closed.IsSet() {
        errLog.Print(ErrInvalidConn)
        return nil, driver.ErrBadConn
    }
    var q string
    if readOnly {
        q = "START TRANSACTION READ ONLY"
    } else {
        q = "START TRANSACTION"
    }
    err := mc.exec(q)
    if err == nil {
        return &mysqlTx{mc}, err
    }
    return nil, mc.markBadConn(err)
}

后续操作就都在一个事务里了。上层可以通过 Begin 方法返回的 Commit/Rollback 方法来提交或回滚这个事务。那么,go 是如何实现对数据库事务的支持的呢?我们从入口代码一步一步来看。

db.Beigin

driver.go 里定义了 ErrBadConn: driver 抛出的错误,表示底层连接不可用或已中断,上层应该重新用一个连接
sql.go 里定义了两个常量,

  • cachedOrNewConn:
    如果连接池里有 idle 状态的连接,直接返回;如果连接池里的连接数已经达到 MaxOpenCons 定义的数量,则阻塞等待,直到有一个连接 idel;否则,创建新的连接,加入到连接池,然后返回。
  • alwaysNewConn:
    强制使用新的连接而不是从接池里里复用
// Begin() 方法是带参数版本的一个默认版本
func (db *DB) Begin() (*Tx, error) {
    return db.BeginTx(context.Background(), nil)
}

// 可以设置 Contxt 和 事务配置项
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
    var tx *Tx
    var err error
// 如果遇到 ErrBadConn 默认会重试 2 次
    for i := 0; i < maxBadConnRetries; i++ {
        tx, err = db.begin(ctx, opts, cachedOrNewConn)
        if err != driver.ErrBadConn {
            break
        }
    }
    if err == driver.ErrBadConn {
        return db.begin(ctx, opts, alwaysNewConn)
    }
    return tx, err
}

func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
    dc, err := db.conn(ctx, strategy)
    if err != nil {
        return nil, err
    }
    return db.beginDC(ctx, dc, dc.releaseConn, opts)
}

外部调用最终进入私有方法 begin 里,begin 主要完成两个操作,一是获取一个数据库连接,二是创建事务的上下文,即开启事务。下面我们一一来看。

db.conn

这个方法很长,核心业务逻辑主要分为以下几个部分:

  • 检查
    首先检查 db 是否关闭了,再检查是否 context 过期了,若任一为是都会直接返回错误
db.mu.Lock() // 临界区的结束点不同情况不相同
if db.closed {
  db.mu.Unlock()
  return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
  case <-ctx.Done():
  db.mu.Unlock()
  return nil, ctx.Err()
}

lifetime := db.maxLifetime // 后面逻辑使用
  • cachedOrNewConn 且 存在 idle 的连接
// db.freeConn 是一个 slice,存储了 idle 的连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
  conn := db.freeConn[0]
// 移走第一个
  copy(db.freeConn, db.freeConn[1:])
  db.freeConn = db.freeConn[:numFree-1]

  conn.inUse = true
  db.mu.Unlock() // 释放锁了
// 判断连接是否过期了
  if conn.expired(lifetime) {
    conn.Close()
    return nil, driver.ErrBadConn
  }
  // lastErr 字段的意义没看懂,好像在返回一个连接之前都要检查这个错误有没有设置
  // Lock around reading lastErr to ensure the session resetter finished.
  conn.Lock()
  err := conn.lastErr
  conn.Unlock()
  if err == driver.ErrBadConn {
    conn.Close()
    return nil, driver.ErrBadConn
  }
  return conn, nil
}
  • 没有空闲连接了,或者强制使用新连接。
    • 连接池达到最大了
      连接池达到最大了,就必须阻塞,这里用了一个 channel,类似于 Java 的条件队列
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()

waitStart := time.Now()

// Timeout the connection request with the context.
select {
  // Context 结束了
  case <-ctx.Done():
    // Remove the connection request and ensure no value has been sent
    // on it after removing.
    db.mu.Lock()
    delete(db.connRequests, reqKey)
    db.mu.Unlock()

    atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
    
    select {
      default:
      // 已经发出去了,且创建成功了,那么这个连接就需要加入连接池
      // 但是不清楚为什么这里没有验证了
      case ret, ok := <-req:
        if ok && ret.conn != nil {
          db.putConn(ret.conn, ret.err, false)
        }
    }
    return nil, ctx.Err()
  // 收到返回
  case ret, ok := <-req:
    atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
    // channel 是被关闭了的
    if !ok {
      return nil, errDBClosed
    }
    // 超时了
    if ret.err == nil && ret.conn.expired(lifetime) {
      ret.conn.Close()
      return nil, driver.ErrBadConn
    }
    if ret.conn == nil {
      return nil, ret.err
    }
    // Lock around reading lastErr to ensure the session resetter finished.
    ret.conn.Lock()
    err := ret.conn.lastErr
    ret.conn.Unlock()
    if err == driver.ErrBadConn {
      ret.conn.Close()
      return nil, driver.ErrBadConn
    }
    return ret.conn, ret.err
}
  • 连接池还没有达到最大
    直接新建连接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
  db.mu.Lock()
  db.numOpen-- // correct for earlier optimism
  db.maybeOpenNewConnections()
  db.mu.Unlock()
  return nil, err
}
db.mu.Lock()
dc := &driverConn{
  db:        db,
  createdAt: nowFunc(),
  ci:        ci,
  inUse:     true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil

putConnDBLocked

前面提到,当连接池已满且没有 idel 连接的时候,是通过注册了一个 channel 来异步接收 free 连接的通知的。维护所有 channel 的是

connReqeusts map[uint64] chan connRequest

type connRequest struct {
    conn *driverConn
    err  error
}

使用完一个连接后,需要“归还”连接给 DB。DB 的逻辑是:
如果存在 connRequest,会将 dc 直接交给它
否则,放入到连接池里,标记为 idel,并启动清理线程,关闭那些超时的连接

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {
            dc.inUse = true
        }
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
    return false
}

db.beginDC

前面的操作完成了连接的获取(创建/释放),下面就要启动一个事务。

// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
    var txi driver.Tx
    withLock(dc, func() { // 就是个包装方法,加锁,执行函数,放锁
        txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 调用 driver 开启事务
    })
    if err != nil {
        release(err)
        return nil, err
    }
        // 后面的主要是暴露上下文,注册资源清理回调
    // Schedule the transaction to rollback when the context is cancelled.
    // The cancel function in Tx will be called after done is set to true.
    ctx, cancel := context.WithCancel(ctx)
    tx = &Tx{
        db:          db,
        dc:          dc,
        releaseConn: release,
        txi:         txi,
        cancel:      cancel,
        ctx:         ctx,
    }
    go tx.awaitDone()
    return tx, nil
}

// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
    // 如果在事务提交/回滚前,就结束阻塞,说明 Context结束了,那就要执行资源清理
    <-tx.ctx.Done()

    // 关必并从连接池里删除这个连接,来保证事务已经关闭、资源得到释放
    // 对于已经提交/回滚的事务,这个方法不会由任何影响
    tx.rollback(true)
}

相关文章

网友评论

      本文标题:Go 数据库事务的源码分析

      本文链接:https://www.haomeiwen.com/subject/vluirctx.html