美文网首页
Zookeeper(三)-持久化

Zookeeper(三)-持久化

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-12-12 22:09 被阅读0次

    概述

    同mysql/redis类似zk持久化也分为快照(snapshot)和增量事务日志(txnlog)两种形式,两者结合使用来恢复数据;但是三者底层存储引擎数据结构不同,mysql使用B+树,redis使用全局哈希表,而zk使用LSM数据结构;
    本节先来分析zk的snapshot和txnlog,后续再分析LSM数据结构跟B+树区别;


    Log类图.png

    一、snapshot

    snapshot是内存快照,把当前时刻全量内存写入数据文件中;

    1. 文件内容

    1.1 内容结构
    snapshot.png
    1.2 序列化内容snapshot.xx
    image.png
    1.3 内容解析

    也可以通过SnapshotFormatter进行解析

    ZNode Details (count=22):
    ----
    /
      cZxid = 0x00000000000000
      ctime = Thu Jan 01 08:00:00 CST 1970
      mZxid = 0x00000000000000
      mtime = Thu Jan 01 08:00:00 CST 1970
      pZxid = 0x0000000000007c
      cversion = 10
      dataVersion = 0
      aclVersion = 0
      ephemeralOwner = 0x00000000000000
      dataLength = 0
    ---- 
    。。。
    Session Details (sid, timeout, ephemeralCount):
    0x1001fe6ce840022, 30000, 0
    0x1001fe6ce840023, 30000, 0
    0x1001fe6ce840020, 30000, 0
    0x1001fe6ce840021, 30000, 0
    0x1001fe6ce840026, 30000, 0
    0x1001fe6ce840027, 30000, 0
    0x1001fe6ce840024, 30000, 0
    0x1001fe6ce840025, 30000, 0
    0x1001fe6ce840028, 30000, 0
    0x1001fe6ce840029, 30000, 0
    0x1001fe6ce84001f, 30000, 0
    

    2. 源码分析

    SnapShot接口只定义了四个方法,反序列化、序列化、查找最新的snapshot文件、关闭资源

    public interface SnapShot {   
        // 反序列化
        long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
        // 序列化
        void serialize(DataTree dt, Map<Long, Integer> sessions, File name)  throws IOException;
        // 查询最近的快照文件
        File findMostRecentSnapshot() throws IOException;
        // 关闭释放资源
        void close() throws IOException;
    } 
    

    序列化过程比较简单,主要看下反序列化过程

    public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
        // 查找100个合法的snapshot文件
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        // 默认为不合法
        boolean foundValid = false;
        // 遍历snapshot文件列表
        for (int i = 0; i < snapList.size(); i++) {
            snap = snapList.get(i);
            InputStream snapIS = null;
            CheckedInputStream crcIn = null;
            try {
                LOG.info("Reading snapshot " + snap);
                snapIS = new BufferedInputStream(new FileInputStream(snap));
                crcIn = new CheckedInputStream(snapIS, new Adler32());
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                // 反序列化出 DataTree,反序列化过程就是根据文件结构进行解析
                deserialize(dt,sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                // 比较snapshot中校验值和读取计算出的值是否相等
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch(IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            } finally {
                if (snapIS != null) 
                    snapIS.close();
                if (crcIn != null) 
                    crcIn.close();
            } 
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        // 从文件名中解析出zxid,即snapshot文件后缀
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }
    
    • 1. 在配置的日志文件目录中,按照zxid对snapshot文件进行降序排序,并对文件合法性进行初步校验,然后取出前100个文件
    • 2. 遍历snapshot文件,反序列化出 DataTree(反序列化过程就是根据文件结构进行解析),一旦CRC校验成功就结束遍历,即最终用于反序列化的是最近且合法的snapshot文件;
    • 3.返回最近且合法的zxid,即snapshot文件后缀

    二、txnlog

    txnlog是增量事务日志,实时记录每一条事务命令;

    1. 文件内容

    1.1 内容结构
    txnlog.png
    • 每一次增量日志包含4部分:CRC验证码+消息头+消息体+结尾标志
    • Record:不同请求命令对应不同的Record;
    1.2 序列化内容log.cf
    log.cf
    • 后面全是0,不足64M时用0填充;
    1.3 内容解析

    也可以通过LogFormatter进行解析查看

    ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
    20-12-6 下午12时39分36秒 session 0x100142dc1660000 cxid 0x0 zxid 0x1e createSession 30000
    20-12-6 下午12时39分52秒 session 0x100141757900001 cxid 0x0 zxid 0x1f closeSession null
    20-12-6 下午12时39分59秒 session 0x100142dc1660000 cxid 0x2 zxid 0x20 create '/zk/test5,#353535,v{s{31,s{'world,'anyone}}},F,6
    20-12-6 下午12时53分04秒 session 0x100142dc1660000 cxid 0x0 zxid 0x21 closeSession null
    20-12-6 下午12时53分04秒 session 0x100142dc1660001 cxid 0x0 zxid 0x22 createSession 30000
    20-12-6 下午12时53分36秒 session 0x100142dc166001d cxid 0x0 zxid 0x5f closeSession null
    20-12-6 下午12时53分36秒 session 0x100142dc1660005 cxid 0x0 zxid 0x60 closeSession null
    20-12-6 下午12时54分56秒 session 0x100142dc1660020 cxid 0x4 zxid 0x61 setACL '/zk/test1,v{s{4,s{'world,'anyone}}},1
    20-12-6 下午12时55分39秒 session 0x100142dc1660020 cxid 0x6 zxid 0x62 create '/zk/test7,#373737,v{s{4,s{'world,'anyone}}},F,7
    20-12-6 下午02时33分27秒 session 0x100142dc1660020 cxid 0x0 zxid 0x63 closeSession null
    EOF reached after 70 txns.
    

    2. 源码分析

    TxnLog接口:

    public interface TxnLog {
        // 切换日志,一个日志文件达到一定大小就会生成一个新的
        void rollLog() throws IOException;
        // 添加一个请求到事务日志
        boolean append(TxnHeader hdr, Record r) throws IOException;
        // 通过事务id读取日志
        TxnIterator read(long zxid) throws IOException;
        // 获取事务性操作的最新zxid
        long getLastLoggedZxid() throws IOException;
        // 清空日志,与Leader保持同步
        boolean truncate(long zxid) throws IOException;
        // 获取数据库的id
        long getDbId() throws IOException;
        // 提交事务并进行确认
        void commit() throws IOException;
        // 关闭事务性日志
        void close() throws IOException;
        // 读取事务日志的迭代器接口
        public interface TxnIterator {
            // 获取事务头部
            TxnHeader getHeader();
            // 获取事务
            Record getTxn();
            boolean next() throws IOException;
            // 关闭文件释放资源
            void close() throws IOException;
        }
    }
    

    重点分析下append方法:

    public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
        if (hdr == null) {
            return false;
        }
        // 当前事务的zxid小于等于最后的zxid,打印告警日志,否则设置最后的zxid为当前事务zxid
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        // 判断当前输入流logStream是否已清空(在同步处理器SyncRequestProcessor中据一定算法得出一个count,记录大于count就要rollLog,开启一个新的文件,
        // 算法是: 100000/2 + random.nextInt(100000/2), 这个十万是一个默认值可配置),清空开启一个新的文件写入
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }
            // 创建增量日志文件,文件后缀为当前事务zxid
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream=new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
            // 序列化增量日志文件头
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // 确保在填充前已经写完魔数
            logStream.flush();
            currentSize = fos.getChannel().position();
            streamsToFlush.add(fos);
        }
        // 当文件大小不满64MB时,向文件填充0以达到64MB大小
        currentSize = padFile(fos.getChannel());
        // 将事务头和事务序列化成byte[]
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header and txn");
        }
        // 生成一个验证算法
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        // 写CRC验证码
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        // 写buf 和 结尾标志  (byte) 0x42
        Util.writeTxnBytes(oa, buf);
        return true;
    }
    
    • 1.判断当前输入流logStream是否已清空(在同步处理器SyncRequestProcessor中据一定算法得出一个count,记录大于count就要rollLog,开启一个新的文件);
    • 2.新常见日志文件时序列化日志文件头部;
    • 3.当文件大小不满64MB时,向文件填充0以达到64MB大小;
    • 4.序列化CRC校验码、消息头、消息体、结束标志

    private long padFile(FileChannel fileChannel) throws IOException {
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }
    

    File Padding是对WAL的优化,在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先找到inode 所在的位置,然后回到新添加 block 的位置进行日志追加。为了减少这些查找(这些查找是磁盘随机IO,效率跟顺序IO相比不在同一个数量级),我们可以预先为WAL 分配 block。
    zk就用到了这种优化,每次为 WAL 分配 64MB 的 block,即不足64M时用0填充,以减少随机磁盘IO;


    再来分析下getLastLoggedZxid方法

    public long getLastLoggedZxid() {
        // 查找开始于快照或快照之前的日志文件。返回此日志和所有后续日志。结果按文件的zxid升序排列
        File[] files = getLogFiles(logDir.listFiles(), 0);
        // 获取最大的zxid(最后一个log文件对应的zxid)
        long maxLog=files.length>0 ? Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
        // 扫描最新的日志文件以找到最高的zxid (从文件头开始遍历,遍历到最后一个就是最大的)
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }
    

    该方法是找到增量事务日志中最大的zxid,即最后一个txnlog文件最后一个写入的事务日志条目里的zxid;

    • 1.从日志路径中查找txnlog,即log开头的日志文件
    • 2.获取最后一个日志文件的zxid
    • 3.读取最后一个日志文件,从文件头开始遍历,遍历到最后一个就是最大的zxid

    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        // 升序排列所有增量日志文件
        List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
        long logZxid = 0;
        // 查找在快照的zxid之前或同时开始的日志文件
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid > snapshotZxid) {
                continue;
            }
            // 找到快照的zxid之前并且最接近的日志文件
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        // 找到快照的zxid之前并且最接近的日志文件,以及其之后的所有日志文件
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid < logZxid) {
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);
    }
    

    该方法返回升序排列后,最接近请求参数snapshot文件的txnlog文件开始及以后的所有txnlog文件;

    • 1.升序排列所有txnlog文件列表;
    • 2.遍历文件列表,找到快照的zxid之前并且最接近的日志文件;
    • 3.返回该日志文件以及其之后的所有日志文件列表;

    小结

    本节主要分析了快照日志FileSnap和增量事务日志FileTxnLog,而FileTxnSnapLog中操作主要都是委托给这两者进行处理;
    本节一个重点是日志文件的加载流程:
    1.首先获取100个snapshot文件,并将其按照文件名降序排列;
    2.循环读取这些文件,并反序列化DataTree,根据文件中的校验值进行合法性校验;
    3.如果文件合法,将最大的zxid赋值给lastProcessZxid,跳出循环;
    4.读取txnlog,按照lastProcessZxid+1,获取该条事务记录,作为新的事务日志记录;
    5.读取该记录直至文件结束,判断其合法性,调用processTransaction,提交到DataTree中;
    ------over------

    相关文章

      网友评论

          本文标题:Zookeeper(三)-持久化

          本文链接:https://www.haomeiwen.com/subject/smeegktx.html