美文网首页Ceph数据库专家
存储系统中事务的实现

存储系统中事务的实现

作者: chnmagnus | 来源:发表于2019-01-31 19:07 被阅读4次

    事务是存储系统中一种重要的机制,一个事务可以包含一个或多个操作,一个事务在逻辑上看,是一个不可分割的执行单元,组成事务的操作,必须全部成功,事务才能提交,一旦其中存在操作失败,整个事务中的全部操作都需要回滚。简单来说,事务是一种原子性操作。

    举个常见的例子,银行转账问题,现在要把从A账户,转账1000到B账户。包含两个操作:A -= 1000;B += 1000。我们要保证,这两个操作同时成功或者失败。这是事务的典型用法。

    在这里,你可能有疑问,事务不是要具备ACID四个属性吗?我想说,这里说的事务,不仅仅指数据库中的事务,而是一种更宽泛的原子操作;这里说的原子操作,也不是ACID中的A,而是广义上原子性定义,其效果相当于A+I。

    下面会从两个方面探讨事务原子性的实现:

    1. all-or-nothing: 事务中的多个操作,要么全部执行,要么全部不执行(当某个操作失败时,回滚到事务执行之前的状态)
    2. before-or-after:当有多个事务并行执行时,多个事务的执行结果看起来像串行执行的(意思是说,逻辑上,事务是不可分割的,都应该是串行执行的,但实际上,为了性能,会允许一些并行执行的发生)

    另外,本文所探讨的事务,主要是单机事务。分布式事务,可参见两阶段提交算法(2PC),三阶段提交算法(3PC)等。

    All-or-Nothing

    事务中的多个操作,要么全部执行,要么全部不执行。

    要实现这个属性,一般有两种方法:1.为事务中的操作提供回滚机制,当部分操作失败时,回滚事务中所有操作,比如数据库中的undo log;2.在事务提交之前,不去修改持久化的数据,而是将数据存入临时的buffer,在提交时,原子的修改所有数据,这样,当部分操作失败时,直接drop掉buffer里的数据即可。

    下面介绍实现All-or-Nothing的一些方法。

    Multi-Version

    多版本,不是一种具体的实现,而是一种思路,简单来说,就是当数据发生更新操作时,不会修改原数据,而是创建一个新的版本。多版本可以很容易的实现All-or-Nothing,当事务失败时,只需要回滚到旧版本的数据即可,基本不需要额外的操作。

    多版本也是Before-or-After实现的一种思路,在后文会详细讨论。

    Write-Ahead-Log and Checkpoints

    预写式日志是比较常见的手段,在执行操作之前,先将操作内容写到持久化存储的日志中。根据WAL日志内容和用途的不同,又可以划分为几种不同的实现。

    要注意,无论是什么实现,WAL的写入一般是direct写到硬盘。

    Journal

    atomicity_fig1.png

    Journal的思想是这样的,在存储引擎的数据层Data之外,额外增加一个Journal层(一般为单独的、性能更好的数据盘,比如ssd),在执行事务时,不会直接修改Data层,而是将事务的每个操作日志同步写入Journal层,写入完成,即表示事务已提交。另外存在一个后台进程,定时将Journal层的日志,异步apply到Disk中,用Checkpoint记录当前已经apply的日志位置,Checkpoint之前的Journal日志,就可以删除了。Ceph中,Filestore的原子操作就是使用了Journal这种方式。

    现在来看下,这种实现如何保证all-or-nothing。在执行事务时,如果在写入Journal的过程中,发生错误,那么此时,事务尚未提交,直接drop掉当前事务已经写入Journal的日志项,即可达到nothing的状态。而Journal到Data层的apply,一般情况是不会失败的,因为Data层往往为具体的硬盘或者文件,apply日志,即是修改或删除硬盘某个block处的数据,如果发生错误,表示硬盘出现故障,可以直接让存储引擎crash掉。等待错误排除后,重启存储引擎,从checkpoint开始,重新apply数据即可。

    Undo Log

    Undo log的思想在于,事务操作执行时,会直接修改存储引擎的数据层Data,但在修改之前,会把修改位置原有的数据记录在日志中,这样,如果操作执行失败,那么则可以通过Undo log回滚事务已经执行的操作,实现nothing状态。

    Undo log的内容如下,其中操作类型一般为put、delete,原数据存储修改位置原来的数据,新数据存储修改之后的数据。为什么要包含新数据呢,Undo log并不仅仅用于回滚,其本身一般也会承担Redo log的功能。

    ----------------------
    |操作类型|原数据|新数据|
    ----------------------
    

    在Undo log模式下,事务操作会直接修改Data层,并且因为WAL一般为direct写,事务在操作Data层时,往往为了提升性能,仅写到磁盘/操作系统的缓存中。那么当存储引擎在执行事务时意外崩溃时,磁盘/操作系统缓存中的内容会全部丢失,此时,Data层的状态是未知的,只能根据日志来重放。因为我们不清楚缓存中到底包含了多少没有持久化到磁盘的操作,所以只能从日志开头全部重放,其效率无疑是低下的。因此,引入Checkpoint的概念,每当存储引擎将缓存中内容刷入硬盘时,记录一个checkpoint在日志中,保证checkpoint之前的日志都已经apply到硬盘。这样,在崩溃后的恢复过程中,只需要从日志的最后一个checkpoint开始重放,大大提升了恢复的效率。

    Before-or-After

    当有多个事务并行执行时,逻辑上看,多个事务的执行结果需要是串行的。

    下面介绍实现Before-or-After的一些方法。

    Multi-Version

    想必你一定听说过多版本并发控制(MVCC),不错,这里的多版本就是指MVCC。MVCC不是一种具体的实现,而是一种思路:通过保存数据的多个版本,以减少或消除不同事务间读写操作间的互斥性,提高并发性能。MVCC具体的实现方式有很多,有些是乐观并发控制,有些是悲观的,下面给出几种实现。

    为了方便下面的讨论,定义事务的状态分为:

    1. pending:事务创建时的状态,表示等待执行或正在执行
    2. commited:事务执行成功且已提交
    3. aborted:事务执行失败且已回滚

    定义存储类型:下面讨论的存储为一个key-value存储,通过key,可以定位到该key对应的所有value版本,形如key -> value list,value list中存储了该key的所有版本。

    定义事务:每个事务都会有一个transaction_id,是一个全局唯一且递增的整数。存在一个全局事务map,可通过事务id索引到具体的事务信息,包括事务的状态。

    Mark-Point

    前面提到,多版本的思想是使读操作和写操作读取不同的数据,以避免互斥,来提高性能。但是有一个问题,如果多个事务并发执行时,事务id较小的事务要修改某个key,但是事务id较大的事务,却先于它读取了这个key的数据,这就会造成数据不一致。

    atomicity_fig2.png

    标记点可以解决上述问题,标记点要求当前事务在执行之前,必须等待其前一个事务将所有要修改的key都做好标记,然后才能开始执行。当标记完成后,当前事务的读取操作,遇到被标记的key时,就可以得知该key被前一个事务做了修改,需要等待前事务修改完成,然后才能读取。从而解决了上一段的问题。

    下面先介绍标记点方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。

    事务的开始操作,获取一个新的事务id,在全局的transaction_map中新增该事务的信息,并初始化事务状态为pending,然后等待前一个事务完成mark,或者事务完成,才会开始执行事务。

    func begin_transaction() {
        id = new_transaction()
        previous_id = id - 1
        wait until (transaction_map[previous_id].mark_state == marked)
            or (transaction_map[previous_id].state != pending)
        return id
    }
    
    func new_transaction() {
        aquire(transaction_map_lock) // make this before-or-after
        id = next_transaction_id() // get a new id
        transaction_map[id] = new(transaction)
        transaction_map[id].state = pending
        transaction_map[id].mark_state = nil
        release(transaction_map_lock)
        return id
    }
    

    当标记完成后,调用该函数修改事务的标记状态,通知下一个事务开始执行。

    func mark_transaction(this_transaction_id) {
        transaction_map[this_transaction_id].mark_state = marked
    }
    

    读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。

    func read(key, this_transaction_id) {
        starting at end of value list until begining {
            v = previous version of key
            if v.transaction_id >= this_transaction_id {
                skip v
            }
            wait until (transaction_map[v.transaction_id].state != pending)
            if transaction_map[v.transaction_id].state == commited {
                return v.value
            } else {
                skip v
            }
        }
        return error("this key doesn't exist")
    }
    

    为要修改的key创建一个新的版本,并将其事务id设为当前事务的id。在标记点的实现中,这个函数不会被并发调用,因为begin_transaction的阻塞。

    func new_version(key, this_transaction_id) {
        if transaction_map[this_transaction_id].mark_state == marked {
            return error("this transaction has already been marked")
        }
        if this_transaction_id == latest_version(key).transaction_id {
            return error("do not call this repeatedly")
        }
        append new version v to value list
        v.value = nil
        v.transaction_id = this_transaction_id
    }
    

    写入操作,将new_value写入到当前事务版本的value中。

    func write(key, new_value, this_transaction_id) {
        starting at end of value list until begining {
            v = previous version of key
            if v.transaction_id == this_transaaction_id {
                v.value = new_value
                return
            }
        }
        return error("this version doesn't exist")
    }
    

    使用上述的方法,解决银行转账问题:

    func transfer(account_a, account_b, amount) {
        id = begin_transaction()
        new_version(account_a, id)
        new_version(account_b, id)
        mark_transaction(id)
    
        a_money = read(account_a, id)
        a_money -= amount
        write(account_a, a_money, id)
    
        b_money = read(account_b, id)
        b_money += amount
        write(account_b, b_money, id)
    
        if a_money < 0 {
            abort(id)
        }else{
            commit(id)
        }
    }
    

    标记点是一种悲观的并发控制方法,使用标记点,所有事务执行前,必须要知道所有要修改的key,并为每个key创建一个新的版本。当后续冲突的事务读取到这个key的版本,发现该版本对应的事务状态为pending,就知悉冲突的发生,需要等待前面事务commited/aborted后,才能继续执行。标记点方法,除了会让所有冲突的事务串行执行之外,事务为要修改keys创建新版本的过程也是完全串行的。在冲突较多时,这么做当然没问题,但是当冲突的概率比较低时,这样做无疑会大幅降低性能。

    Read-Capture

    悲观并发控制,在事务执行前会提前检测冲突,当检测到冲突时,阻塞参与冲突的部分事务,仅允许一个事务执行,等待冲突消失后,其他事务再继续执行。而乐观并发控制,假定冲突发生概率很低,不会在执行前提前检测冲突,多个并发的事务直接向下执行,直到事务提交或者执行过程中,如果发现冲突,则回滚部分参与冲突的事务,后续重新调度其执行。相比于悲观并发控制,在冲突发生概率不大的场景,其性能会有所提升。

    读捕获是一种乐观并发控制的方法。它不会像Mark-point方法那样提前进行标记,而是放任事务直接执行,在事务调用read方法时,会在被读取的key上做一个标记。这个标记告诉后续可能对这个key做写入操作的,但是事务id比当前事务要小的事务,它不再被允许对这个key的版本做修改,它需要中止事务的执行,进入aborted状态,并重新申请一个新的事务id,重新进行调度执行。

    下面同样先介绍读捕获方法所需的各种操作函数,然后使用标记点解决一个经典的问题,银行转账问题,即要从A的银行账户,转账1000元到B的银行账户。

    相比于标记点,begin_transaction函数不再阻塞,仅负责创建新事务。

    func begin_transaction() {
        id = new_transaction()
        return id
    }
    
    func new_transaction() {
        aquire(transaction_map_lock) // make this before-or-after
        defer release(transaction_map_lock)
    
        id = next_transaction_id() // get a new id
        transaction_map[id] = new(transaction)
        transaction_map[id].state = pending
        transaction_map[id].mark_state = nil
        return id
    }
    

    读取操作,读取数据版本的事务id小于当前事务id的、最新的数据。如果要读取的key正在被之前的某事务处理,则等待其完成再读取。读取结束前,更新该key的latest_reader。

    func read(key, this_transaction_id) {
        starting at end of value list until begining {
            v = previous version of key
            if v.transaction_id >= this_transaction_id {
                skip v
            }
            wait until (transaction_map[v.transaction_id].state != pending)
            if transaction_map[v.transaction_id].state == commited {
                key.latest_reader = max(key.latest_reader, this_transaction_id)
                return v.value
            } else {
                skip v
            }
        }
        return error("this key doesn't exist")
    }
    

    为key创建一个新的版本,此时,因为begin_transaction不再阻塞,所以可能有多个事务并发调用该函数,所以需要加锁来保证before-or-after。

    func new_version(key, this_transaction_id) {
        aquire(key_lock)
        defer release(key_lock)
         
        if this_transaction_id < key.latest_reader
        or this_transaction_id < latest_version(key).transaction_id {
            abort(this_transaction_id)
        } else if this_transaction_id == latest_version(key).transaction_id {
            return error("do not call this repeatedly")
        }
        append new version v to value list
        v.value = nil
        v.transaction_id = this_transaction_id
    }
    

    写入到指定版本,没什么好说的。

    func write(key, new_value, this_transaction_id) {
        starting at end of value list until begining {
            v = previous version of key
            if v.transaction_id == this_transaaction_id {
                v.value = new_value
                return
            }
        }
        return error("this version doesn't exist")
    }
    

    使用上述的方法,解决银行转账问题:

    func transfer(account_a, account_b, amount) {
        id = begin_transaction()
        a_money = read(account_a, id)
        a_money -= amount
        new_version(account_a, id)
        write(account_a, a_money, id)
    
        b_money = read(account_b, id)
        b_money += amount
        new_version(account_b, id)
        write(account_b, b_money, id)
    
        if a_money < 0 {
            abort(id)
        }else{
            commit(id)
        }
    }
    

    Compare-and-Set

    compare_and_set(CAS)也是一种配合多版本乐观并发控制的方法。其思路是,前期执行时,不做任何标记,但在读取数据时,需要记录当前读取的数据的版本。在事务的提交过程,需要依赖一个原子的CAS操作,CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,它会重新读取一次之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。

    获取事务id并创建新事务,不会阻塞。

    func begin_transaction() {
        id = new_transaction()
        return id
    }
    
    func new_transaction() {
        aquire(transaction_map_lock) // make this before-or-after
        defer release(transaction_map_lock)
    
        id = next_transaction_id() // get a new id
        transaction_map[id] = new(transaction)
        transaction_map[id].state = pending
        transaction_map[id].mark_state = nil
        return id
    }
    

    在CAS模式实现下的MVCC中,不存在分离的new_version和write操作,两者都被封装到compare_and_set函数中原子的执行,所以没有提交的事务,其不会在数据层留下任何标记。所以不需要像前面几种方法一样等待pending状态的事务结束。

    func read(key, this_transaction_id) {
        starting at end of value list until begining {
            v = previous version of key
            if v.transaction_id >= this_transaction_id {
                skip v
            }
            if transaction_map[v.transaction_id].state == commited {
                return v.value and v.version_id
            } else {
                skip v
            }
        }
        return error("this key doesn't exist")
    }
    

    基于CAS实现MVCC,写操作不能单独调用,必须封装在CAS中。注意,CAS操作必须是原子的,这一版需要依赖底层接口的支持。CAS操作传入当前事务之前读取数据时记录的每个数据的版本,还有要设置的值,首先重新读取一个之前读取过的key的版本,比较传入的版本和重新读取的版本是否相同,如果不相同,则中断事务。如果相同,则可以调用write操作,完成写入操作,并提交事务。

    再重申一遍,整个compare_and_set函数必须是原子的。

    func write(key, new_value, this_transaction_id) {
        aquire(key_lock)
        defer release(key_lock)     
        if this_transaction_id < latest_version(key).transaction_id {
            abort(this_transaction_id)
        } else if this_transaction_id == latest_version(key).transaction_id {
            return error("do not call this repeatedly")
        }
       append new version v to value list
        v.version_id = next_version_id()
        v.value = new_value
        v.transaction_id = this_transaction_id
    }
    
    // CAS must be atomicity, this need to be ensured by underlying system 
    func compare_and_set(key_list, version_list, value_map, this_transaction_id) {
        for i:=0; i<len(key_list); i++ {
            _, cur_version = read(key_list[i], this_transaction_id)
            if key_list[i] != version_list[i] {
                abort(this_transaction_id)
            }
        }
        for k, v := range value_map {
            write(k, v, this_transaction_id)
        }
        commit(this_transaction_id)
    }
    

    使用上述的方法,解决银行转账问题:

    func transfer(account_a, account_b, amount) {
        id = begin_transaction()
        a_money, a_version = read(account_a, id)
        a_money -= amount
    
        b_money, b_version = read(account_b, id)
        b_money += amount
    
        if a_money < 0 {
            abort(id)
        }else{
            compare_and_set([]{account_a,account_b}, []{a_version,b_version}, map[key]value{account_a: a_money, account_b: b_money})
        }
    }
    

    Lock

    加锁,是实现Before-or-After最常用的手段。

    Simple Locking

    最简单的方式,搞一把全局大锁,每个事务执行时先要获取这把锁,提交后释放。这种方式,将事务的实行完全串行化,性能较低。

    Two-Phase Locking

    降低锁的粒度,每个数据项一把锁,当事务执行时,分为两个阶段,第一个阶段,为所有事务执行锁需要读取和写入的数据项进行加锁;第二个阶段,对数据进行读取和写入,每处理完一个数据,即可释放其对应的锁。

    另外,加锁时最好按照一定的顺序进行,以避免死锁的发生。

    相关文章

      网友评论

        本文标题:存储系统中事务的实现

        本文链接:https://www.haomeiwen.com/subject/nmjlsqtx.html