事务是存储系统中一种重要的机制,一个事务可以包含一个或多个操作,一个事务在逻辑上看,是一个不可分割的执行单元,组成事务的操作,必须全部成功,事务才能提交,一旦其中存在操作失败,整个事务中的全部操作都需要回滚。简单来说,事务是一种原子性操作。
举个常见的例子,银行转账问题,现在要把从A账户,转账1000到B账户。包含两个操作:A -= 1000;B += 1000。我们要保证,这两个操作同时成功或者失败。这是事务的典型用法。
在这里,你可能有疑问,事务不是要具备ACID四个属性吗?我想说,这里说的事务,不仅仅指数据库中的事务,而是一种更宽泛的原子操作;这里说的原子操作,也不是ACID中的A,而是广义上原子性定义,其效果相当于A+I。
下面会从两个方面探讨事务原子性的实现:
- all-or-nothing: 事务中的多个操作,要么全部执行,要么全部不执行(当某个操作失败时,回滚到事务执行之前的状态)
- before-or-after:当有多个事务并行执行时,多个事务的执行结果看起来像串行执行的(意思是说,逻辑上,事务是不可分割的,都应该是串行执行的,但实际上,为了性能,会允许一些并行执行的发生)
另外,本文所探讨的事务,主要是单机事务。分布式事务,可参见两阶段提交算法(2PC),三阶段提交算法(3PC)等。
All-or-Nothing
事务中的多个操作,要么全部执行,要么全部不执行。
要实现这个属性,一般有两种方法:1.为事务中的操作提供回滚机制,当部分操作失败时,回滚事务中所有操作,比如数据库中的undo log;2.在事务提交之前,不去修改持久化的数据,而是将数据存入临时的buffer,在提交时,原子的修改所有数据,这样,当部分操作失败时,直接drop掉buffer里的数据即可。
下面介绍实现All-or-Nothing的一些方法。
Multi-Version
多版本,不是一种具体的实现,而是一种思路,简单来说,就是当数据发生更新操作时,不会修改原数据,而是创建一个新的版本。多版本可以很容易的实现All-or-Nothing,当事务失败时,只需要回滚到旧版本的数据即可,基本不需要额外的操作。
多版本也是Before-or-After实现的一种思路,在后文会详细讨论。
Write-Ahead-Log and Checkpoints
预写式日志是比较常见的手段,在执行操作之前,先将操作内容写到持久化存储的日志中。根据WAL日志内容和用途的不同,又可以划分为几种不同的实现。
要注意,无论是什么实现,WAL的写入一般是direct写到硬盘。
Journal
atomicity_fig1.pngJournal的思想是这样的,在存储引擎的数据层Data之外,额外增加一个Journal层(一般为单独的、性能更好的数据盘,比如ssd),在执行事务时,不会直接修改Data层,而是将事务的每个操作日志同步写入Journal层,写入完成,即表示事务已提交。另外存在一个后台进程,定时将Journal层的日志,异步apply到Disk中,用Checkpoint记录当前已经apply的日志位置,Checkpoint之前的Journal日志,就可以删除了。Ceph中,Filestore的原子操作就是使用了Journal这种方式。
现在来看下,这种实现如何保证all-or-nothing。在执行事务时,如果在写入Journal的过程中,发生错误,那么此时,事务尚未提交,直接drop掉当前事务已经写入Journal的日志项,即可达到nothing的状态。而Journal到Data层的apply,一般情况是不会失败的,因为Data层往往为具体的硬盘或者文件,apply日志,即是修改或删除硬盘某个block处的数据,如果发生错误,表示硬盘出现故障,可以直接让存储引擎crash掉。等待错误排除后,重启存储引擎,从checkpoint开始,重新apply数据即可。
Undo Log
Undo log的思想在于,事务操作执行时,会直接修改存储引擎的数据层Data,但在修改之前,会把修改位置原有的数据记录在日志中,这样,如果操作执行失败,那么则可以通过Undo log回滚事务已经执行的操作,实现nothing状态。
Undo log的内容如下,其中操作类型一般为put、delete,原数据存储修改位置原来的数据,新数据存储修改之后的数据。为什么要包含新数据呢,Undo log并不仅仅用于回滚,其本身一般也会承担Redo log的功能。
----------------------
|操作类型|原数据|新数据|
----------------------
在Undo log模式下,事务操作会直接修改Data层,并且因为WAL一般为direct写,事务在操作Data层时,往往为了提升性能,仅写到磁盘/操作系统的缓存中。那么当存储引擎在执行事务时意外崩溃时,磁盘/操作系统缓存中的内容会全部丢失,此时,Data层的状态是未知的,只能根据日志来重放。因为我们不清楚缓存中到底包含了多少没有持久化到磁盘的操作,所以只能从日志开头全部重放,其效率无疑是低下的。因此,引入Checkpoint的概念,每当存储引擎将缓存中内容刷入硬盘时,记录一个checkpoint在日志中,保证checkpoint之前的日志都已经apply到硬盘。这样,在崩溃后的恢复过程中,只需要从日志的最后一个checkpoint开始重放,大大提升了恢复的效率。
Before-or-After
当有多个事务并行执行时,逻辑上看,多个事务的执行结果需要是串行的。
下面介绍实现Before-or-After的一些方法。
Multi-Version
想必你一定听说过多版本并发控制(MVCC),不错,这里的多版本就是指MVCC。MVCC不是一种具体的实现,而是一种思路:通过保存数据的多个版本,以减少或消除不同事务间读写操作间的互斥性,提高并发性能。MVCC具体的实现方式有很多,有些是乐观并发控制,有些是悲观的,下面给出几种实现。
为了方便下面的讨论,定义事务的状态分为:
- pending:事务创建时的状态,表示等待执行或正在执行
- commited:事务执行成功且已提交
- aborted:事务执行失败且已回滚
定义存储类型:下面讨论的存储为一个key-value存储,通过key,可以定位到该key对应的所有value版本,形如key -> value list,value list中存储了该key的所有版本。
定义事务:每个事务都会有一个transaction_id,是一个全局唯一且递增的整数。存在一个全局事务map,可通过事务id索引到具体的事务信息,包括事务的状态。
Mark-Point
前面提到,多版本的思想是使读操作和写操作读取不同的数据,以避免互斥,来提高性能。但是有一个问题,如果多个事务并发执行时,事务id较小的事务要修改某个key,但是事务id较大的事务,却先于它读取了这个key的数据,这就会造成数据不一致。
atomicity_fig2.png标记点可以解决上述问题,标记点要求当前事务在执行之前,必须等待其前一个事务将所有要修改的key都做好标记,然后才能开始执行。当标记完成后,当前事务的读取操作,遇到被标记的key时,就可以得知该key被前一个事务做了修改,需要等待前事务修改完成,然后才能读取。从而解决了上一段的问题。
下面先介绍标记点方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。
事务的开始操作,获取一个新的事务id,在全局的transaction_map中新增该事务的信息,并初始化事务状态为pending,然后等待前一个事务完成mark,或者事务完成,才会开始执行事务。
func begin_transaction() {
id = new_transaction()
previous_id = id - 1
wait until (transaction_map[previous_id].mark_state == marked)
or (transaction_map[previous_id].state != pending)
return id
}
func new_transaction() {
aquire(transaction_map_lock) // make this before-or-after
id = next_transaction_id() // get a new id
transaction_map[id] = new(transaction)
transaction_map[id].state = pending
transaction_map[id].mark_state = nil
release(transaction_map_lock)
return id
}
当标记完成后,调用该函数修改事务的标记状态,通知下一个事务开始执行。
func mark_transaction(this_transaction_id) {
transaction_map[this_transaction_id].mark_state = marked
}
读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。
func read(key, this_transaction_id) {
starting at end of value list until begining {
v = previous version of key
if v.transaction_id >= this_transaction_id {
skip v
}
wait until (transaction_map[v.transaction_id].state != pending)
if transaction_map[v.transaction_id].state == commited {
return v.value
} else {
skip v
}
}
return error("this key doesn't exist")
}
为要修改的key创建一个新的版本,并将其事务id设为当前事务的id。在标记点的实现中,这个函数不会被并发调用,因为begin_transaction的阻塞。
func new_version(key, this_transaction_id) {
if transaction_map[this_transaction_id].mark_state == marked {
return error("this transaction has already been marked")
}
if this_transaction_id == latest_version(key).transaction_id {
return error("do not call this repeatedly")
}
append new version v to value list
v.value = nil
v.transaction_id = this_transaction_id
}
写入操作,将new_value写入到当前事务版本的value中。
func write(key, new_value, this_transaction_id) {
starting at end of value list until begining {
v = previous version of key
if v.transaction_id == this_transaaction_id {
v.value = new_value
return
}
}
return error("this version doesn't exist")
}
使用上述的方法,解决银行转账问题:
func transfer(account_a, account_b, amount) {
id = begin_transaction()
new_version(account_a, id)
new_version(account_b, id)
mark_transaction(id)
a_money = read(account_a, id)
a_money -= amount
write(account_a, a_money, id)
b_money = read(account_b, id)
b_money += amount
write(account_b, b_money, id)
if a_money < 0 {
abort(id)
}else{
commit(id)
}
}
标记点是一种悲观的并发控制方法,使用标记点,所有事务执行前,必须要知道所有要修改的key,并为每个key创建一个新的版本。当后续冲突的事务读取到这个key的版本,发现该版本对应的事务状态为pending,就知悉冲突的发生,需要等待前面事务commited/aborted后,才能继续执行。标记点方法,除了会让所有冲突的事务串行执行之外,事务为要修改keys创建新版本的过程也是完全串行的。在冲突较多时,这么做当然没问题,但是当冲突的概率比较低时,这样做无疑会大幅降低性能。
Read-Capture
悲观并发控制,在事务执行前会提前检测冲突,当检测到冲突时,阻塞参与冲突的部分事务,仅允许一个事务执行,等待冲突消失后,其他事务再继续执行。而乐观并发控制,假定冲突发生概率很低,不会在执行前提前检测冲突,多个并发的事务直接向下执行,直到事务提交或者执行过程中,如果发现冲突,则回滚部分参与冲突的事务,后续重新调度其执行。相比于悲观并发控制,在冲突发生概率不大的场景,其性能会有所提升。
读捕获是一种乐观并发控制的方法。它不会像Mark-point方法那样提前进行标记,而是放任事务直接执行,在事务调用read方法时,会在被读取的key上做一个标记。这个标记告诉后续可能对这个key做写入操作的,但是事务id比当前事务要小的事务,它不再被允许对这个key的版本做修改,它需要中止事务的执行,进入aborted状态,并重新申请一个新的事务id,重新进行调度执行。
下面同样先介绍读捕获方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。
相比于标记点,begin_transaction函数不再阻塞,仅负责创建新事务。
func begin_transaction() {
id = new_transaction()
return id
}
func new_transaction() {
aquire(transaction_map_lock) // make this before-or-after
defer release(transaction_map_lock)
id = next_transaction_id() // get a new id
transaction_map[id] = new(transaction)
transaction_map[id].state = pending
transaction_map[id].mark_state = nil
return id
}
读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。读取结束前,更新该key的latest_reader。
func read(key, this_transaction_id) {
starting at end of value list until begining {
v = previous version of key
if v.transaction_id >= this_transaction_id {
skip v
}
wait until (transaction_map[v.transaction_id].state != pending)
if transaction_map[v.transaction_id].state == commited {
key.latest_reader = max(key.latest_reader, this_transaction_id)
return v.value
} else {
skip v
}
}
return error("this key doesn't exist")
}
为key创建一个新的版本,此时,因为begin_transaction不再阻塞,所以可能有多个事务并发调用该函数,所以需要加锁来保证before-or-after。
func new_version(key, this_transaction_id) {
aquire(key_lock)
defer release(key_lock)
if this_transaction_id < key.latest_reader
or this_transaction_id < latest_version(key).transaction_id {
abort(this_transaction_id)
} else if this_transaction_id == latest_version(key).transaction_id {
return error("do not call this repeatedly")
}
append new version v to value list
v.value = nil
v.transaction_id = this_transaction_id
}
写入到指定版本,没什么好说的。
func write(key, new_value, this_transaction_id) {
starting at end of value list until begining {
v = previous version of key
if v.transaction_id == this_transaaction_id {
v.value = new_value
return
}
}
return error("this version doesn't exist")
}
使用上述的方法,解决银行转账问题:
func transfer(account_a, account_b, amount) {
id = begin_transaction()
a_money = read(account_a, id)
a_money -= amount
new_version(account_a, id)
write(account_a, a_money, id)
b_money = read(account_b, id)
b_money += amount
new_version(account_b, id)
write(account_b, b_money, id)
if a_money < 0 {
abort(id)
}else{
commit(id)
}
}
Compare-and-Set
compare_and_set(CAS)也是一种配合多版本乐观并发控制的方法。其思路是,前期执行时,不做任何标记,但在读取数据时,需要记录当前读取的数据的版本。在事务的提交过程,需要依赖一个原子的CAS操作,CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,它会重新读取一次之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。
获取事务id并创建新事务,不会阻塞。
func begin_transaction() {
id = new_transaction()
return id
}
func new_transaction() {
aquire(transaction_map_lock) // make this before-or-after
defer release(transaction_map_lock)
id = next_transaction_id() // get a new id
transaction_map[id] = new(transaction)
transaction_map[id].state = pending
transaction_map[id].mark_state = nil
return id
}
在CAS模式实现下的MVCC中,不存在分离的new_version和write操作,两者都被封装到compare_and_set函数中原子的执行,所以没有提交的事务,其不会在数据层留下任何标记。所以不需要像前面几种方法一样等待pending状态的事务结束。
func read(key, this_transaction_id) {
starting at end of value list until begining {
v = previous version of key
if v.transaction_id >= this_transaction_id {
skip v
}
if transaction_map[v.transaction_id].state == commited {
return v.value and v.version_id
} else {
skip v
}
}
return error("this key doesn't exist")
}
基于CAS实现MVCC,写操作不能单独调用,必须封装在CAS中。注意,CAS操作必须是原子的,这一版需要依赖底层接口的支持。CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,首先重新读取一个之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。
再重申一遍,整个compare_and_set函数必须是原子的。
func write(key, new_value, this_transaction_id) {
aquire(key_lock)
defer release(key_lock)
if this_transaction_id < latest_version(key).transaction_id {
abort(this_transaction_id)
} else if this_transaction_id == latest_version(key).transaction_id {
return error("do not call this repeatedly")
}
append new version v to value list
v.version_id = next_version_id()
v.value = new_value
v.transaction_id = this_transaction_id
}
// CAS must be atomicity, this need to be ensured by underlying system
func compare_and_set(key_list, version_list, value_map, this_transaction_id) {
for i:=0; i<len(key_list); i++ {
_, cur_version = read(key_list[i], this_transaction_id)
if key_list[i] != version_list[i] {
abort(this_transaction_id)
}
}
for k, v := range value_map {
write(k, v, this_transaction_id)
}
commit(this_transaction_id)
}
使用上述的方法,解决银行转账问题:
func transfer(account_a, account_b, amount) {
id = begin_transaction()
a_money, a_version = read(account_a, id)
a_money -= amount
b_money, b_version = read(account_b, id)
b_money += amount
if a_money < 0 {
abort(id)
}else{
compare_and_set([]{account_a,account_b}, []{a_version,b_version}, map[key]value{account_a: a_money, account_b: b_money})
}
}
Lock
加锁,是实现Before-or-After最常用的手段。
Simple Locking
最简单的方式,搞一把全局大锁,每个事务执行时先要获取这把锁,提交后释放。这种方式,将事务的实行完全串行化,性能较低。
Two-Phase Locking
降低锁的粒度,每个数据项一把锁,当事务执行时,分为两个阶段,第一个阶段,为所有事务执行锁需要读取和写入的数据项进行加锁;第二个阶段,对数据进行读取和写入,每处理完一个数据,即可释放其对应的锁。
另外,加锁时最好按照一定的顺序进行,以避免死锁的发生。
网友评论