美文网首页
zookeeper单机版-数据恢复

zookeeper单机版-数据恢复

作者: whateverblake | 来源:发表于2020-09-15 16:48 被阅读0次

    前言

    zookeeper服务端对于每次接受到的事务性操作(节点的CRUD)都会先写log,同时zookeeper服务端还会周期性的根据事物数来持久化服务端的数据到磁盘(snapshot)
    zookeeper服务端在启动的时候会从log和snapshot文件来恢复数据

    数据恢复过程

    zookeeper数据恢复分成两个部分

    • 从snapshot恢复
    • 从log恢复
      为什么需要从两个部分恢复呢?
      从前言的描述我们知道每一个事物性的操作都会先写log,所以说log包含了所有的事物操作记录。snapshot是zookeeper按照一定的条件产生的服务端数据持久化文件,那么可不可以直接从snapshot文件去恢复数据而不用管log文件呢?答案是不可以,因为存在这样的可能:zookeeper已经处理了一些事物但是这些操作还没有达到触发系统生成新snapshot文件的条件,如果在这个时候zookeeper server 宕机了,zookeeper如果只是从最新的已经生成的snapshot文件去恢复数据的话就会导致从上一个snapshot生成到宕机这段时间内的所有事物操作都会丢失。所有这个时候需要借助log去帮忙恢复那部分没有被snapshot持久化的事物。另一个问题是可不可以直接通过log去恢复,答案是不可以。如果全部的数据都从log中恢复那么就必须保存全的log数据,同时从log恢复数据的效率也是很低的。

    log,snap格式

    log_snap_filename_format.png

    上图是我本机zookeeper生成的log和snapshot列表
    我们注意log文件名称格式为log.x,snapshot文件格式为snapshot.y。在zookeeper中每一个事物操作都会被分配一个事物id,事物id由服务端统一生成,是一个递增的数字,比如事物B比事物A晚发生,事物A分配的事物id是1,那么事物B的事物id一定是大于1的一个数,可能是2,3......具体是什么值取决于在事物A和事物B之间zookeeper是否还处理过别的事物,以及如果处理了处理了多少个。

    • snapshot,log文件名后缀的意义

    我们看到snapshot文件格式为snapshot.x,log文件格式为log.y,我们知道x,y都是代表zookeeper处理事务的id,对于log来说log里面记录的事物id都是大于或者等于y的,也就是说log.y记录的第一个事物的id是y,那么一个log文件会记录多少个事物呢?在zookeeper中每一个log文件都是固定64M,在生成一个新的log文件的时候会预先在磁盘把这64M空间分配好,这样做的目的是为了加快每次写log的速度,因为是直接分配了64M磁盘,这些空间在磁盘上是连续的,这样一方面可以省去随机写磁盘导致的寻道耗时,另一方面操作系统读文件的时候可以缓存整块的log文件,减少了缓存页换进换出的次数。对于snapshot.x来说存储的事物id都是小于x,也就是说snapshot.x中存储的最后一个事物的id是x

    • log文件内容
    log_after_parse.png

    上图是我本机一个zookeeper log文件解析出来的结果,我们使用框出来的这条记录来分析下log包含的主要信息

    • 事物操作发送的时间
    • 事物操作会话id(session)
    • 事物操作在客户端的编码id cxid
    • 事物操作在服务端的id zxid
    • 具体的事物
      1. 操作类型(create2)
      2. 节点名称 /test/hsbxxxxxxx
      3. 节点存储的信息 #xxxxxx
      4. 权限控制信息 v{s{31,s{'world,'anyone}}}
    
    • 摘要信息
    1. 摘要版本 2
    2. 摘要信息 10546528799
    
    • snapshot文件内容
    snapshot_znode_parse.png
    snapshot_session_parse.png

    上面两个图是我们本机一个snapshot文件解析的结果,我们可以看到snapshot包含了两部分信息:znode和session

    1. znode : 在snapshot文件生成的时间点zookeeper会持久化客户端在zookeeper上创建的所有节点信息。
    2. session: 客户端创建的session信息也会被持久化到snapshot中
    znode的数据内容

    znode主要保存以下信息

    • Czxid 创建节点的事物id
    • ctime 创建时间
    • mZxid 修改节点的事物id
    • mtime 修改时间
    • pZxid 孩子节点的最新事物id
    • cversion 创建的版本信息
    • dataVersion 数据版本信息
    • aclVersion acl 版本信息
    • ephemeralOwner 如果是瞬时节点,对应的是sessionid
    • dataLength 节点存储数据的长度
      还有一些信息没有显示比如节点存储的数据,节点的acl信息
    session信息

    session主要保存了下面的信息

    • sessionid
    • 超时时间
    • sessionid所拥有的瞬时节点数量

    通过上面的知识铺垫我们现在正式进入zookeeper数据恢复的流程
    从上面的介绍我们可以推论出,如果我们能找到最新的没有损害的snapshot.x_newest,然后根据x去获得所有比x大的log.m我们标记为log<m> 以及最大的比x小的log.y_less_but_max,这样我们通过snapshot.x_newest,log<m>和log.y_less_but_max就可以恢复出zookeeper的数据了,对于图log_snap_filename_format.png中数据,只需要使用snapshot.30和log.27就可以完成数据的恢复

    ZookeeperServer.startData

    zookeeper在启动的时候通过ZookeeperServer.startData方法进行数据恢复

      public void startdata() throws IOException, InterruptedException {
            //check to see if zkDb is not null  
            //ZKDatabase是zookeeper数据存储对象
            if (zkDb == null) {
                zkDb = new ZKDatabase(this.txnLogFactory);
            }
            if (!zkDb.isInitialized()) {
               //进行数据恢复
                loadData();
            }
        }
    
    loadData
      public void loadData() throws IOException, InterruptedException {
          
            if (zkDb.isInitialized()) {
                setZxid(zkDb.getDataTreeLastProcessedZxid());
            } else {
                //通过zkDb.loadDataBase()进行数据恢复
                setZxid(zkDb.loadDataBase());
            }
    
            // Clean up dead sessions
           //处理掉已经挂掉的会话
            List<Long> deadSessions = new ArrayList<>();
            for (Long session : zkDb.getSessions()) {
                if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                    deadSessions.add(session);
                }
            }
    
            for (long session : deadSessions) {
                // TODO: Is lastProcessedZxid really the best thing to use?
                killSession(session, zkDb.getDataTreeLastProcessedZxid());
            }
           
           //数据恢复完成后对数据库生成一个全新的snapshot
            // Make a clean snapshot
            takeSnapshot();
        }
    
    ZKDatabase.loadDatabase()

    zookeeper数据恢复其实就是恢复ZKDatabase中各个属性的数据

    public long loadDataBase() throws IOException {
            long startTime = Time.currentElapsedTime();
             //snaplog把数据恢复到dataTree,sessionsWithTimeouts中,返回恢复数据后得到的最大的zxid
            //dataTree存储了zookeeper的节点信息,sessionsWithTimeouts存储了会话的信息
            long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
            initialized = true;
            long loadTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
            LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
                    loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
            return zxid;
        }
    

    到这里我们讲解下DataTree。

    DataTree

    DataTree是zookeeper的数据存储引擎类,下图是DataTree关键属性的截图


    dataTree.png

    从上图我们知道nodes属性存储是zookeeper所有的节点信息


    NodeHashMap.png

    zookeeper在内部使用ConcurrentHashMap作为节点的容器,容器的key是节点的名称,value是节点对象的表示类DataNode,DataNode的属性如下


    data_node.png
    snapLog.restore

    我们继续讲解数据恢复方法链上的snapLog.restore方法

     public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
            long snapLoadingStartTime = Time.currentElapsedTime();
           //snapLog是snapshot的表示类,snapLog.deserialize用来恢复snapshot数据
            long deserializeResult = snapLog.deserialize(dt, sessions);
            ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
            //创建日志文件表示类
            FileTxnLog txnLog = new FileTxnLog(dataDir);
            boolean trustEmptyDB;
            File initFile = new File(dataDir.getParent(), "initialize");
            if (Files.deleteIfExists(initFile.toPath())) {
                LOG.info("Initialize file found, an empty database will not block voting participation");
                trustEmptyDB = true;
            } else {
                trustEmptyDB = autoCreateDB;
            }
            //log文件恢复逻辑在RestoreFinalizer.run中实现
            RestoreFinalizer finalizer = () -> {
                long highestZxid = fastForwardFromEdits(dt, sessions, listener);
                // The snapshotZxidDigest will reset after replaying the txn of the
                // zxid in the snapshotZxidDigest, if it's not reset to null after
                // restoring, it means either there are not enough txns to cover that
                // zxid or that txn is missing
                DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
                if (snapshotZxidDigest != null) {
                    LOG.warn(
                            "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                                    + "which might lead to inconsistent state",
                            Long.toHexString(highestZxid),
                            Long.toHexString(snapshotZxidDigest.getZxid()));
                }
                return highestZxid;
            };
    
            if (-1L == deserializeResult) {
                /* this means that we couldn't find any snapshot, so we need to
                 * initialize an empty database (reported in ZOOKEEPER-2325) */
                if (txnLog.getLastLoggedZxid() != -1) {
                    // ZOOKEEPER-3056: provides an escape hatch for users upgrading
                    // from old versions of zookeeper (3.4.x, pre 3.5.3).
                    if (!trustEmptySnapshot) {
                        throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
                    } else {
                        LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                        return finalizer.run();
                    }
                }
    
                if (trustEmptyDB) {
                    /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                     *       or use Map on save() */
                    save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
    
                    /* return a zxid of 0, since we know the database is empty */
                    return 0L;
                } else {
                    /* return a zxid of -1, since we are possibly missing data */
                    LOG.warn("Unexpected empty data tree, setting zxid to -1");
                    dt.lastProcessedZxid = -1L;
                    return -1L;
                }
            }
    
            return finalizer.run();
        }
    
    
    SnapShot.deserialize

    我看下从snapshot恢复数据的源代码

    public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
            // we run through 100 snapshots (not all of them)
            // if we cannot get it running within 100 snapshots
            // we should  give up
          //findNValidSnapshots方法从存放snapshot文件的文件夹中按照snapshot后缀事物id从大到小排序获取最多前100个,
            List<File> snapList = findNValidSnapshots(100);
            if (snapList.size() == 0) {
                return -1L;
            }
            File snap = null;
            long snapZxid = -1;
            boolean foundValid = false;
            //下面会尝试从snapList保存的snapshot.x文件去恢复数据,只要有一个snapshot.x能恢复成功,剩下的snapshot.x就不会再被解析
            for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
                //获取当前解析的snapshot文件
                snap = snapList.get(i);
                LOG.info("Reading snapshot {}", snap);
                //获取当前snapshot文件的后缀,也就是这个snapshot文件保存的最大事物id
                snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
                try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
                  //使用当前snapshot文件创建数据读取流
                    InputArchive ia = BinaryInputArchive.getArchive(snapIS);
                   //解析snapshot文件保存的节点信息和session信息
                    deserialize(dt, sessions, ia);
                   //检查snapshot中的数据是否被损坏
                    SnapStream.checkSealIntegrity(snapIS, ia);
    
                    // Digest feature was added after the CRC to make it backward
                    // compatible, the older code can still read snapshots which
                    // includes digest.
                    //
                    // To check the intact, after adding digest we added another
                    // CRC check.
                    //解析摘要信息
                    if (dt.deserializeZxidDigest(ia, snapZxid)) {
                   //检查snapshot中的数据是否被损坏
    
                        SnapStream.checkSealIntegrity(snapIS, ia);
                    }
                    //解析成功,停止继续解析剩下的snapshot文件
                    foundValid = true;
                    break;
                } catch (IOException e) {
                    LOG.warn("problem reading snap file {}", snap, e);
                }
            }
            if (!foundValid) {
                throw new IOException("Not able to find valid snapshots in " + snapDir);
            }
             //记录DataTree中目前最新的事物id为snapZxid
            dt.lastProcessedZxid = snapZxid;
            lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
    
            // compare the digest if this is not a fuzzy snapshot, we want to compare
            // and find inconsistent asap.
            if (dt.getDigestFromLoadedSnapshot() != null) {
                dt.compareSnapshotDigests(dt.lastProcessedZxid);
            }
            return dt.lastProcessedZxid;
        }
    
    deserialize

    snapshot文件解析deserialize方法

    public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
            
            FileHeader header = new FileHeader();
           //先反序列化文件头FileHeader
            header.deserialize(ia, "fileheader");   
            if (header.getMagic() != SNAP_MAGIC) {
                throw new IOException("mismatching magic headers " + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
            }
             //
            SerializeUtils.deserializeSnapshot(dt, ia, sessions);
        }
    
    SerializeUtils.deserializeSnapshot
    
    public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
            //先恢复session信息
            int count = ia.readInt("count");
            while (count > 0) {
                //session信息包含两个部分id,timeout,下面分别从文件中反序列出来
                long id = ia.readLong("id");
                int to = ia.readInt("timeout");
                sessions.put(id, to);
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(
                        LOG,
                        ZooTrace.SESSION_TRACE_MASK,
                        "loadData --- session in archive: " + id + " with timeout: " + to);
                }
                count--;
            }
           //dataTree对象的反序列化
            dt.deserialize(ia, "tree");
        }
    
    
    DataTree.deserialize()
    public void deserialize(InputArchive ia, String tag) throws IOException {
            //先解析acl的信息
            aclCache.deserialize(ia);
            nodes.clear();
            pTrie.clear();
            nodeDataSize.set(0);
            //读取节点信息
            String path = ia.readString("path");
            while (!"/".equals(path)) {
                DataNode node = new DataNode();
              //反序列化DataNode对象
                ia.readRecord(node, "node");
                nodes.put(path, node);
                synchronized (node) {
                    aclCache.addUsage(node.acl);
                }
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1) {
                    root = node;
                } else {
                    String parentPath = path.substring(0, lastSlash);
                    DataNode parent = nodes.get(parentPath);
                    if (parent == null) {
                        throw new IOException("Invalid Datatree, unable to find "
                                              + "parent "
                                              + parentPath
                                              + " of path "
                                              + path);
                    }
                   //那自己加入到父节点信息中
                    parent.addChild(path.substring(lastSlash + 1));
                    long eowner = node.stat.getEphemeralOwner();
                    EphemeralType ephemeralType = EphemeralType.get(eowner);
                    if (ephemeralType == EphemeralType.CONTAINER) {
                        containers.add(path);
                    } else if (ephemeralType == EphemeralType.TTL) {
                        ttls.add(path);
                    } else if (eowner != 0) {
                        HashSet<String> list = ephemerals.get(eowner);
                        if (list == null) {
                            list = new HashSet<String>();
                            ephemerals.put(eowner, list);
                        }
                        list.add(path);
                    }
                }
                path = ia.readString("path");
            }
            // have counted digest for root node with "", ignore here to avoid
            // counting twice for root node
            nodes.putWithoutDigest("/", root);
    
            nodeDataSize.set(approximateDataSize());
    
            // we are done with deserializing the
            // the datatree
            // update the quotas - create path trie
            // and also update the stat nodes
           //跟新各个节点的用户设置的配额信息
            setupQuota();
           //把没有使用到的acl信息清楚
            aclCache.purgeUnused();
        }
    

    上面是snapshot恢复,我接下来看下log文件的恢复

    RestoreFinalizer.run
       
    long highestZxid = fastForwardFromEdits(dt, sessions, listener);
                // The snapshotZxidDigest will reset after replaying the txn of the
                // zxid in the snapshotZxidDigest, if it's not reset to null after
                // restoring, it means either there are not enough txns to cover that
                // zxid or that txn is missing
                DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
                if (snapshotZxidDigest != null) {
                    LOG.warn(
                            "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                                    + "which might lead to inconsistent state",
                            Long.toHexString(highestZxid),
                            Long.toHexString(snapshotZxidDigest.getZxid()));
                }
                return highestZxid;
    
    
    fastForwardFromEdits
     public long fastForwardFromEdits(
            DataTree dt,
            Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
           // txnLog.read(dt.lastProcessedZxid + 1) 生成的就是所有log后缀大于lastProcessedZxid和最大的小于lastProcessedZxid + 1的log的iterator对象
            TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
            long highestZxid = dt.lastProcessedZxid;
            TxnHeader hdr;
            int txnLoaded = 0;
            long startTime = Time.currentElapsedTime();
            try {
                while (true) {
                    // iterator points to
                    // the first valid txn when initialized
                    hdr = itr.getHeader();
                    if (hdr == null) {
                        //empty logs
                        return dt.lastProcessedZxid;
                    }
                    if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                         //如果解析到的事物日志id小于当前highestZxid的值那么打印错误
                        LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
                    } else {
                        //更新highestZxid为当前事物日志的id
                        highestZxid = hdr.getZxid();
                    }
                    try {
                        //解析这条事物日志结果写入到DataTree中,这个地方在这里我们就不展开去讲解了,在zookeeper创建节点的分析中我们会解析
                        processTransaction(hdr, dt, sessions, itr.getTxn());
                        dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
                        txnLoaded++;
                    } catch (KeeperException.NoNodeException e) {
                        throw new IOException("Failed to process transaction type: "
                                              + hdr.getType()
                                              + " error: "
                                              + e.getMessage(),
                                              e);
                    }
                    listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
                    
                    if (!itr.next()) {
                        break;
                    }
                }
            } finally {
                if (itr != null) {
                    itr.close();
                }
            }
    
            long loadTime = Time.currentElapsedTime() - startTime;
            LOG.info("{} txns loaded in {} ms", txnLoaded, loadTime);
            ServerMetrics.getMetrics().STARTUP_TXNS_LOADED.add(txnLoaded);
            ServerMetrics.getMetrics().STARTUP_TXNS_LOAD_TIME.add(loadTime);
             //返回zookeeper已经处理的最大事物id
            return highestZxid;
        }
    
    
    TxnIterator.next()

    next()是解析log文件下一条记录的方法

    public boolean next() throws IOException {
                if (ia == null) {
                    return false;
                }
                try {
                    //解析日志条目的crc码
                    long crcValue = ia.readLong("crcvalue");
                    //读取一行日志
                    byte[] bytes = Util.readTxnBytes(ia);
                    // Since we preallocate, we define EOF to be an
                    if (bytes == null || bytes.length == 0) {
                        throw new EOFException("Failed to read " + logFile);
                    }
                    // EOF or corrupted record
                    // validate CRC
                    Checksum crc = makeChecksumAlgorithm();
                    crc.update(bytes, 0, bytes.length);
                    if (crcValue != crc.getValue()) {
                        throw new IOException(CRC_ERROR);
                    }
                    //把日志二进制数据转化成TxnLogEntry对象
                    TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
                    hdr = logEntry.getHeader();
                   //record是日志操作体
                    record = logEntry.getTxn();
                  //日志的摘要
                    digest = logEntry.getDigest();
                } catch (EOFException e) {
                    LOG.debug("EOF exception", e);
                    inputStream.close();
                    inputStream = null;
                    ia = null;
                    hdr = null;
                    // this means that the file has ended
                    // we should go to the next file
                    //本log已经解析完成,创建下一个文件的读取流
                    if (!goToNextLog()) {
                        return false;
                    }
                    // if we went to the next log file, we should call next() again
                    //解析下一个文件的第一条记录
                    return next();
                } catch (IOException e) {
                    inputStream.close();
                    throw e;
                }
                return true;
            }
    

    结语

    上面就是zookeeper数据恢复的流程和源码

    相关文章

      网友评论

          本文标题:zookeeper单机版-数据恢复

          本文链接:https://www.haomeiwen.com/subject/wzbpektx.html