美文网首页Mycat
DBLE 2.17.08.1与MyCat 1.6.5的启动过程(

DBLE 2.17.08.1与MyCat 1.6.5的启动过程(

作者: john_zhong | 来源:发表于2017-10-23 14:13 被阅读60次

    概述


    DBLE在启动时,由作为入口的com.actiontech.dble.DbleServerStartup.main()控制,依次调用com.actiontech.dble.DbleServer的以下函数,来完成整个启动过程。这与MyCat的行为相仿。

    com.actiontech.dble.DbleServer()
    com.actiontech.dble.DbleServer.beforeStart()
    com.actiontech.dble.DbleServer.startup()
    

    无论是MyCat的MycatServer,还是DBLE的DbleServerXXXServer这个类并没有自己的线程。完成上面的函数之后,主线程离开DbleServer的相关函数,并完全交出CPU控制权,后续将由持有子线程的其他模块来驱动程序运作。

    换句话说,DbleServer是把各个功能模块封装起来的逻辑盒子,它主要的工作就是初始化和启动DBLE的各种功能模块。

    第一阶段:com.actiontech.dble.DbleServer()


    DBLE在全局公用类的设计上,继承了MyCat的做法,大量使用单实例设计模式(Singleton)。DbleServer本身是单实例设计,其中还包含了大量非传递(non-transitivity)成员变量,并以此来实现其他一些类(或者说是功能模块)的单实例设计。

    DbleServer()的主要工作就是对非传递成员变量进行赋值:

    1. 读取配置文件
    private final ServerConfig config;
    // ...
    this.config = new ServerConfig();
    
    1. 创建定时任务的计时线程池
    private final ScheduledExecutorService scheduler;
    // ...
    scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
    
    1. 设置服务上线标识
    private final AtomicBoolean isOnline;
    // ...
    this.isOnline = new AtomicBoolean(true);
    
    1. 创建缓存服务
    private final CacheService cacheService;
    // ...
    cacheService = new CacheService(config.getSystem().isLowerCaseTableNames());
    
    1. 创建路由服务
    private final RouteService routerService;
    // ...
    routerService = new RouteService(cacheService);
    
    1. 记录启动时间
    private final long startupTime;
    // ...
    this.startupTime = TimeUtil.currentTimeMillis();
    
    1. 创建会话XA检查服务

    这是MyCat没有,DBLE特有的功能组件。原理上是提供commitSessionrollbackSession这两个map,和它们的commit/rollback状态检查,来协助实现XA中的全局事务管理器概念。

    虽然没有进行final修饰,但是它是实例私有(private),也没有提供setter,而且DbleServer中没有更改过它的对象引用,所以也可以认为它是单实例模式的设计。

    private XASessionCheck xaSessionCheck;
    // ...
    xaSessionCheck = new XASessionCheck();
    
    1. 创建序列号发生器
     private final SequenceHandler sequenceHandler;
     // ...
     sequenceHandler = initSequenceHandler(config.getSystem().getSequnceHandlerType());
    

    MycatServer相比,DbleServer在这个阶段(MycatServer的实例化过程中),并不实例化DynaClassLoader(catlet加载器)和SQLInterceptor(SQL解析器,封装了druid)。这是因为DbleServer从设计上就移除了这两个成员,是设计上的显著不同。

    第二阶段:com.actiontech.dble.DbleServer.beforeStart()


    和MyCat一样,beforeStart()仅仅调用SystemConfig.getHomePath(),这是一个只读不写的静态方法,所以并不会引起任何变化。

    public void beforeStart() {
        SystemConfig.getHomePath();
    }
    

    另外,com.actiontech.dble.DbleServerStartup.main()在实例化DbleServer之前也调用同一方法,用来确认DBLE_HOME。

    所以,从功能上来看这个步骤是没有意义的一步。MyCat和DBLE都没有移除它,可能是想保留一个扩展点吧。

    第三阶段:com.actiontech.dble.DbleServer.startup()


    该阶段主要是创建了使外部能够访问和利用DBLE的协作类:业务前端SocketAcceptor server、管理前端SocketAcceptor manager和处理器组NIOProcessor processors[]

    但是,从最终效果来说,其实这些类的实例化与初始化工作前移到DbleServer的实例化阶段(启动的第一阶段)也没有问题(reload操作针对的是ServerConfig)。将这些类的初始化和启动混放在本阶段,各阶段间的逻辑区分不太清晰,应该是来自MyCat的的历史包袱。

    1. 创建备份时用的业务阻塞锁

    这是MyCat所不具备的特性,这个特性对于集群备份非常必要。

    private AtomicBoolean backupLocked;
    // ...
    backupLocked = new AtomicBoolean(false);
    
    1. 创建业务端口(TCP 8066)的聆听器
    ServerConnectionFactory sf = new ServerConnectionFactory();
    SocketAcceptor server = null;
    // ...
    server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", system.getBindIp(),
            system.getServerPort(), system.getServerBacklog(), sf, reactorPool);
    // ...
    server.start();
    
    1. 创建管理端口(TCP 9066)的聆听器
    ManagerConnectionFactory mf = new ManagerConnectionFactory();
    SocketAcceptor manager = null;
    // ...
    manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", system.getBindIp(),
            system.getManagerPort(), 100, mf, reactorPool);
    // ...
    manager.start();
    
    1. 分配内存缓冲池
    int bufferPoolPageSize = system.getBufferPoolPageSize();
    // total page number
    short bufferPoolPageNumber = system.getBufferPoolPageNumber();
    //minimum allocation unit
    short bufferPoolChunkSize = system.getBufferPoolChunkSize();
    totalNetWorkBufferSize = bufferPoolPageSize * bufferPoolPageNumber;
    if (totalNetWorkBufferSize > Platform.getMaxDirectMemory()) {
        LOGGER.error("Direct BufferPool size lager than MaxDirectMemory");
        throw new IOException("Direct BufferPool size lager than MaxDirectMemory");
    }
    bufferPool = new DirectByteBufferPool(bufferPoolPageSize, bufferPoolChunkSize, bufferPoolPageNumber);
    
    1. 分配用于跨节点结果整合/排序/分组/分页用的堆外内存
    if (system.getUseOffHeapForMerge() == 1) {
        try {
            serverMemory = new SeverMemory(system, totalNetWorkBufferSize);
        } catch (NoSuchFieldException e) {
            LOGGER.error("NoSuchFieldException", e);
        } catch (IllegalAccessException e) {
            LOGGER.error("Error", e);
        }
    }
    
    1. 创建业务处理池
    int threadPoolSize = system.getProcessorExecutor();
    // ...
    businessExecutor = ExecutorUtil.createFixed("BusinessExecutor", threadPoolSize);
    
    1. 创建复杂查询处理池
    int threadPoolSize = system.getProcessorExecutor();
    // ...
    complexQueryExecutor = ExecutorUtil.createCached("complexQueryExecutor", threadPoolSize);
    
    1. 创建定时任务处理池
    timerExecutor = ExecutorUtil.createFixed("Timer", 1);
    
    1. 将内存缓冲池和业务处理池组合成一个抽象对象,处理器
    int processorCount = system.getProcessors();
    processors = new NIOProcessor[processorCount];
    // ...
    for (int i = 0; i < processors.length; i++) {
        processors[i] = new NIOProcessor("Processor" + i, bufferPool, businessExecutor);
    }
    
    1. 创建交易语句记录器
    if (config.getSystem().getRecordTxn() == 1) {
        txnLogProcessor = new TxnLogProcessor(bufferPool);
        txnLogProcessor.setName("TxnLogProcessor");
        txnLogProcessor.start();
    }
    
    1. 创建互斥元数据管理器
    tmManager = new ProxyMetaManager();
    if (!this.getConfig().isDataHostWithoutWR()) {
        //init tmManager
        try {
            tmManager.init(this.getConfig());
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
    
    1. 进行XA的crash recovery
    performXARecoveryLog();
    
    1. 注册定时任务
    long dataNodeIdleCheckPeriod = system.getDataNodeIdleCheckPeriod();
    scheduler.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
    scheduler.scheduleWithFixedDelay(processorCheck(), 0L, system.getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
    scheduler.scheduleAtFixedRate(dataNodeConHeartBeatCheck(dataNodeIdleCheckPeriod), 0L, dataNodeIdleCheckPeriod, TimeUnit.MILLISECONDS);
    //dataHost heartBeat  will be influence by dataHostWithoutWR
    scheduler.scheduleAtFixedRate(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);
    scheduler.scheduleAtFixedRate(dataSourceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
    scheduler.scheduleWithFixedDelay(xaSessionCheck(), 0L, system.getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
    scheduler.scheduleWithFixedDelay(xaLogClean(), 0L, system.getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
    scheduler.scheduleWithFixedDelay(resultSetMapClear(), 0L, system.getClearBigSqLResultSetMapMs(), TimeUnit.MILLISECONDS);
    if (system.getUseSqlStat() == 1) {
        //sql record detail timing clean
        scheduler.scheduleWithFixedDelay(recycleSqlStat(), 0L, DEFAULT_SQL_STAT_RECYCLE_PERIOD, TimeUnit.MILLISECONDS);
    }
    
    if (system.getUseGlobleTableCheck() == 1) {    // will be influence by dataHostWithoutWR
        scheduler.scheduleWithFixedDelay(globalTableConsistencyCheck(), 0L, system.getGlableTableCheckPeriod(), TimeUnit.MILLISECONDS);
    }
    
    1. 初始化到数据库的连接
    if (!this.getConfig().isDataHostWithoutWR()) {
        // init datahost
        // connect to Databases using conf settings
        Map<String, PhysicalDBPool> dataHosts = this.getConfig().getDataHosts();
        LOGGER.info("Initialize dataHost ...");
        for (PhysicalDBPool node : dataHosts.values()) {
            String index = dnIndexProperties.getProperty(node.getHostName(), "0");
            if (!"0".equals(index)) {
                LOGGER.info("init datahost: " + node.getHostName() + "  to use datasource index:" + index);
            }
            int activeIndex = node.init(Integer.parseInt(index));
            saveDataHostIndex(node.getHostName(), activeIndex);
            node.startHeartbeat();
        }
    }
    

    相关文章

      网友评论

      本文标题:DBLE 2.17.08.1与MyCat 1.6.5的启动过程(

      本文链接:https://www.haomeiwen.com/subject/jfaguxtx.html