1 前言
持久化,将数据存储到磁盘或者数据库中 。在系统重启或者出现故障后,能够恢复或者获取到宕机前的信息。
2 Zookeeper持久化方式
- TxnLog —— 增量事务日志,记录系统中所有的增删改记录
- SnapShot —— 快照,记录内存中的全量数据,但是数据不一定是最新的
- TxnLog:是一个读取日志的接口,提供了读取事务log的接口方法
- SnapShot:是一个操作日志快照的接口,提供了对快照文件操作的方法
- FileTxnLog:实现TxnLog接口,提供了读取事务日志的方法实现
- FileSnap:实现SnapShot接口,负责存储、序列化、反序列化、访问快照
- FileTxnSnapLog:封装了TxnLog和SnapShot
- Util:工具类,提供持久化所需的API
3 源码分析
3.1 SnapShot和TxnLog
public interface SnapShot {
* deserialize a data tree from the last valid snapshot and
* return the last zxid that was deserialized
* @param dt the datatree to be deserialized into
* @param sessions the sessions to be deserialized into
* @return the last zxid that was deserialized from the snapshot
* @throws IOException
long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;
* persist the datatree and the sessions into a persistence storage
* @param dt the datatree to be serialized
* @param sessions
* @throws IOException
void serialize(DataTree dt, Map<Long, Integer> sessions,
File name)
throws IOException;
* 找到最新的snapshot文件
* find the most recent snapshot file
* @return the most recent snapshot file
* @throws IOException
File findMostRecentSnapshot() throws IOException;
* free resources from this snapshot immediately
* @throws IOException
void close() throws IOException;
public interface TxnLog {
* 滚动日志,从当前日志滚动到下一个日志,不是回滚
void rollLog() throws IOException;
* 追加事务记录到事务日志中去
boolean append(TxnHeader hdr, Record r) throws IOException;
* 可迭代读取事务性日志
TxnIterator read(long zxid) throws IOException;
* 事务性操作的最新zxid
long getLastLoggedZxid() throws IOException;
* 清空zxid以后的日志
boolean truncate(long zxid) throws IOException;
* 获取数据库的id
long getDbId() throws IOException;
* 提交事务并进行确认
void commit() throws IOException;
* 关闭事务性日志
void close() throws IOException;
* an iterating interface for reading
* transaction logs.
public interface TxnIterator {
* return the transaction header.
* @return return the transaction header.
TxnHeader getHeader();
* return the transaction record.
* @return return the transaction record.
Record getTxn();
* go to the next transaction record.
* @throws IOException
boolean next() throws IOException;
* close files and release the
* resources
* @throws IOException
void close() throws IOException;
3.2 FileTxnLog的具体属性
private static final Logger LOG;
// 预分配64m大小
static long preAllocSize = 65536 * 1024;
// 直接内存
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
// 魔数,用于校验日志文件的正确性,默认为1514884167
public final static int TXNLOG_MAGIC =
public final static int VERSION = 2;
// 日志文件名前缀
public static final String LOG_FILE_PREFIX = "log";
/** Maximum time we allow for elapsed fsync before WARNing */
private final static long fsyncWarningThresholdMS;
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
// 获得系统参数,判断系统参数配置了预分配内存大小
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
/** Local variable to read fsync.warningthresholdms into */
Long fsyncWarningThreshold;
if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
fsyncWarningThresholdMS = fsyncWarningThreshold;
// 最大(也就是最新的)zxid
long lastZxidSeen;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
// log目录文件
File logDir;
// 是否强制同步,默认是yes
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
long dbId;
private LinkedList<FileOutputStream> streamsToFlush =
new LinkedList<FileOutputStream>();
// 当前配置的大小
long currentSize;
// 写日志文件
File logFileWrite = null;
3.3 FileTxnLog的方法
<1> append方法
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
// 校验头部不能为空,hdr主要包含了cxid,zxid,clientId等相关信息
if (hdr == null) {
return false;
// 如果待写入的事务的事务id小于本地保存的最新的事务id,提醒
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
// 在第一次新建一个FileTxnLog时候,logStream是空的,这个时候需要为它创建一个新的日志文件
// 并把logStream指向这个日志文件
if (logStream==null) {
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
// 根据待写入的事务id创建一个新的日志文件,可以看到文件名包含这个文件存放的事务的最小事务id
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
// 根据魔数,版本号和数据库id生成日志文件头,dbId默认是0
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
// 确保在用0填充之前,先把魔数信息等写入到文件中,进行依次flush
currentSize = fos.getChannel().position(); //获取当前文件流的大小
// 重新计算文件大小,保证文件的大小是预分配大小的整数倍
// 可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销
currentSize = padFile(fos.getChannel());
// 序列化TxnHeader Record记录到byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
// 创建校验和的算法
Checksum crc = makeChecksumAlgorithm();
// 使用指定的字节数组更新校验和
crc.update(buf, 0, buf.length);
// 将更新的校验和值写入到日志文件中
oa.writeLong(crc.getValue(), "txnEntryCRC");
// 将TxnHeader Record数据写入到输出流
Util.writeTxnBytes(oa, buf);
return true;
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
throws IOException {
oa.writeBuffer(bytes, "txnEntry");
oa.writeByte((byte) 0x42, "EOR"); // 'B'
- 先计算buf数据长度写入
- 写入buf数组数据
- 记录尾部以‘B’字符结尾,写入0x42
private long padFile(FileChannel fileChannel) throws IOException {
// 计算新的文件大小,并通过填充0先占用未使用的byte空间
// 这样可以让文件尽可能的占用连续的磁盘扇区,减少后续写入和读取文件时的磁盘寻道开销
// currentSize默认为0
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
// 将整个日志文件中未使用的部分填充为0
if (currentSize != newFileSize) {
// ByteBuffer缓冲区,position:
// 指定了下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新
// remaining,返回剩余的可用长度
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
return currentSize;
* @param position 通过管道写入的字节长度
* @param fileSize 当前设置的文件大小
* @param preAllocSize 预分配大小
* @return
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
// 如果剩余空间不足4k,且预分配空间大于0
if (preAllocSize > 0 && position + 4096 >= fileSize) {
// If we have written more than we have previously preallocated we need to make sure the new
// file size is larger than what we already have
// 如果已写入的长度超过了文件大小,文件大小扩大为[写入的字节长度+预分配]
if (position > fileSize){
fileSize = position + preAllocSize;
// 这边会重新调整文件大小到预分配长度的整数倍
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
return fileSize;
<1> 在新建FileTxnLog文件时会预分配文件内存大小,并用0来填充,从而保证文件的磁盘占用是连续的,同时通过日志追加的方式,我们可以保证对日志文件的写的顺序性,从而保证写性能
<2> 每次将事务写入到日志文件时,都会先根据写入的事务计算并写入一个检验和,然后再把事务流写入到日志文件中,这样可以充分保证事务日志的安全性和完整型。
<2> read方法
* zxid:指定迭代读取日志文件中的第一个事务ID
* 默认fastForward=true
public TxnIterator read(long zxid) throws IOException {
return new FileTxnIterator(logDir, zxid);
public FileTxnIterator(File logDir, long zxid) throws IOException {
this.logDir = logDir;
this.zxid = zxid;
void init() throws IOException {
storedFiles = new ArrayList<File>();
// 排序目录下的日志文件,文件名称是根据事务id来创建的
// 文件的排序也等价于事务的排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {
// 找出起始事务id大于等于zxid的日志文件
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
// add the last logfile that is less than the zxid
// 当第一次遍历到起始id小于zxid的日志文件后,要记得把该文件也作为查找目标文件,因为它里面可能包含大于zxid的事务
// 同时停止遍历
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
// 找出已排好序且可能存在大于zxid的日志文件后,打开第一个日志文件输入流准备读取
if (!next())
while (hdr.getZxid() < zxid) {
if (!next())
private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) {
// 可能包含大于zxid的事务
this.logFile = storedFiles.remove(storedFiles.size()-1);
ia = createInputArchive(this.logFile);
return true;
return false;
public boolean next() throws IOException {
if (ia == null) {
return false;
try {
// 读取校验和的值
long crcValue = ia.readLong("crcvalue");
// 读取事务
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
// 因为我们是采用预分配内存方式,会定义一个EOF作为空的事务。
// 所以,当我们读取到一个空的,也就表明日志文件已读到末尾
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
// EOF or corrupted record
// validate CRC
// 分析校验和的值是否正确,防止消息被破坏,这就是为什么我们在append的时候加入校验和
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
// 反序列化事务
hdr = new TxnHeader();
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF excepton " + e);
inputStream = null;
ia = null;
hdr = null;
// this means that the file has ended
// we should go to the next file
// 日志文件已经读到末尾了,所以跳到下一个文件开始读取
if (!goToNextLog()) {
return false;
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
throw e;
return true;
* @param logDirList 日志文件列表
* @param snapshotZxid 通过内存快照恢复的最大的事务id,剩余的比snapshotZxid大的就要从日志文件里恢复
* @return
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
// 对日志文件进行排序,按照事务id从高到低
List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
long logZxid = 0;
// logZxid是小于snapshotZxid的最大事务id
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
// 如果文件名的事务id>快照最新的zxid
if (fzxid > snapshotZxid) {
// the files
// are sorted with zxid's
// 如果fzxid <= snapshotZxid 并且 fzxid > logZxid
if (fzxid > logZxid) {
logZxid = fzxid;
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid < logZxid) { // 找出文件id大于logZxid的文件名
return v.toArray(new File[0]);
<1> 找出比snapshotZxid小的最大事务id(从文件名中获取)
<2> 快照能够恢复的最大事务id为200,所以大于200的事务得从TxnLog中恢复,所以读取的TxnLog也是从以150作为前缀的开始读取
* 这个方法只在节点服务器启动的时候被调用
* 从日志文件中获取最大的zxid
* get the last zxid that was logged in the transaction logs
* @return the last zxid logged in the transaction logs
public long getLastLoggedZxid() {
// 找出所有的日志文件并排序
File[] files = getLogFiles(logDir.listFiles(), 0);
// 排序日志文件,并从日志文件名称中获取包含最大zxid的日志文件的文件名中的日志id
long maxLog=files.length>0?
// if a log file is more recent we must scan it to find
// the highest zxid
// 在最新的日志文件里迭代查找最新的事务id
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
// 根据文件名的事务id遍历迭代该日志文件,获取整个内存数据库的最大事务id
itr = txn.read(maxLog);
while (true) {
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
return zxid;
<3> commit方法
commit方法,提交日志并且刷写到磁盘,force方法会把未写磁盘的数据都强制写入磁盘。这是因为在操作系统中出于性能考虑会把数据放入缓冲区,所以不能保证数据在调用write写入文件通道后就及时写到磁盘上了,除非手动调用force方法。force方法需要一个布尔参数,代表是否把meta data也一并强制写入。
<4> truncate方法
4 总结
a. 这可以帮助快速地定位某一个事务操作所在的日志文件
b. 事务的zxid中高32位包含了epoch,这个是leader所属的周期,因此这样我们可以通过日志文件名就清除的知道,当前运行时的zookeeper所属的leader周期。