Zookeeper(5) - Server Standalone Mode - Startup Flow


Author: 进击的蚂蚁zzzliu | Published: 2020-12-25 14:07

    Overview

    Server startup comes in two modes, standalone and cluster (Quorum). The main difference is that cluster startup goes through a leader election.


    [Figure: server startup flow]

    Flow analysis

    1. QuorumPeerMain.main

    QuorumPeerMain is the entry point of the entire startup flow.

    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } 
        .......
        System.exit(0);
    }
    
    2. QuorumPeerMain.initializeAndRun
    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        // args[0]: path to zoo.cfg, passed by the start script as %ZOOMAIN% "%ZOOCFG%" (ZOOCFGDIR=%~dp0%..\conf)
        if (args.length == 1) {
            config.parse(args[0]);
        }
    
        // Start and schedule the purge task
        // PurgeInterval defaults to 0 (no purging); SnapRetainCount defaults to 3 (keep the 3 most recent snapshots)
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
    
        // If servers > 0 (server.N entries are configured), take the cluster startup path; otherwise take the standalone path
        /**
         * For example:
         * server.0=192.168.0.1:2888:3888
         * server.1=192.168.0.2:2888:3888
         * server.2=192.168.0.3:2888:3888
         */
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            // standalone startup
            ZooKeeperServerMain.main(args);
        }
    }
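    As a rough model of the config.servers.size() > 0 branch above, the sketch below loads zoo.cfg-style text into a java.util.Properties and counts keys starting with "server.". ModeDetector, countServers and isQuorumMode are hypothetical names; the real QuorumPeerConfig parses many more keys and validates addresses, so this only illustrates the standalone-versus-cluster decision.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ModeDetector {
    // Counts "server.N" keys in zoo.cfg-style text (hypothetical helper, not QuorumPeerConfig).
    static int countServers(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int servers = 0;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                servers++;
            }
        }
        return servers;
    }

    // Mirrors the branch in initializeAndRun: any server entry means cluster mode.
    static boolean isQuorumMode(String cfgText) {
        return countServers(cfgText) > 0;
    }

    public static void main(String[] args) {
        String standalone = "tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n";
        String cluster = standalone
                + "server.0=192.168.0.1:2888:3888\n"
                + "server.1=192.168.0.2:2888:3888\n"
                + "server.2=192.168.0.3:2888:3888\n";
        System.out.println(isQuorumMode(standalone)); // false
        System.out.println(isQuorumMode(cluster));    // true
        System.out.println(countServers(cluster));    // 3
    }
}
```

    Because Properties keeps only the last value of a repeated key, three lines that all use server.0 would count as a single server, which is one reason every entry needs its own id.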
    
    • args[0] is passed in by the start script and defaults to the zoo.cfg file in the conf directory:
      zkServer.cmd
    @echo off
    setlocal
    call "%~dp0zkEnv.cmd"
    set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    echo on
    call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
    endlocal
    

    zkEnv.cmd

    set ZOOCFGDIR=%~dp0%..\conf
    set ZOOCFG=%ZOOCFGDIR%\zoo.cfg
    
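    • DatadirCleanupManager starts the scheduled purge task. PurgeInterval defaults to 0, which disables purging; SnapRetainCount defaults to 3, which keeps the 3 most recent snapshot files. Each run also deletes the transaction log files older than the oldest retained snapshot, except the one log file that may still hold transactions written after that snapshot.
    • config.servers.size() decides between cluster and standalone mode based on the server entries in zoo.cfg. There are none by default; a cluster configuration looks like this: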
    server.0=192.168.0.1:2888:3888
    server.1=192.168.0.2:2888:3888
    server.2=192.168.0.3:2888:3888
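    The retention rule in the DatadirCleanupManager bullet can be modeled as a pure function over zxids. This is a minimal sketch under stated assumptions: snapshot and log files are represented only by the zxid in their file names (a log file by the first zxid it contains), and PurgePlanner with its methods is a hypothetical helper, not ZooKeeper's PurgeTxnLog API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PurgePlanner {
    // A purge task is only scheduled when the interval (in hours) is positive; 0 disables it.
    static boolean purgeEnabled(int purgeIntervalHours) {
        return purgeIntervalHours > 0;
    }

    // Keeps the newest retainCount snapshots (by zxid) and returns the older ones to delete.
    static List<Long> snapshotsToDelete(List<Long> snapZxids, int retainCount) {
        List<Long> sorted = new ArrayList<>(snapZxids);
        Collections.sort(sorted);
        int deleteCount = Math.max(0, sorted.size() - retainCount);
        return new ArrayList<>(sorted.subList(0, deleteCount));
    }

    // Given each log file's first zxid and the zxid of the oldest retained snapshot, returns
    // the logs to delete. The newest log starting at or before that snapshot may still hold
    // later transactions, so only logs strictly older than it are deleted.
    static List<Long> logsToDelete(List<Long> logStartZxids, long oldestRetainedSnapZxid) {
        Long keepFrom = null;
        for (long z : logStartZxids) {
            if (z <= oldestRetainedSnapZxid && (keepFrom == null || z > keepFrom)) {
                keepFrom = z;
            }
        }
        List<Long> toDelete = new ArrayList<>();
        if (keepFrom == null) {
            return toDelete; // no log predates the snapshot, nothing to purge
        }
        for (long z : logStartZxids) {
            if (z < keepFrom) {
                toDelete.add(z);
            }
        }
        Collections.sort(toDelete);
        return toDelete;
    }
}
```

    For example, with snapshots at 0x100, 0x200, 0x300 and 0x400 and a retain count of 3, only snapshot 0x100 is removed; with logs starting at 0x1, 0x150, 0x250 and 0x350, only log 0x1 is removed, because log 0x150 may contain transactions after snapshot 0x200.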
    
    3. Standalone mode: ZooKeeperServerMain.runFromConfig
    public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the server error or shutdown state changes.
            // Calls shutdownLatch.countDown() when the server state becomes ERROR or SHUTDOWN
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
    
            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            // NIOServerCnxnFactory by default; NettyServerCnxnFactory can be configured instead
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);
            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();
    
            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        }
        .......
    }
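    The shutdown wiring in runFromConfig is a plain CountDownLatch pattern: the main thread blocks in await() until a handler observes ERROR or SHUTDOWN. The sketch below reproduces it with a hypothetical MiniServer and a Consumer<State> standing in for ZooKeeperServerShutdownHandler; only the state check mirrors the real handler, and await() takes a timeout here purely so the sketch can be tested.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchShutdownDemo {
    enum State { INITIAL, RUNNING, ERROR, SHUTDOWN }

    // Hypothetical stand-in for ZooKeeperServer: stores its state and reports every change.
    static class MiniServer {
        private volatile State state = State.INITIAL;
        private volatile Consumer<State> shutdownHandler = s -> { };

        void registerServerShutdownHandler(Consumer<State> handler) {
            this.shutdownHandler = handler;
        }

        void setState(State newState) {
            state = newState;
            shutdownHandler.accept(newState);
        }

        State getState() {
            return state;
        }
    }

    // Plays the role of ZooKeeperServerShutdownHandler: release the latch on ERROR or SHUTDOWN.
    static Consumer<State> latchHandler(CountDownLatch latch) {
        return s -> {
            if (s == State.ERROR || s == State.SHUTDOWN) {
                latch.countDown();
            }
        };
    }

    // Starts the fake server, lets another thread move it to finalState, and reports
    // whether the waiting thread was released from await() within the timeout.
    static boolean runUntilShutdown(State finalState, long timeoutMs) {
        MiniServer server = new MiniServer();
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        server.registerServerShutdownHandler(latchHandler(shutdownLatch));
        server.setState(State.RUNNING); // RUNNING does not touch the latch

        Thread worker = new Thread(() -> server.setState(finalState));
        worker.start();
        try {
            boolean released = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            worker.join();
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```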
    
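    • new CountDownLatch(1): after startup the main thread blocks in await(). A ZooKeeperServerShutdownHandler is registered that calls countDown() when the server state becomes ERROR or SHUTDOWN.
    • new FileTxnSnapLog(...) builds a FileTxnSnapLog, which wraps a FileTxnLog and a FileSnap (for persistence details see "Zookeeper(3) - Persistence").
    • ServerCnxnFactory.createFactory() builds the ServerCnxnFactory via reflection. The default is NIOServerCnxnFactory; setting the zookeeper.serverCnxnFactory system property at startup switches it to NettyServerCnxnFactory (the rest of this article uses NIOServerCnxnFactory).
    • cnxnFactory.configure() performs the usual NIO server setup: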
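    // create the multiplexing selector
    final Selector selector = Selector.open();
    // open the server socket channel
    this.ss = ServerSocketChannel.open();
    // bind the server address
    ss.socket().bind(addr);
    // switch to non-blocking mode
    ss.configureBlocking(false);
    // register interest in OP_ACCEPT events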
    ss.register(selector, SelectionKey.OP_ACCEPT);
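    The reflection step in ServerCnxnFactory.createFactory() can be sketched as follows. The property name zookeeper.serverCnxnFactory comes from the text above; CnxnFactory, NioFactory and NettyFactory are hypothetical stand-ins for the real classes, so the sketch only shows the lookup-then-instantiate shape with the NIO implementation as the fallback.

```java
class FactorySelection {
    static final String FACTORY_PROPERTY = "zookeeper.serverCnxnFactory";

    // Hypothetical stand-ins for ServerCnxnFactory and its NIO / Netty implementations.
    interface CnxnFactory {
        String name();
    }

    static class NioFactory implements CnxnFactory {
        public String name() { return "nio"; }
    }

    static class NettyFactory implements CnxnFactory {
        public String name() { return "netty"; }
    }

    // Reads the class name from a system property (falling back to the NIO class) and
    // instantiates it reflectively through its no-arg constructor.
    static CnxnFactory createFactory() {
        String className = System.getProperty(FACTORY_PROPERTY, NioFactory.class.getName());
        try {
            return (CnxnFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

    Against a real installation the switch would be a JVM flag such as -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.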
    
    4. Standalone mode: NIOServerCnxnFactory.startup
    public void startup(ZooKeeperServer zks) throws IOException, InterruptedException {
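        // Start the socket server
        //TODO Logically the socket should be started last, so why start it first? What happens if a client request arrives before the ZooKeeperServer has finished starting?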
        start();
        setZooKeeperServer(zks);
        // Restore data from disk
        zks.startdata();
        zks.startup();
    }
    
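    • start() starts the NIO socket server, which begins polling the selector for registered events. This raises a question: what happens if a client request arrives before the ZooKeeperServer has finished starting?
    • zks.startdata() restores ZKDatabase/DataTree from disk: it loads the snapshot plus the transaction logs and deserializes them into in-memory objects (see "Zookeeper(3) - Persistence" for details).
    • zks.startup() continues starting the ZooKeeperServer.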
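    To see what start() does with the configuration from step 3, the sketch below binds a non-blocking ServerSocketChannel on a free loopback port, connects one client and polls the selector until the OP_ACCEPT event fires. It uses only standard java.nio APIs; NioAcceptSketch and acceptOnce are hypothetical names, and a real NIOServerCnxnFactory loops indefinitely and registers each accepted channel for reads instead of closing it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class NioAcceptSketch {
    // Sets up the server channel as in configure(), connects one client, then runs a
    // bounded version of the selector loop. Returns how many connections were accepted.
    static int acceptOnce(long timeoutMs) {
        try (Selector selector = Selector.open();
             ServerSocketChannel ss = ServerSocketChannel.open()) {
            ss.socket().bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            ss.configureBlocking(false);
            ss.register(selector, SelectionKey.OP_ACCEPT);
            int port = ss.socket().getLocalPort();

            // A blocking client connect completes against the listen backlog.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                int accepted = 0;
                long deadline = System.currentTimeMillis() + timeoutMs;
                while (accepted == 0 && System.currentTimeMillis() < deadline) {
                    selector.select(100); // wait up to 100 ms for ready keys
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isValid() && key.isAcceptable()) {
                            SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                            if (sc != null) {
                                accepted++;
                                sc.close(); // a real server would register sc for OP_READ
                            }
                        }
                    }
                }
                return accepted;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```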
    5. Standalone mode: ZooKeeperServer.startup
    public synchronized void startup() {
        if (sessionTracker == null) {
            // create the session tracker
            createSessionTracker();
        }
        // start the session tracker
        startSessionTracker();
        // set up the request processor chain
        setupRequestProcessors();
        // register JMX beans
        registerJMX();
    
        setState(State.RUNNING);
        // wake threads blocked in submitRequest, which answers the question raised earlier
        notifyAll();
    }
    
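    • createSessionTracker()/startSessionTracker() create and start the session tracker, which manages sessions (analyzed separately later).
    • setupRequestProcessors() builds and starts the request processor chain PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor (analyzed in detail in the next article).
    • registerJMX() registers the JMX beans.
    • setState(State.RUNNING)/notifyAll() set the state to RUNNING and wake the threads blocked in submitRequest. Client requests that arrive early wait on the ZooKeeperServer instance's monitor and are woken once startup completes here, which resolves the earlier question: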
    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } 
                ......
            }
        }
    }
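    The wait/notifyAll handshake between submitRequest() and startup() can be reduced to a small monitor. In this sketch StartupGate is hypothetical and keeps only the two pieces the text relies on: callers wait on the object's monitor (re-checking at least once per second) while the state is INITIAL, and startup() flips the state and calls notifyAll(). The firstProcessor check and error handling of the real submitRequest are left out.

```java
class StartupGate {
    enum State { INITIAL, RUNNING }

    private State state = State.INITIAL;
    private int processed = 0;

    // Like submitRequest(): block on this object's monitor while the server is still
    // INITIAL, re-checking at least once per second, then handle the request.
    synchronized void submitRequest() throws InterruptedException {
        while (state == State.INITIAL) {
            wait(1000);
        }
        processed++;
    }

    // Like the end of ZooKeeperServer.startup(): mark RUNNING and wake every waiter.
    synchronized void startup() {
        state = State.RUNNING;
        notifyAll();
    }

    synchronized int processed() {
        return processed;
    }

    // A client thread submits before startup; returns true if it was held back until
    // startup() ran and was then processed exactly once.
    static boolean demo() {
        StartupGate gate = new StartupGate();
        Thread client = new Thread(() -> {
            try {
                gate.submitRequest();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        client.start();
        try {
            Thread.sleep(100); // give the client time to reach wait()
            boolean heldBack = gate.processed() == 0;
            gate.startup();
            client.join();
            return heldBack && gate.processed() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```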
    

    At this point, standalone startup is complete.

    6. Cluster mode: QuorumPeer.start
    public synchronized void start() {
        // load the database
        loadDataBase();
        // start the socket server
        cnxnFactory.start();
        // run the leader election
        startLeaderElection();
        super.start();
    }
    
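    • loadDataBase() restores ZKDatabase/DataTree from the on-disk snapshot and transaction logs just as in standalone mode, and then performs additional epoch checks.
    • cnxnFactory.start() starts the socket server.
    • startLeaderElection()/super.start() run the leader election and start the ZooKeeperServer (analyzed in detail in later chapters).
    7. Cluster mode: QuorumPeer.loadDataBase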

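    Compared with standalone mode, loadDataBase adds epoch validation. The epoch concept is covered in more detail in the later articles on leader election.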

    // the current Leader epoch
    public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
    // the newEpoch a Follower has accepted when the Leader changes epoch
    public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
    // created during an epoch change; its presence means an epoch transition is in progress
    public static final String UPDATING_EPOCH_FILENAME = "updatingEpoch";
    private void loadDataBase() {
        File updating = new File(getTxnFactory().getSnapDir(), UPDATING_EPOCH_FILENAME);
        try {
            // load the zkDataBase from disk
            zkDb.loadDataBase();
            // recover the epoch from the latest zxid: a zxid is 64 bits, the high 32 bits are the epoch and the low 32 bits are a counter
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                // read the current epoch from its file
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
                // the epoch from the logs is newer than currentEpoch and an epoch change was in progress
                if (epochOfZxid > currentEpoch && updating.exists()) {
                    LOG.info("{} found. The server was terminated after taking a snapshot but before updating current epoch. Setting current epoch to {}.",
                             UPDATING_EPOCH_FILENAME, epochOfZxid);
                    // the server stopped after taking a snapshot but before updating currentEpoch; set currentEpoch to epochOfZxid
                    setCurrentEpoch(epochOfZxid);
                    if (!updating.delete()) {
                        throw new IOException("Failed to delete " + updating.toString());
                    }
                }
            } catch(FileNotFoundException e) {
                currentEpoch = epochOfZxid;
                LOG.info(CURRENT_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            // the epoch from the logs is still newer than currentEpoch and no epoch change was in progress: fail
            if (epochOfZxid > currentEpoch) {
                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
            try {
                // read acceptedEpoch
                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
                acceptedEpoch = epochOfZxid;
                LOG.info(ACCEPTED_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        acceptedEpoch);
                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
            }
            // acceptedEpoch is less than currentEpoch: fail
            if (acceptedEpoch < currentEpoch) {
                throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
            }
        } catch(IOException ie) {
            LOG.error("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
    }
    
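    • CURRENT_EPOCH_FILENAME: the current Leader epoch.
    • ACCEPTED_EPOCH_FILENAME: the newEpoch a Follower has accepted when the Leader changes epoch.
    • UPDATING_EPOCH_FILENAME: created during an epoch change; if it exists, an epoch transition is in progress.
    • epochOfZxid: the high 32 bits of the latest zxid found in the logs (a zxid is a 32-bit epoch followed by a 32-bit counter).
    • epochOfZxid > currentEpoch && updating.exists(): the epoch from the logs is newer than the one stored in currentEpoch and an epoch change was in progress, meaning the server went down after taking a snapshot but before updating currentEpoch; currentEpoch is set to epochOfZxid.
    • epochOfZxid > currentEpoch with no epoch change in progress: an IOException is thrown, which is wrapped in a RuntimeException and aborts startup.
    • acceptedEpoch < currentEpoch: an IOException is thrown, likewise aborting startup.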
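    The zxid layout and the epoch checks above can be condensed into one pure function, which makes the decision table easy to test. This is a sketch under stated assumptions: a null argument models a missing currentEpoch or acceptedEpoch file, and an IllegalStateException stands in for the IOException used by QuorumPeer. The bit operations follow the 32-bit epoch / 32-bit counter split described above, while EpochCheck and reconcile are hypothetical names.

```java
class EpochCheck {
    // 64-bit zxid layout: high 32 bits = epoch, low 32 bits = counter.
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Sketch of the checks in QuorumPeer.loadDataBase. A null currentEpochFile or
    // acceptedEpochFile models a missing file (the real code then writes epochOfZxid
    // to it). Returns {currentEpoch, acceptedEpoch}.
    static long[] reconcile(long lastProcessedZxid, Long currentEpochFile,
                            Long acceptedEpochFile, boolean updatingExists) {
        long epochOfZxid = getEpochFromZxid(lastProcessedZxid);

        long currentEpoch = (currentEpochFile == null) ? epochOfZxid : currentEpochFile;
        if (epochOfZxid > currentEpoch && updatingExists) {
            // Stopped after the snapshot but before currentEpoch was written
            // (the real code also deletes the updatingEpoch file here).
            currentEpoch = epochOfZxid;
        }
        if (epochOfZxid > currentEpoch) {
            throw new IllegalStateException("current epoch " + currentEpoch
                    + " is older than the epoch of the last zxid, " + epochOfZxid);
        }

        long acceptedEpoch = (acceptedEpochFile == null) ? epochOfZxid : acceptedEpochFile;
        if (acceptedEpoch < currentEpoch) {
            throw new IllegalStateException("accepted epoch " + acceptedEpoch
                    + " is less than current epoch " + currentEpoch);
        }
        return new long[] {currentEpoch, acceptedEpoch};
    }
}
```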
      ---------over---------

    Original link: https://www.haomeiwen.com/subject/caopnktx.html