美文网首页Ceph数据库专家
存储系统中事务的实现

存储系统中事务的实现

作者: chnmagnus | 来源:发表于2019-01-31 19:07 被阅读4次

事务是存储系统中一种重要的机制,一个事务可以包含一个或多个操作,一个事务在逻辑上看,是一个不可分割的执行单元,组成事务的操作,必须全部成功,事务才能提交,一旦其中存在操作失败,整个事务中的全部操作都需要回滚。简单来说,事务是一种原子性操作。

举个常见的例子,银行转账问题,现在要把从A账户,转账1000到B账户。包含两个操作:A -= 1000;B += 1000。我们要保证,这两个操作同时成功或者失败。这是事务的典型用法。

在这里,你可能有疑问,事务不是要具备ACID四个属性吗?我想说,这里说的事务,不仅仅指数据库中的事务,而是一种更宽泛的原子操作;这里说的原子操作,也不是ACID中的A,而是广义上原子性定义,其效果相当于A+I。

下面会从两个方面探讨事务原子性的实现:

  1. all-or-nothing: 事务中的多个操作,要么全部执行,要么全部不执行(当某个操作失败时,回滚到事务执行之前的状态)
  2. before-or-after:当有多个事务并行执行时,多个事务的执行结果看起来像串行执行的(意思是说,逻辑上,事务是不可分割的,都应该是串行执行的,但实际上,为了性能,会允许一些并行执行的发生)

另外,本文所探讨的事务,主要是单机事务。分布式事务,可参见两阶段提交算法(2PC),三阶段提交算法(3PC)等。

All-or-Nothing

事务中的多个操作,要么全部执行,要么全部不执行。

要实现这个属性,一般有两种方法:1.为事务中的操作提供回滚机制,当部分操作失败时,回滚事务中所有操作,比如数据库中的undo log;2.在事务提交之前,不去修改持久化的数据,而是将数据存入临时的buffer,在提交时,原子的修改所有数据,这样,当部分操作失败时,直接drop掉buffer里的数据即可。

下面介绍实现All-or-Nothing的一些方法。

Multi-Version

多版本,不是一种具体的实现,而是一种思路,简单来说,就是当数据发生更新操作时,不会修改原数据,而是创建一个新的版本。多版本可以很容易的实现All-or-Nothing,当事务失败时,只需要回滚到旧版本的数据即可,基本不需要额外的操作。

多版本也是Before-or-After实现的一种思路,在后文会详细讨论。

Write-Ahead-Log and Checkpoints

预写式日志是比较常见的手段,在执行操作之前,先将操作内容写到持久化存储的日志中。根据WAL日志内容和用途的不同,又可以划分为几种不同的实现。

要注意,无论是什么实现,WAL的写入一般是direct写到硬盘。

Journal

atomicity_fig1.png

Journal的思想是这样的,在存储引擎的数据层Data之外,额外增加一个Journal层(一般为单独的、性能更好的数据盘,比如ssd),在执行事务时,不会直接修改Data层,而是将事务的每个操作日志同步写入Journal层,写入完成,即表示事务已提交。另外存在一个后台进程,定时将Journal层的日志,异步apply到Disk中,用Checkpoint记录当前已经apply的日志位置,Checkpoint之前的Journal日志,就可以删除了。Ceph中,Filestore的原子操作就是使用了Journal这种方式。

现在来看下,这种实现如何保证all-or-nothing。在执行事务时,如果在写入Journal的过程中,发生错误,那么此时,事务尚未提交,直接drop掉当前事务已经写入Journal的日志项,即可达到nothing的状态。而Journal到Data层的apply,一般情况是不会失败的,因为Data层往往为具体的硬盘或者文件,apply日志,即是修改或删除硬盘某个block处的数据,如果发生错误,表示硬盘出现故障,可以直接让存储引擎crash掉。等待错误排除后,重启存储引擎,从checkpoint开始,重新apply数据即可。

Undo Log

Undo log的思想在于,事务操作执行时,会直接修改存储引擎的数据层Data,但在修改之前,会把修改位置原有的数据记录在日志中,这样,如果操作执行失败,那么则可以通过Undo log回滚事务已经执行的操作,实现nothing状态。

Undo log的内容如下,其中操作类型一般为put、delete,原数据存储修改位置原来的数据,新数据存储修改之后的数据。为什么要包含新数据呢,Undo log并不仅仅用于回滚,其本身一般也会承担Redo log的功能。

----------------------
|操作类型|原数据|新数据|
----------------------

在Undo log模式下,事务操作会直接修改Data层,并且因为WAL一般为direct写,事务在操作Data层时,往往为了提升性能,仅写到磁盘/操作系统的缓存中。那么当存储引擎在执行事务时意外崩溃时,磁盘/操作系统缓存中的内容会全部丢失,此时,Data层的状态是未知的,只能根据日志来重放。因为我们不清楚缓存中到底包含了多少没有持久化到磁盘的操作,所以只能从日志开头全部重放,其效率无疑是低下的。因此,引入Checkpoint的概念,每当存储引擎将缓存中内容刷入硬盘时,记录一个checkpoint在日志中,保证checkpoint之前的日志都已经apply到硬盘。这样,在崩溃后的恢复过程中,只需要从日志的最后一个checkpoint开始重放,大大提升了恢复的效率。

Before-or-After

当有多个事务并行执行时,逻辑上看,多个事务的执行结果需要是串行的。

下面介绍实现Before-or-After的一些方法。

Multi-Version

想必你一定听说过多版本并发控制(MVCC),不错,这里的多版本就是指MVCC。MVCC不是一种具体的实现,而是一种思路:通过保存数据的多个版本,以减少或消除不同事务间读写操作间的互斥性,提高并发性能。MVCC具体的实现方式有很多,有些是乐观并发控制,有些是悲观的,下面给出几种实现。

为了方便下面的讨论,定义事务的状态分为:

  1. pending:事务创建时的状态,表示等待执行或正在执行
  2. commited:事务执行成功且已提交
  3. aborted:事务执行失败且已回滚

定义存储类型:下面讨论的存储为一个key-value存储,通过key,可以定位到该key对应的所有value版本,形如key -> value list,value list中存储了该key的所有版本。

定义事务:每个事务都会有一个transaction_id,是一个全局唯一且递增的整数。存在一个全局事务map,可通过事务id索引到具体的事务信息,包括事务的状态。

Mark-Point

前面提到,多版本的思想是使读操作和写操作读取不同的数据,以避免互斥,来提高性能。但是有一个问题,如果多个事务并发执行时,事务id较小的事务要修改某个key,但是事务id较大的事务,却先于它读取了这个key的数据,这就会造成数据不一致。

atomicity_fig2.png

标记点可以解决上述问题,标记点要求当前事务在执行之前,必须等待其前一个事务将所有要修改的key都做好标记,然后才能开始执行。当标记完成后,当前事务的读取操作,遇到被标记的key时,就可以得知该key被前一个事务做了修改,需要等待前事务修改完成,然后才能读取。从而解决了上一段的问题。

下面先介绍标记点方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。

事务的开始操作,获取一个新的事务id,在全局的transaction_map中新增该事务的信息,并初始化事务状态为pending,然后等待前一个事务完成mark,或者事务完成,才会开始执行事务。

func begin_transaction() {
    id = new_transaction()
    previous_id = id - 1
    wait until (transaction_map[previous_id].mark_state == marked)
        or (transaction_map[previous_id].state != pending)
    return id
}

func new_transaction() {
    aquire(transaction_map_lock) // make this before-or-after
    id = next_transaction_id() // get a new id
    transaction_map[id] = new(transaction)
    transaction_map[id].state = pending
    transaction_map[id].mark_state = nil
    release(transaction_map_lock)
    return id
}

当标记完成后,调用该函数修改事务的标记状态,通知下一个事务开始执行。

func mark_transaction(this_transaction_id) {
    transaction_map[this_transaction_id].mark_state = marked
}

读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。

func read(key, this_transaction_id) {
    starting at end of value list until begining {
        v = previous version of key
        if v.transaction_id >= this_transaction_id {
            skip v
        }
        wait until (transaction_map[v.transaction_id].state != pending)
        if transaction_map[v.transaction_id].state == commited {
            return v.value
        } else {
            skip v
        }
    }
    return error("this key doesn't exist")
}

为要修改的key创建一个新的版本,并将其事务id设为当前事务的id。在标记点的实现中,这个函数不会被并发调用,因为begin_transaction的阻塞。

func new_version(key, this_transaction_id) {
    if transaction_map[this_transaction_id].mark_state == marked {
        return error("this transaction has already been marked")
    }
    if this_transaction_id == latest_version(key).transaction_id {
        return error("do not call this repeatedly")
    }
    append new version v to value list
    v.value = nil
    v.transaction_id = this_transaction_id
}

写入操作,将new_value写入到当前事务版本的value中。

func write(key, new_value, this_transaction_id) {
    starting at end of value list until begining {
        v = previous version of key
        if v.transaction_id == this_transaaction_id {
            v.value = new_value
            return
        }
    }
    return error("this version doesn't exist")
}

使用上述的方法,解决银行转账问题:

func transfer(account_a, account_b, amount) {
    id = begin_transaction()
    new_version(account_a, id)
    new_version(account_b, id)
    mark_transaction(id)

    a_money = read(account_a, id)
    a_money -= amount
    write(account_a, a_money, id)

    b_money = read(account_b, id)
    b_money += amount
    write(account_b, b_money, id)

    if a_money < 0 {
        abort(id)
    }else{
        commit(id)
    }
}

标记点是一种悲观的并发控制方法,使用标记点,所有事务执行前,必须要知道所有要修改的key,并为每个key创建一个新的版本。当后续冲突的事务读取到这个key的版本,发现该版本对应的事务状态为pending,就知悉冲突的发生,需要等待前面事务commited/aborted后,才能继续执行。标记点方法,除了会让所有冲突的事务串行执行之外,事务为要修改keys创建新版本的过程也是完全串行的。在冲突较多时,这么做当然没问题,但是当冲突的概率比较低时,这样做无疑会大幅降低性能。

Read-Capture

悲观并发控制,在事务执行前会提前检测冲突,当检测到冲突时,阻塞参与冲突的部分事务,仅允许一个事务执行,等待冲突消失后,其他事务再继续执行。而乐观并发控制,假定冲突发生概率很低,不会在执行前提前检测冲突,多个并发的事务直接向下执行,直到事务提交或者执行过程中,如果发现冲突,则回滚部分参与冲突的事务,后续重新调度其执行。相比于悲观并发控制,在冲突发生概率不大的场景,其性能会有所提升。

读捕获是一种乐观并发控制的方法。它不会像Mark-point方法那样提前进行标记,而是放任事务直接执行,在事务调用read方法时,会在被读取的key上做一个标记。这个标记告诉后续可能对这个key做写入操作的,但是事务id比当前事务要小的事务,它不再被允许对这个key的版本做修改,它需要中止事务的执行,进入aborted状态,并重新申请一个新的事务id,重新进行调度执行。

下面同样先介绍读捕获方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。

相比于标记点,begin_transaction函数不再阻塞,仅负责创建新事务。

func begin_transaction() {
    id = new_transaction()
    return id
}

func new_transaction() {
    aquire(transaction_map_lock) // make this before-or-after
    defer release(transaction_map_lock)

    id = next_transaction_id() // get a new id
    transaction_map[id] = new(transaction)
    transaction_map[id].state = pending
    transaction_map[id].mark_state = nil
    return id
}

读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。读取结束前,更新该key的latest_reader。

func read(key, this_transaction_id) {
    starting at end of value list until begining {
        v = previous version of key
        if v.transaction_id >= this_transaction_id {
            skip v
        }
        wait until (transaction_map[v.transaction_id].state != pending)
        if transaction_map[v.transaction_id].state == commited {
            key.latest_reader = max(key.latest_reader, this_transaction_id)
            return v.value
        } else {
            skip v
        }
    }
    return error("this key doesn't exist")
}

为key创建一个新的版本,此时,因为begin_transaction不再阻塞,所以可能有多个事务并发调用该函数,所以需要加锁来保证before-or-after。

func new_version(key, this_transaction_id) {
    aquire(key_lock)
    defer release(key_lock)
     
    if this_transaction_id < key.latest_reader
    or this_transaction_id < latest_version(key).transaction_id {
        abort(this_transaction_id)
    } else if this_transaction_id == latest_version(key).transaction_id {
        return error("do not call this repeatedly")
    }
    append new version v to value list
    v.value = nil
    v.transaction_id = this_transaction_id
}

写入到指定版本,没什么好说的。

func write(key, new_value, this_transaction_id) {
    starting at end of value list until begining {
        v = previous version of key
        if v.transaction_id == this_transaaction_id {
            v.value = new_value
            return
        }
    }
    return error("this version doesn't exist")
}

使用上述的方法,解决银行转账问题:

func transfer(account_a, account_b, amount) {
    id = begin_transaction()
    a_money = read(account_a, id)
    a_money -= amount
    new_version(account_a, id)
    write(account_a, a_money, id)

    b_money = read(account_b, id)
    b_money += amount
    new_version(account_b, id)
    write(account_b, b_money, id)

    if a_money < 0 {
        abort(id)
    }else{
        commit(id)
    }
}

Compare-and-Set

compare_and_set(CAS)也是一种配合多版本乐观并发控制的方法。其思路是,前期执行时,不做任何标记,但在读取数据时,需要记录当前读取的数据的版本。在事务的提交过程,需要依赖一个原子的CAS操作,CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,它会重新读取一次之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。

获取事务id并创建新事务,不会阻塞。

func begin_transaction() {
    id = new_transaction()
    return id
}

func new_transaction() {
    aquire(transaction_map_lock) // make this before-or-after
    defer release(transaction_map_lock)

    id = next_transaction_id() // get a new id
    transaction_map[id] = new(transaction)
    transaction_map[id].state = pending
    transaction_map[id].mark_state = nil
    return id
}

在CAS模式实现下的MVCC中,不存在分离的new_version和write操作,两者都被封装到compare_and_set函数中原子的执行,所以没有提交的事务,其不会在数据层留下任何标记。所以不需要像前面几种方法一样等待pending状态的事务结束。

func read(key, this_transaction_id) {
    starting at end of value list until begining {
        v = previous version of key
        if v.transaction_id >= this_transaction_id {
            skip v
        }
        if transaction_map[v.transaction_id].state == commited {
            return v.value and v.version_id
        } else {
            skip v
        }
    }
    return error("this key doesn't exist")
}

基于CAS实现MVCC,写操作不能单独调用,必须封装在CAS中。注意,CAS操作必须是原子的,这一版需要依赖底层接口的支持。CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,首先重新读取一个之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。

再重申一遍,整个compare_and_set函数必须是原子的。

func write(key, new_value, this_transaction_id) {
    aquire(key_lock)
    defer release(key_lock)     
    if this_transaction_id < latest_version(key).transaction_id {
        abort(this_transaction_id)
    } else if this_transaction_id == latest_version(key).transaction_id {
        return error("do not call this repeatedly")
    }
   append new version v to value list
    v.version_id = next_version_id()
    v.value = new_value
    v.transaction_id = this_transaction_id
}

// CAS must be atomicity, this need to be ensured by underlying system 
func compare_and_set(key_list, version_list, value_map, this_transaction_id) {
    for i:=0; i<len(key_list); i++ {
        _, cur_version = read(key_list[i], this_transaction_id)
        if key_list[i] != version_list[i] {
            abort(this_transaction_id)
        }
    }
    for k, v := range value_map {
        write(k, v, this_transaction_id)
    }
    commit(this_transaction_id)
}

使用上述的方法,解决银行转账问题:

func transfer(account_a, account_b, amount) {
    id = begin_transaction()
    a_money, a_version = read(account_a, id)
    a_money -= amount

    b_money, b_version = read(account_b, id)
    b_money += amount

    if a_money < 0 {
        abort(id)
    }else{
        compare_and_set([]{account_a,account_b}, []{a_version,b_version}, map[key]value{account_a: a_money, account_b: b_money})
    }
}

Lock

加锁,是实现Before-or-After最常用的手段。

Simple Locking

最简单的方式,搞一把全局大锁,每个事务执行时先要获取这把锁,提交后释放。这种方式,将事务的实行完全串行化,性能较低。

Two-Phase Locking

降低锁的粒度,每个数据项一把锁,当事务执行时,分为两个阶段,第一个阶段,为所有事务执行锁需要读取和写入的数据项进行加锁;第二个阶段,对数据进行读取和写入,每处理完一个数据,即可释放其对应的锁。

另外,加锁时最好按照一定的顺序进行,以避免死锁的发生。

相关文章

  • 存储系统中事务的实现

    事务是存储系统中一种重要的机制,一个事务可以包含一个或多个操作,一个事务在逻辑上看,是一个不可分割的执行单元,组成...

  • Spring中实现事务方式

    Spring 中实现事务的方式 Spring 并不直接支持事务,只有当数据库支持事务时,Spring 才支持事务,...

  • 面试问题记录(一) Day50 2019-01-09

    并发中的常用锁,乐观锁和悲观锁,实现及使用 事务的特性,原理,spring如何实现事务 spring的实现机制,I...

  • 数据库事务书目录

    数据库事务 事务概念 本地事务 全局事务 全局事务的定义 J2EE中全局事务的实现 全局事务的优缺点 基于消息的分...

  • Spring之声明式事务

    十二、声明式事务 目录:事务、代码实现、Spring中的事务管理 1.事务 事务在项目开发过程非常重要,涉及到数据...

  • MySQL事务

    事务介绍 在MySQL中事务是由存储引擎实现的,而且支持事务的存储引擎不多,我们主要讲解InnoDB存储引擎中的事...

  • 【Spring】Spring事务

    一.Spring事务 配置方式 项目通过声明式事务中的拦截器方法实现事务管理。 首先配置TransactionIn...

  • Spring 事务之声明式事务

    一.声明式事务实现 将编程式事务章节中applicationContext.xml修改下: 声明式事务通过AOP代...

  • Spring-AOP事物管理

    1.概述 Spring 中的事务主要是利用 Aop 思想,简化事务的配置 2.核心接口 Spring事务管理的实现...

  • mysql 物理日志之redo log(重做日志)原理和介绍

    重做日志用来实现事务的持久性,即事务ACID中的D。 InnoDB是事务的存储引擎,其通过 Force Log a...

网友评论

    本文标题:存储系统中事务的实现

    本文链接:https://www.haomeiwen.com/subject/nmjlsqtx.html