美文网首页MQTT
MQTT---HiveMQ源码详解(二)结构与启动

MQTT---HiveMQ源码详解(二)结构与启动

作者: 西安PP | 来源:发表于2017-10-22 01:34 被阅读86次

    目录结构

    在官网中也有更详细的介绍,下面我只对目录结构做一个简单介绍即可,感兴趣的朋友可以参考官网文档.http://www.hivemq.com/docs/hivemq/latest/#installation


    目录结构

    bin

    包含hivemq.jar以及一些启动脚本

    conf

    包含config.xml、logback.xml以及plugin的配置文件
    examples是一些示例组网场景的示例配置

    data

    metadata存放版本信息(加密过)
    persistence存放着所有持久化信息的文件、以及备份文件。包含client_session_subscriptions、client_sessions、outgoing_message_flow、incomming_message_flow、publish_payloads、queued_messages、retained_messages等。

    diagnostics

    存放着诊断模式下诊断信息,包括系统信息、网络接口信息、jvm信息、插件信息等等。方便开发者排查问题。

    license

    存放hivemq授权license文件。

    log

    存放日志

    plugins

    第三方插件目录


    启动

    既然它是一个java程序,那么我们就从它的main方法开始我们的hivemq源码之路。

    main

    
    public class HiveMQServer {
        private static final Logger LOGGER = LoggerFactory.getLogger(HiveMQServer.class);
        private final NettyServer nettyServer;
        private final ClusterConfigurationService clusterConfigurationService;
        private final PluginBrokerCallbackHandler pluginBrokerCallbackHandler;
        private final PluginInformationStore pluginInformationStore;
        private final Provider<ClusterJoiner> clusterJoinerProvider;
    
        @Inject
        HiveMQServer(NettyServer nettyServer,
                     ClusterConfigurationService clusterConfigurationService,
                     PluginBrokerCallbackHandler pluginBrokerCallbackHandler,
                     PluginInformationStore pluginInformationStore,
                     Provider<ClusterJoiner> clusterJoinerProvider) {
            this.nettyServer = nettyServer;
            this.clusterConfigurationService = clusterConfigurationService;
            this.pluginBrokerCallbackHandler = pluginBrokerCallbackHandler;
            this.pluginInformationStore = pluginInformationStore;
            this.clusterJoinerProvider = clusterJoinerProvider;
        }
    
        public void start() throws InterruptedException, ExecutionException {
            //启动netty server
            this.nettyServer.start().sync();
            //通知OnBrokerStart事件
            fireOnBrokerStart();
            //加入cluster
            joinCluster();
            //启动对应承载在netty上的Listener,并打印出这些Listener启动结果信息。请参考Linstener配置请参考http://www.hivemq.com/docs/hivemq/latest/#configuration-chapter
            ListenableFuture<List<ListenerStartResult>> startFuture = this.nettyServer.startListeners();
            List<ListenerStartResult> startResults = startFuture.get();
            new ListenerStartResultLogger(startResults).log();
        }
    
        private void joinCluster() {
            //根据配置确定是否加入cluster
            if (!this.clusterConfigurationService.isEnabled()) {
                return;
            }
            try {
            //使用ClusterJoiner类进行连接jgroup,组成cluster。
                ClusterJoiner clusterJoiner = this.clusterJoinerProvider.get();
                ListenableFuture<Void> future = clusterJoiner.join();
                future.get();
            } catch (Exception e) {
                if (e.getCause() instanceof DuplicateOrInvalidLicenseException) {
                    LOGGER.error("Found duplicate or invalid license file in the cluster. Shutting down HiveMQ");
                } else if (e.getCause() instanceof DifferentConfigurationException) {
                    LOGGER.error("The configuration of this HiveMQ instance is different form the other instances in the cluster. Shutting down HiveMQ");
                } else {
                    LOGGER.error("Could not join cluster. Shutting down HiveMQ.", e);
                }
                if (e.getCause() instanceof UnrecoverableException) {
                    throw ((UnrecoverableException) e.getCause());
                }
                throw new UnrecoverableException(false);
            }
        }
    
        //通知对应plugin broker已经启动
        private void fireOnBrokerStart() {
            LOGGER.trace("Calling all OnBrokerStart Callbacks");
            printPluginInformations();
            this.pluginBrokerCallbackHandler.onStart();
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            LOGGER.info("Starting HiveMQ Server");
            long startTime = System.nanoTime();
            //初始化SystemInformation,可以通过环境变量来分别设置conf、plugins、log、license等目录。
            //请参考hivemq spi SystemInformation
            LOGGER.trace("Initializing HiveMQ home directory");
            HiveMQSystemInformation systemInformation = new HiveMQSystemInformation(true);
            //创建MetricRegistry
            //请参考开源框架Metrics
            LOGGER.trace("Creating MetricRegistry");
            MetricRegistry metricRegistry = new MetricRegistry();
            //增加统计Listener
            metricRegistry.addListener(new StatisticsListener());
            //初始化日志
            LOGGER.trace("Initializing Logging");
            LogConfigurator.init(systemInformation.getConfigFolder(), metricRegistry);
            //增加未处理异常拦截,并对其进行优雅处理
            LOGGER.trace("Initializing Exception handlers");
            RecoverableExceptionHandler.init();
            //初始化ConfigurationService,并读取conf/config.xml文件,加载用户配置
            //请参考hivemq spi ConfigurationService,
            LOGGER.trace("Initializing configuration");
            HiveMQConfigurationService hiveMQConfigurationService = HiveMQConfigurationServiceFactory.create(systemInformation);
            //创建Clusterid提供者。
            ClusterIdProducer clusterIdProducer = new ClusterIdProducer();
            if (hiveMQConfigurationService.clusterConfiguration().isEnabled()) {
                LOGGER.info("This node's cluster-ID is {}", clusterIdProducer.get());
            }
            //根据原有版本,判断是否需要做持久化数据的migration,如需要进行migration,因为可以配置每个数据的使用策略(file/memory),所以每个数据分别进行migration
            LOGGER.trace("Checking for migrations");
            Map<MigrationType, Set<String>> neededMigrations = Migrations.getNeededMigrations(systemInformation);
            Injector injector = null;
            if (neededMigrations.size() > 0) {
                LOGGER.warn("HiveMQ has been updated, migrating persistent data to new version !");
                neededMigrations.keySet().forEach(type -> LOGGER.debug("{} needs to be migrated", type));
                //因为migration也是依赖guice来做容器,所以migration也会创建一个injector
                injector = Bootstrap.createInjector(systemInformation, hiveMQConfigurationService, clusterIdProducer);
                Migrations.start(injector, neededMigrations);
            }
            //升级完成,将升级的最新版本信息,持久化到文件中,以便下次启动进行判断
            Migrations.finish(systemInformation, hiveMQConfigurationService);
            //初始化guice
            LOGGER.trace("Initializing Guice");
            injector = Bootstrap.createInjector(systemInformation, metricRegistry, hiveMQConfigurationService, clusterIdProducer, injector);
            //从guice中获得HiveMQServer实例,并启动它
            HiveMQServer server = injector.getInstance(HiveMQServer.class);
            server.start();
            //对EXodus日志级别做修改
            LogConfigurator.addXodusLogModificator();
            LOGGER.info("Started HiveMQ in {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
            //hivemq版本升级检查器,会连接hivemq官网判断是否有新版本升级。可以在配置文件中设置不检查
            UpdateChecker updateChecker = injector.getInstance(UpdateChecker.class);
            updateChecker.start();
        }
    
        //根据加载出来的所有plugin打印plugin信息
        //请参考hivemq spi @Information
        private void printPluginInformations() {
            Set<PluginInformation> pluginInformations = this.pluginInformationStore.getPluginInformations();
            pluginInformations.forEach(pluginInformation ->
                    LOGGER.info("Loaded Plugin {} - v{}", pluginInformation.getName(), pluginInformation.getVersion())
            );
        }
    }
    

    Bootstrap & Guice Modules

    它是采用Guice作为di框架,那么我们就从Bootstrap开始看它包含了哪些Module以及简单介绍下这些Module主要是注入哪些对应处理代码。

    
    public class Bootstrap {
        private static final Logger LOGGER = LoggerFactory.getLogger(Bootstrap.class);
    
        public static Injector createInjector(SystemInformation systemInformation, MetricRegistry metricRegistry, HiveMQConfigurationService hiveMQConfigurationService, ClusterIdProducer clusterIdProducer, Injector injector) {
        //根据系统变量判断是否开启诊断模式
            if (!Boolean.parseBoolean(System.getProperty("diagnosticMode"))) {
                LOGGER.trace("Turning Guice stack traces off");
                System.setProperty("guice_include_stack_traces", "OFF");
            }
            //加载所有PluginModule
            //请参考hivemq spi PluginModule
            //后续会专门讲解plugin是如何加载的
            List<PluginModule> pluginModules = new PluginBootstrap().create(systemInformation.getPluginFolder());
            ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder();
            builder.add(
            //系统信息
                    new SystemInformationModule(systemInformation),
               //注册cache的生命周期范围
                    new ScopeModule(),
                    //增加@PostConstruct、@PreDestroy注解处理
                    new LifecycleModule(),
                    //配置的Module
                    new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer),
                    //netty所有handler、以及listenser等module
                    new NettyModule(),
                    //内部module
                    new InternalModule(),
                    //plugin callback module,主要处理plugin注册cabllback后回调
                    new PluginCallbackModule(),
                    //为方法增加cache的module
                    new MethodCacheModule(),
                    //持久化module
                    new PersistenceModule(injector),
                    //统计的module
                    new MetricModule(metricRegistry),
                    //流量监控module
                    new TrafficShapingModule(),
                    //cluster module
                    new ClusterModule(),
                    //plugin提供service的module
                    new ServiceModule(pluginModules),
                    //license的解析、验证、限制module
                    new LicensingModule(),
                    //更新hivemq程序的module
                    new UpdateModule(),
                    //诊断模式module
                    new DiagnosticModule());
            builder.addAll(pluginModules);
            return Guice.createInjector(Stage.PRODUCTION, builder.build());
        }
    
    //创建数据升级的Injector,这个较上面的module加载的少点而已。
        public static Injector createInjector(SystemInformation systemInformation,
                                              HiveMQConfigurationService hiveMQConfigurationService,
                                              ClusterIdProducer clusterIdProducer) {
            ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder();
            builder.add(
                    new SystemInformationModule(systemInformation),
                    new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer),
                    new BridgeModule(),
                    new ScopeModule(),
                    new LifecycleModule());
            return Guice.createInjector(Stage.PRODUCTION, builder.build());
        }
    }
    

    MQTT交流群:221405150

    RocketMQ交流群:10648794

    NewSQL交流群:153575008


    相关文章

      网友评论

        本文标题:MQTT---HiveMQ源码详解(二)结构与启动

        本文链接:https://www.haomeiwen.com/subject/sttguxtx.html