美文网首页
[ZooKeeper] zookeeper的持久化之TxnLog

[ZooKeeper] zookeeper的持久化之TxnLog

作者: LZhan | 来源:发表于2020-01-14 09:51 被阅读0次

1 前言

持久化,将数据存储到磁盘或者数据库中 。在系统重启或者出现故障后,能够恢复或者获取到宕机前的信息。

通俗地将,就是将瞬时数据(比如内存中的数据,是不能永久保存的)转换为持久数据(比如持久化至数据库中,能够长久保存)。

2 Zookeeper持久化方式

  • TxnLog —— 增量事务日志,记录系统中所有的增删改记录
  • SnapShot —— 快照,记录内存中的全量数据,但是数据不一定是最新的

持久化的源代码主要在包org.apache.zookeeper.server.persistence

类关系图
  • TxnLog:是一个读取日志的接口,提供了读取事务log的接口方法
  • SnapShot:是一个操作日志快照的接口,提供了对快照文件操作的方法
  • FileTxnLog:实现TxnLog接口,提供了读取事务日志的方法实现
  • FileSnap:实现SnapShot接口,负责存储、序列化、反序列化、访问快照
  • FileTxnSnapLog:封装了TxnLog和SnapShot
  • Util:工具类,提供持久化所需的API

3 源码分析

3.1 SnapShot和TxnLog

SnapShot:

public interface SnapShot {
    
    /**
     * deserialize a data tree from the last valid snapshot and 
     * return the last zxid that was deserialized
     * @param dt the datatree to be deserialized into
     * @param sessions the sessions to be deserialized into
     * @return the last zxid that was deserialized from the snapshot
     * @throws IOException
     */
    long deserialize(DataTree dt, Map<Long, Integer> sessions) 
        throws IOException;
    
    /**
     * persist the datatree and the sessions into a persistence storage
     * @param dt the datatree to be serialized
     * @param sessions 
     * @throws IOException
     */
    void serialize(DataTree dt, Map<Long, Integer> sessions, 
            File name) 
        throws IOException;
    
    /**
     * 找到最新的snapshot文件
     * find the most recent snapshot file
     * @return the most recent snapshot file
     * @throws IOException
     */
    File findMostRecentSnapshot() throws IOException;
    
    /**
     * free resources from this snapshot immediately
     * @throws IOException
     */
    void close() throws IOException;
} 

TxnLog:

public interface TxnLog {
    
    /**
     * 滚动日志,从当前日志滚动到下一个日志,不是回滚
     */
    void rollLog() throws IOException;
    /**
     * 追加事务记录到事务日志中去
     */
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * 可迭代读取事务性日志
     */
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * 事务性操作的最新zxid
     */
    long getLastLoggedZxid() throws IOException;
    
    /**
     * 清空zxid以后的日志
     */
    boolean truncate(long zxid) throws IOException;
    
    /**
     * 获取数据库的id
     */
    long getDbId() throws IOException;
    
    /**
     * 提交事务并进行确认
     */
    void commit() throws IOException;
   
    /** 
     * 关闭事务性日志
     */
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        void close() throws IOException;
    }
}

3.2 FileTxnLog的具体属性

    private static final Logger LOG;
    // 预分配64m大小
    static long preAllocSize =  65536 * 1024;
    // 直接内存
    private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
    // 魔数,用于校验日志文件的正确性,默认为1514884167
    public final static int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    public final static int VERSION = 2;
    // 日志文件名前缀
    public static final String LOG_FILE_PREFIX = "log";

    /** Maximum time we allow for elapsed fsync before WARNing */
    private final static long fsyncWarningThresholdMS;

    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);
        // 获得系统参数,判断系统参数配置了预分配内存大小
        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        /** Local variable to read fsync.warningthresholdms into */
        Long fsyncWarningThreshold;
        if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
            fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
        fsyncWarningThresholdMS = fsyncWarningThreshold;
    }
    // 最大(也就是最新的)zxid
    long lastZxidSeen;
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;
    // log目录文件
    File logDir;
    // 是否强制同步,默认是yes
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
    long dbId;
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    // 当前配置的大小
    long currentSize;
    // 写日志文件
    File logFileWrite = null;

3.3 FileTxnLog的方法

<1> append方法

append:主要是负责日志追加,在对日志文件的写操作时,zookeeper主要是通过日志追加的方法

    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        // 校验头部不能为空,hdr主要包含了cxid,zxid,clientId等相关信息
        if (hdr == null) {
            return false;
        }
        // 如果待写入的事务的事务id小于本地保存的最新的事务id,提醒
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        // 在第一次新建一个FileTxnLog时候,logStream是空的,这个时候需要为它创建一个新的日志文件
        // 并把logStream指向这个日志文件
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }
            // 根据待写入的事务id创建一个新的日志文件,可以看到文件名包含这个文件存放的事务的最小事务id
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream=new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            // 根据魔数,版本号和数据库id生成日志文件头,dbId默认是0
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // 确保在用0填充之前,先把魔数信息等写入到文件中,进行依次flush
            logStream.flush();
            currentSize = fos.getChannel().position(); //获取当前文件流的大小
            streamsToFlush.add(fos);
        }
        // 重新计算文件大小,保证文件的大小是预分配大小的整数倍
        // 可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销
        currentSize = padFile(fos.getChannel());
        // 序列化TxnHeader Record记录到byte[]
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        // 创建校验和的算法
        Checksum crc = makeChecksumAlgorithm();
        // 使用指定的字节数组更新校验和
        crc.update(buf, 0, buf.length);
        // 将更新的校验和值写入到日志文件中
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        // 将TxnHeader Record数据写入到输出流
        Util.writeTxnBytes(oa, buf);

        return true;
    }

writeTxnBytes方法

    public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
            throws IOException {
        oa.writeBuffer(bytes, "txnEntry");
        oa.writeByte((byte) 0x42, "EOR"); // 'B'
    }
  1. 先计算buf数据长度写入
  2. 写入buf数组数据
  3. 记录尾部以‘B’字符结尾,写入0x42

padFile方法

    private long padFile(FileChannel fileChannel) throws IOException {
        // 计算新的文件大小,并通过填充0先占用未使用的byte空间
        // 这样可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销
        // currentSize默认为0
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        // 将整个日志文件中未使用的部分填充为0
        if (currentSize != newFileSize) {
            // ByteBuffer缓冲区,position:
            // 指定了下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新
            // remaining,返回剩余的可用长度
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }

calculateFileSizeWithPadding方法

 /**
     * @param position  通过管道写入的字节长度
     * @param fileSize  当前设置的文件大小
     * @param preAllocSize  预分配大小
     * @return
     */
    public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
        // 如果剩余空间不足4k,且预分配空间大于0
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            // If we have written more than we have previously preallocated we need to make sure the new
            // file size is larger than what we already have
            // 如果已写入的长度超过了文件大小,文件大小扩大为[写入的字节长度+预分配]
            if (position > fileSize){
                fileSize = position + preAllocSize;
                // 这边会重新调整文件大小到预分配长度的整数倍
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }
        return fileSize;
    }

总结:
<1> 在新建FileTxnLog文件时会预分配文件内存大小,并用0来填充,从而保证文件的磁盘占用是连续的,同时通过日志追加的方式,我们可以保证对日志文件的写的顺序性,从而保证写性能
<2> 每次将事务写入到日志文件时,都会先根据写入的事务计算并写入一个检验和,然后再把事务流写入到日志文件中,这样可以充分保证事务日志的安全性和完整型。

<2> read方法
读取文件的方法

 /***
   * zxid:指定迭代读取日志文件中的第一个事务ID
    * 默认fastForward=true
   */
    public TxnIterator read(long zxid) throws IOException {
        return new FileTxnIterator(logDir, zxid);
    }
    public FileTxnIterator(File logDir, long zxid) throws IOException {
         this.logDir = logDir;
         this.zxid = zxid;
         init();
     }

读取事务日志,这个方法在服务宕机恢复的时候,用来遍历事务日志来恢复数据。
根据目标事务zxid,从日志中读取大于该事务id的事务,并返回这些事务构成的迭代器TxnIterator,注意底层在遍历每一个日志文件的时候,会对文件进行魔数检验等,避免文件被损坏。

        void init() throws IOException {
            storedFiles = new ArrayList<File>();
            // 排序目录下的日志文件,文件名称是根据事务id来创建的
            // 文件的排序也等价于事务的排序
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
            for (File f: files) {
                // 找出起始事务id大于等于zxid的日志文件
                if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
                    storedFiles.add(f);
                }
                // add the last logfile that is less than the zxid
                // 当第一次遍历到起始id小于zxid的日志文件后,要记得把该文件也作为查找目标文件,因为它里面可能包含大于zxid的事务
                // 同时停止遍历
                else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
                    storedFiles.add(f);
                    break;
                }
            }
            // 找出已排好序且可能存在大于zxid的日志文件后,打开第一个日志文件输入流准备读取
            goToNextLog();
            if (!next())
                return;
            while (hdr.getZxid() < zxid) {
                if (!next())
                    return;
            }
        }

goToNextLog()和next()方法

private boolean goToNextLog() throws IOException {
     if (storedFiles.size() > 0) {
          // 可能包含大于zxid的事务
          this.logFile = storedFiles.remove(storedFiles.size()-1);
          ia = createInputArchive(this.logFile);
          return true;
     }
     return false;
}
        public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                // 读取校验和的值
                long crcValue = ia.readLong("crcvalue");
                // 读取事务
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                // 因为我们是采用预分配内存方式,会定义一个EOF作为空的事务。
                // 所以,当我们读取到一个空的,也就表明日志文件已读到末尾
                if (bytes == null || bytes.length==0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // EOF or corrupted record
                // validate CRC
                // 分析校验和的值是否正确,防止消息被破坏,这就是为什么我们在append的时候加入校验和
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue())
                    throw new IOException(CRC_ERROR);
                // 反序列化事务
                hdr = new TxnHeader();
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) {
                LOG.debug("EOF excepton " + e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                // 日志文件已经读到末尾了,所以跳到下一个文件开始读取
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

getLogFiles:获取可能包含比事务id大的日志文件的数组

   /**
     * @param logDirList 日志文件列表
     * @param snapshotZxid 通过内存快照恢复的最大的事务id,剩余的比snapshotZxid大的就要从日志文件里恢复
     * @return 
     */
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        // 对日志文件进行排序,按照事务id从高到低
        List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
        long logZxid = 0;
        
        // logZxid是小于snapshotZxid的最大事务id
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            // 如果文件名的事务id>快照最新的zxid
            if (fzxid > snapshotZxid) {
                continue;
            }
            // the files
            // are sorted with zxid's
            // 如果fzxid <= snapshotZxid 并且 fzxid > logZxid
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid < logZxid) { // 找出文件id大于logZxid的文件名
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);
    }

<1> 找出比snapshotZxid小的最大事务id(从文件名中获取)
比如说当下snapshotZxid为200,现在日志文件中小于200的最大fzxid为150,即logZxid也为150
<2> 快照能够恢复的最大事务id为200,所以大于200的事务得从TxnLog中恢复,所以读取的TxnLog也是从以150作为前缀的开始读取

getLastLoggedZxid
获取最新的事务ID,该方法只在节点服务器启动的时候被调用

   /**
     * 这个方法只在节点服务器启动的时候被调用
     * 从日志文件中获取最大的zxid
     * get the last zxid that was logged in the transaction logs
     * @return the last zxid logged in the transaction logs
     */
    public long getLastLoggedZxid() {
        // 找出所有的日志文件并排序
        File[] files = getLogFiles(logDir.listFiles(), 0);
        // 排序日志文件,并从日志文件名称中获取包含最大zxid的日志文件的文件名中的日志id
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        // 在最新的日志文件里迭代查找最新的事务id
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            // 根据文件名的事务id遍历迭代该日志文件,获取整个内存数据库的最大事务id
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }

<3> commit方法

commit方法,提交日志并且刷写到磁盘,force方法会把未写磁盘的数据都强制写入磁盘。这是因为在操作系统中出于性能考虑会把数据放入缓冲区,所以不能保证数据在调用write写入文件通道后就及时写到磁盘上了,除非手动调用force方法。force方法需要一个布尔参数,代表是否把meta data也一并强制写入。

<4> truncate方法

truncate方法清空大于给定的zxid事务日志,集群版learner向leader同步的时候,leader告诉learner需要回滚调用Learner#syncWithLeader

4 总结

(1)我们知道zookeeper每次生成的事务日志都带有当前文件的第一条事务的zxid,这有什么好处呢?
a. 这可以帮助快速地定位某一个事务操作所在的日志文件
b. 事务的zxid中高32位包含了epoch,这个是leader所属的周期,因此这样我们可以通过日志文件名就清除的知道,当前运行时的zookeeper所属的leader周期。

zxid:所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch(ZAB协议通过epoch编号来区分Leader周期变化的策略)用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch=(原来的epoch+1),标识当前属于那个leader的统治时期。

(2)在前面,我们知道,每次append写入事务的时,我们都会检测事务文件日志当前剩余的空间是否大于4kb,如果小于4kb,则会在现有的文件基础上加上64MB,然后使用0来填充?那么为什么要使用这种预分配的形式呢?

客户端每次的事务提交,都要将事务写入到事务日志中去,所以事务日志写入的性能决定了zookeeper对客户端的请求的响应。

事务每次的请求可以看作是一次对底层磁盘的IO操作。严格的讲,文件的不断追加写入操作会触发底层磁盘IO为文件不断的开辟新的磁盘块,即磁盘seek。因此为了减少seek的频率,从而提高zookeeper的IO响应的时间,创建事务日志的时候都会进行文件的预分配--在文件处建之时,就会向操作系统预分配一块很大的磁盘块,默认是64mb,而一旦分配的磁盘块剩余的空间<4kb,则会再次分配,这样就可以避免随着每次事务的写入过程中导致日志文件的不断增长而需要不断的触发seek。事务预分配的大小,可以通过系统参数zookeeper.preAllocsize来设置。

相关博客链接:数据和存储-事务日志FileTxnLog

相关文章

网友评论

      本文标题:[ZooKeeper] zookeeper的持久化之TxnLog

      本文链接:https://www.haomeiwen.com/subject/fgiwactx.html