我用代码模拟了mysql的mvcc版本控制

作者: pq217 | 来源:发表于2022-03-30 18:29 被阅读0次

    事务隔离级别

    innodb的事务隔离级别有四种

    • 读未提交
      可以读到其它事务尚未提交的数据
    • 读已提交
      只能读到其它事务尚已提交的数据
    • 可重复读
      只能读到其它事务尚已提交的数据,并且保证一个事务下多次读取的结果一致
    • 可串行化
      即事务一个一个进行,避免了事务间数据的干扰,但是效率极慢

    其中读未提交可串行化带来的问题都很大,一般很少使用,可重复读一般是首选且是默认,它们的实现主要使用了MVCC(Multi-Version Concurrency Control)机制,今天就以代码形式模拟mvcc,相信自己能写出来,就可以说完全理解了。

    数据库

    为了方便模拟,简化一下数据库的结构,假如我们的数据库只有一行数据,只有两个字段id和name,我们用一个对象current来存储这一行数据,就完成了一个极简数据库

    private Data current = new Data (1l, "小明1"); // 给一个初始值,就不写insert了
    class Data {
        private Long id; // id
        private String name; // 姓名
    }
    
    简化的数据库结构

    事务

    我们再根据常使用的innodb事务命令做一个抽象,此时的我们DB应该有的方法

    • select 查询name
    • beginTransactional 开启事务
    • update 更新name
    • commit 提交事务
    • rollback 回滚事务

    接口

    public interface TransactionalDB {
        String select();
        void update(String name);
        void beginTransactional();
        void commit();
        void rollback();
    }
    

    undo日志版本链

    接下来实现这个DB,上面用一个对象就模拟了一条数据的存储,但实际上远没有这么简单,innodb会保留数据的变化历史版本(可以以此实现回滚),每个版本存储当时的值,记录版本的事务ID,并且指向上一版本,形成一个单向的版本链,当前数据也只是版本链的一个节点,这个版本链叫做undo日志版本链

    undo日志版本链
    也就是说当我们用Navicat等工具看到的只是绿色部分,实际内部存储的是整个黄色部分
    那我们用代码来实现这个版本链
    public class MyDB implements TransactionalDB {
        private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了
        @Data
        @AllArgsConstructor
        class Undo {
            private Long id; // id
            private String name; // 姓名
            private Integer trxId; // 事务ID
            private Undo rollPointer; // 指向上一个版本
        }
    }
    

    这段代码就实现了整个黄色部分的存储结构

    beginTransactional

    通过beginTransactional开启事务,简单做一个自增的事务ID,我们用ThreadLocal模拟存储(一个线程模拟一个事务)

    private volatile Integer maxTid = 0; // 当前最大事务ID
    private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组
    private ThreadLocal<Integer> trxIds = new ThreadLocal<>(); // 存储每个事务的ID
    @Override
    public synchronized void beginTransactional() {
        trxIds.set(++maxTid); // 生成事务ID
        unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
    }
    

    其中unTrxIds存储了未提交的事务ID,这个一定要记录,否则读的时候无法判断版本是否已提交,会造成脏读,当后期commit或rollback时删除,这样就可以实时记录当前未提交事务有哪些

    update

    接下来实现update方法,update就是往undo日志的头部新增一个版本,并且用一个ReentrantLock模拟行锁

    private Lock lock = new ReentrantLock(); // 模拟行锁
    @Override
    public void update(String value) {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        lock.lock(); // 锁住行
        Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
        current = undo; // 从链表头部插入
    }
    

    commit

    commit要做的事刚才已经铺垫了,首先从未提交事务数组里移除当前事务,同时释放行锁

    @Override
    public void commit() {
        unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
        try {
            lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
        } catch (Exception e) {}
        trxIds.remove(); // 无用删掉
    }
    

    rollback

    略,有了undo日志,回滚还不简单吗

    读已提交

    这里比较重点,如果是读已提交,只要遍历undo版本链,并获取第一个不在unTrxIds(未提交事务ID数组)中的事务版本读出值即可实现读已提交

    @Override
    public String select() {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        Undo e= current;
        do {
            if (unTrxIds.contains(e.getTrxId())) { // 在未提交数组中,不可见
                e = e.rollPointer; // 下一个
                continue;
            }
            return e.getName(); // 可见
        } while (e!=null);
        return null;
    }
    

    完整代码如下:

    public class ReadCommit implements TransactionalDB {
    
        private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了
    
        private Lock lock = new ReentrantLock(); // 模拟行锁
    
        // 当前最大事务ID
        private volatile Integer maxTid = 0;
    
        private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组
    
        // 存储每个事务的ID
        private ThreadLocal<Integer> trxIds = new ThreadLocal<>();
    
        @Data
        @AllArgsConstructor
        class Undo {
            private Long id; // id
            private String name; // 姓名
            private Integer trxId; // 事务ID
            private Undo rollPointer; // 指向上一个版本
        }
    
        @Override
        public String select() {
            if (trxIds.get()==null) {
                throw new IllegalArgumentException();
            }
            Undo e= current;
            do {
                if (unTrxIds.contains(e.getTrxId())) { // 在未提交数组中,不可见
                    e = e.rollPointer; // 下一个
                    continue;
                }
                return e.getName(); // 可见
            } while (e!=null);
            return null;
        }
    
        @Override
        public void update(String value) {
            if (trxIds.get()==null) {
                throw new IllegalArgumentException();
            }
            lock.lock(); // 锁住行
            Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
            current = undo; // 从链表头部插入
        }
    
        @Override
        public synchronized void beginTransactional() {
            trxIds.set(++maxTid); // 生成事务ID
            unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
        }
    
        @Override
        public void commit() {
            unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
            try {
                lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
            } catch (Exception e) {}
            trxIds.remove(); // 无用删掉
        }
    
        @Override
        public void rollback() {
            // todo
        }
    }
    

    测试一下

    public static void main(String[] args) throws Exception {
        ReadCommit db = new ReadCommit();
        new Thread(() -> { // 事务1
            try {
                db.beginTransactional();
                db.update("小明2");
                Thread.sleep(400);
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> { // 事务2
            try {
                db.beginTransactional();
                Thread.sleep(200);
                System.out.println(db.select()); // 小明1
                Thread.sleep(400);
                System.out.println(db.select()); // 小明2
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    

    执行时间轴如下


    测试时间轴

    最终输出:

    小明1
    小明2
    
    • 第一次事务2 select时,事务1已经update(小明2),但还未提交,所以读到的是初始值:小明1
    • 第二次事务2 select时,事务1已经commit,所以读到了:小明2

    实现了读已提交

    readview和可重复读

    上面实现了读已提交,但是有一个问题,上例事务2读了两次,得到的结果却不一样,这就是典型的不可重复读
    如何在读已提交的基础上实现可重复读,mvcc的解法是这样的:
    上面的select方法我们遍历undo版本链时判断了当前版本是否已提交,判断方法是判断版本事务ID是否在未提交事务数组里if (unTrxIds.contains(e.getTrxId()))之所以读到的结果不一样,是因为unTrxIds数组是实时变化的,那么为了让读到的结果一样,可以在事务开始时对unTrxIds做一份快照(copy),select改为判断快照中是否包含版本事务ID
    通过快照,每次同一个事务每次select判断条件都一样,得到的结果也一定一样
    但是有个问题,快照保存的unTrxIds是事务开始未提交的事务,那不包含在其中的一定已提交了吗?不一定,因为后续可能会出现新事务并且还未提交,因此快照除了保存unTrxIds的备份,还要存一份事务开始时最大事务ID,所有大于该事务ID的事务都是未来事务,所存储的版本都不可见
    总结一下:可重复读模式下,所有事务开始时未提交的事务事务开始时未生成的事务都是不可见的
    这个快照就是readview,包含了

    • unTrxIds(事务开始时未提交数组)
    • upLimitId(事务开始时最大事务ID)

    代码模拟下结构

    @Data
    @AllArgsConstructor
    class ReadView {
        List<Integer> unTrxIds; // 未提交的事务Id数组
        private Integer upLimitId; // 最大事务ID
    }
    

    因为每个事务都有一份自己的readview,所以还是用ThreadLocal模拟存储

    private ThreadLocal<ReadView> readViews = new ThreadLocal<>();
    

    刚才一直说事务开始时生成readview,实际上通过测试发现生成快照应该是第一次select时生成的,所以代码就是select时判断有没有,没有就生成,有就不生成。
    同时为了兼容不可重复读也就是读已提交模式,加一个参数来设置是否可重复读(只要每次select都重新生成快照就是不可重复读)

    private void createReadView() {
        if (readRepeat && readViews.get()!=null) { // 如果可重复读,且已创建,不需要创建
            return;
        }
        List<Integer> currentUnTrxIds = new ArrayList<>();
        currentUnTrxIds.addAll(unTrxIds); // 生成unTrxIds快照
        readViews.set(new ReadView(currentUnTrxIds, maxTid)); // unTrxIds和maxTid快照
    }
    

    select代码

    @Override
    public String select() {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        // 创建readview
        createReadView();
        Undo e= current;
        ReadView readView = readViews.get(); // 获取事务快照
        do {
            if (e.getTrxId() > readView.getUpLimitId()) { // 大于readview最大事务,不可见
                e = e.rollPointer;
                continue;
            }
            if (readView.getUnTrxIds().contains(e.getTrxId())) { // 在readview未提交数组中,不可见
                e = e.rollPointer;
                continue;
            }
            return e.getName(); // 可见
        } while (e!=null);
        return null;
    }
    

    完整代码如下

    public class MyDB implements TransactionalDB {
    
        private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了
    
        private Lock lock = new ReentrantLock(); // 模拟行锁
    
        // 当前最大事务ID
        private volatile Integer maxTid = 0;
    
        private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组
    
        // 存储每个事务的ID
        private ThreadLocal<Integer> trxIds = new ThreadLocal<>();
    
        // 存储每个事务的readview
        private ThreadLocal<ReadView> readViews = new ThreadLocal<>();
    
        // 是否可重复读
        private Boolean readRepeat;
    
        public MyDB(Boolean readRepeat) {
            this.readRepeat = readRepeat;
        }
    
        @Data
        @AllArgsConstructor
        class Undo {
            private Long id; // id
            private String name; // 姓名
            private Integer trxId; // 事务ID
            private Undo rollPointer; // 指向上一个版本
        }
    
        @Data
        @AllArgsConstructor
        class ReadView {
            List<Integer> unTrxIds; // 未提交的事务Id数组
            private Integer upLimitId; // 最大事务ID
        }
    
        @Override
        public String select() {
            if (trxIds.get()==null) {
                throw new IllegalArgumentException();
            }
            // 创建readview
            createReadView();
            Undo e= current;
            ReadView readView = readViews.get(); // 获取事务快照
            do {
                if (e.getTrxId() > readView.getUpLimitId()) { // 大于readview最大事务,不可见
                    e = e.rollPointer;
                    continue;
                }
                if (readView.getUnTrxIds().contains(e.getTrxId())) { // 在readview未提交数组中,不可见
                    e = e.rollPointer;
                    continue;
                }
                return e.getName(); // 可见
            } while (e!=null);
            return null;
        }
    
        private void createReadView() {
            if (readRepeat && readViews.get()!=null) { // 如果可重复读,且已创建,不需要创建
                return;
            }
            List<Integer> currentUnTrxIds = new ArrayList<>();
            currentUnTrxIds.addAll(unTrxIds); // 生成unTrxIds快照
            readViews.set(new ReadView(currentUnTrxIds, maxTid)); // unTrxIds和maxTid快照
        }
    
        @Override
        public void update(String value) {
            if (trxIds.get()==null) {
                throw new IllegalArgumentException();
            }
            lock.lock(); // 锁住行
            Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
            current = undo; // 从链表头部插入
        }
    
        @Override
        public synchronized void beginTransactional() {
            trxIds.set(++maxTid); // 生成事务ID
            unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
        }
    
        @Override
        public void commit() {
            unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
            try {
                lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
            } catch (Exception e) {}
            trxIds.remove(); // 无用删掉
            readViews.remove(); // 无用删掉
        }
    
        @Override
        public void rollback() {
        }
    }
    

    还是测试一下

    public static void main(String[] args) throws Exception {
        MyDB db = new MyDB(true);
        new Thread(() -> { // 事务1
            try {
                Thread.sleep(1);
                db.beginTransactional();
                db.update("小明2");
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> { // 事务2
            try {
                db.beginTransactional();
                Thread.sleep(100);
                db.update("小明3");
                Thread.sleep(200);
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> { // 事务4
            try {
                Thread.sleep(400);
                db.beginTransactional();
                db.update("小明4");
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> { // 事务3
            try {
                db.beginTransactional();
                Thread.sleep(200);
                System.out.println(db.select()); // 小明2
                Thread.sleep(500);
                System.out.println(db.select()); // 小明2
                db.commit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    

    时序图如下


    时序图
    • 事务3第一次select,此时生成readview unTrxIds:[2] upLimitId:2,其中事务2在unTrxIds中,不可见,事务1不在unTrxIds中,可见,读出小明2
    • 事务3第二次select,readview不变 unTrxIds:[2] upLimitId:2,其中事务4>upLimitId不可见,事务2在unTrxIds中,不可见,事务1不在unTrxIds中,可见,读出小明2

    到此就模拟出了MVCC的事务版本控制,over~

    总结

    这个真是很复杂,很难一句话做总结,大概捋一下核心思路

    • 如何实现回滚:要有历史版本记录
    • 如何实现读已提交:记录下未提交的事务,读时绕过
    • 如何实现可重复度:第一次读做个快照,以后每次读就都一样了,就像照片每次看都一样,但快照的不是数据,因为数据太多了,快照的是那些当前所有事务的提交状态

    相关文章

      网友评论

        本文标题:我用代码模拟了mysql的mvcc版本控制

        本文链接:https://www.haomeiwen.com/subject/tctbjrtx.html