假设 mobile_applications 表的字段只有两个 app_name, other_info,均为 varchar(256),先上一段简单的业务逻辑代码
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, _ := sql.Open("mysql",
"root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
// tx1
tx, _ := db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第一个应用", "第一个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第二个应用", "第二个应用的信息")
_ = tx.Commit()
// tx2
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第三个应用", "第三个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第四个应用", "第四个应用的信息")
//_ = tx.Commit()
// tx3
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第五个应用", "第五个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第六个应用", "第六个应用的信息")
_ = tx.Rollback()
}
执行结果:
mysql> select * from mobile_applications;
+-----------------+--------------------------+------------------+---------------+
| app_name | other_info | xxx_unrecognized | xxx_sizecache |
+-----------------+--------------------------+------------------+---------------+
| 第一个应用 | 第一个应用的信息 | NULL | NULL |
| 第二个应用 | 第二个应用的信息 | NULL | NULL |
+-----------------+--------------------------+------------------+---------------+
2 rows in set (0.00 sec)
显然,只有 tx1 最终落库,tx2 和 tx3 都没有落库。这个符合我们对事务的理解,即只有 commit 的操作才会最终落库。
这里我们首先要理解,对于计算机而言,什么是事务?数据库系统概念里对“事务”的定义相信很多人耳熟能详,但是对于我们运行的程序而言,事务实际上分为两层,第一层是内存中的上下文,第二层是DBMS控制的我们常说的“事务”。底层事务实际上是通过上下文来定义和操作的,中间隔了层 driver,屏蔽了具体的细节。
以 go 中的 事务为例,是通过一个 struct 来定义的。
type Tx struct {
db *DB
// closemu prevents the transaction from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned exclusively until Commit or Rollback, at which point
// it's returned with putConn.
dc *driverConn
txi driver.Tx
// releaseConn is called once the Tx is closed to release
// any held driverConn back to the pool.
releaseConn func(error)
// done transitions from 0 to 1 exactly once, on Commit
// or Rollback. once done, all operations fail with
// ErrTxDone.
// Use atomic operations on value when checking value.
done int32
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
sync.Mutex
v []*Stmt
}
// cancel is called after done transitions from 0 to 1.
cancel func()
// ctx lives for the life of the transaction.
ctx context.Context
}
其中核心是:
- driverConn
go 里面的数据库连接,封装了 driver 里的数据库连接。 - driver.Tx
定义了 commit 和 rollback 两个方法。
一个事务,实际上就是所有 CRUD 操作都在同一个数据库连接里,调用 Begin 会通过该连接,经有 driver 执行特定 DBMS 的事务开启指令,比如 mysql 的 driver 就是
func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
if mc.closed.IsSet() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
var q string
if readOnly {
q = "START TRANSACTION READ ONLY"
} else {
q = "START TRANSACTION"
}
err := mc.exec(q)
if err == nil {
return &mysqlTx{mc}, err
}
return nil, mc.markBadConn(err)
}
后续操作就都在一个事务里了。上层可以通过 Begin 方法返回的 Commit/Rollback 方法来提交或回滚这个事务。那么,go 是如何实现对数据库事务的支持的呢?我们从入口代码一步一步来看。
db.Beigin
driver.go 里定义了 ErrBadConn: driver 抛出的错误,表示底层连接不可用或已中断,上层应该重新用一个连接
sql.go 里定义了两个常量,
-
cachedOrNewConn:
如果连接池里有 idle 状态的连接,直接返回;如果连接池里的连接数已经达到 MaxOpenCons 定义的数量,则阻塞等待,直到有一个连接 idel;否则,创建新的连接,加入到连接池,然后返回。 -
alwaysNewConn:
强制使用新的连接而不是从接池里里复用
// Begin() 方法是带参数版本的一个默认版本
func (db *DB) Begin() (*Tx, error) {
return db.BeginTx(context.Background(), nil)
}
// 可以设置 Contxt 和 事务配置项
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
var tx *Tx
var err error
// 如果遇到 ErrBadConn 默认会重试 2 次
for i := 0; i < maxBadConnRetries; i++ {
tx, err = db.begin(ctx, opts, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.begin(ctx, opts, alwaysNewConn)
}
return tx, err
}
func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}
外部调用最终进入私有方法 begin 里,begin 主要完成两个操作,一是获取一个数据库连接,二是创建事务的上下文,即开启事务。下面我们一一来看。
db.conn
这个方法很长,核心业务逻辑主要分为以下几个部分:
- 检查
首先检查 db 是否关闭了,再检查是否 context 过期了,若任一为是都会直接返回错误
db.mu.Lock() // 临界区的结束点不同情况不相同
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime // 后面逻辑使用
- cachedOrNewConn 且 存在 idle 的连接
// db.freeConn 是一个 slice,存储了 idle 的连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
// 移走第一个
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock() // 释放锁了
// 判断连接是否过期了
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// lastErr 字段的意义没看懂,好像在返回一个连接之前都要检查这个错误有没有设置
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
- 没有空闲连接了,或者强制使用新连接。
- 连接池达到最大了
连接池达到最大了,就必须阻塞,这里用了一个 channel,类似于 Java 的条件队列
- 连接池达到最大了
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
// Context 结束了
case <-ctx.Done():
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
// 已经发出去了,且创建成功了,那么这个连接就需要加入连接池
// 但是不清楚为什么这里没有验证了
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
// 收到返回
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
// channel 是被关闭了的
if !ok {
return nil, errDBClosed
}
// 超时了
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
- 连接池还没有达到最大
直接新建连接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
putConnDBLocked
前面提到,当连接池已满且没有 idel 连接的时候,是通过注册了一个 channel 来异步接收 free 连接的通知的。维护所有 channel 的是
connReqeusts map[uint64] chan connRequest
type connRequest struct {
conn *driverConn
err error
}
使用完一个连接后,需要“归还”连接给 DB。DB 的逻辑是:
如果存在 connRequest,会将 dc 直接交给它
否则,放入到连接池里,标记为 idel,并启动清理线程,关闭那些超时的连接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
db.beginDC
前面的操作完成了连接的获取(创建/释放),下面就要启动一个事务。
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
withLock(dc, func() { // 就是个包装方法,加锁,执行函数,放锁
txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 调用 driver 开启事务
})
if err != nil {
release(err)
return nil, err
}
// 后面的主要是暴露上下文,注册资源清理回调
// Schedule the transaction to rollback when the context is cancelled.
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
db: db,
dc: dc,
releaseConn: release,
txi: txi,
cancel: cancel,
ctx: ctx,
}
go tx.awaitDone()
return tx, nil
}
// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// 如果在事务提交/回滚前,就结束阻塞,说明 Context结束了,那就要执行资源清理
<-tx.ctx.Done()
// 关必并从连接池里删除这个连接,来保证事务已经关闭、资源得到释放
// 对于已经提交/回滚的事务,这个方法不会由任何影响
tx.rollback(true)
}
网友评论