A Look at PowerJobAutoConfiguration

Author: go4it | Published 2023-12-20 09:11

    This article takes a look at PowerJobAutoConfiguration.

    PowerJobProperties

    tech/powerjob/worker/autoconfigure/PowerJobProperties.java

    @ConfigurationProperties(prefix = "powerjob")
    public class PowerJobProperties {
    
        private final Worker worker = new Worker();
    
        public Worker getWorker() {
            return worker;
        }
    
        //......
    }    
    

    PowerJobProperties uses the configuration prefix powerjob; the main settings all live under worker.

    Worker

        /**
         * Powerjob worker configuration properties.
         */
        @Setter
        @Getter
        public static class Worker {
    
            /**
             * Whether to enable PowerJob Worker
             */
            private boolean enabled = true;
    
            /**
             * Name of application, String type. Total length of this property should be no more than 255
             * characters. This is one of the required properties when registering a new application. This
             * property should be assigned with the same value as what you entered for the appName.
             */
            private String appName;
            /**
             * Akka port of Powerjob-worker, optional value. Default value of this property is 27777.
             * If multiple PowerJob-worker nodes were deployed, different, unique ports should be assigned.
             * Deprecated, please use 'port'
             */
            @Deprecated
            private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
            /**
             * port
             */
            private Integer port;
            /**
             * Address(es) of Powerjob-server node(s). Ip:port or domain.
             * Example of single Powerjob-server node:
             * <p>
             * 127.0.0.1:7700
             * </p>
             * Example of Powerjob-server cluster:
             * <p>
             * 192.168.0.10:7700,192.168.0.11:7700,192.168.0.12:7700
             * </p>
             */
            private String serverAddress;
            /**
             * Protocol for communication between WORKER and server
             */
            private Protocol protocol = Protocol.AKKA;
            /**
             * Local store strategy for H2 database. {@code disk} or {@code memory}.
             */
            private StoreStrategy storeStrategy = StoreStrategy.DISK;
            /**
             * Max length of response result. Result that is longer than the value will be truncated.
             * {@link ProcessResult} max length for #msg
             */
            private int maxResultLength = 8192;
            /**
             * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
             * Test mode is used for conditions that your have no powerjob-server in your develop env, so you can't start up the application
             */
            private boolean enableTestMode = false;
            /**
             * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.
             * {@link WorkflowContext} max length for #appendedContextData
             */
            private int maxAppendedWfContextLength = 8192;
    
            private String tag;
            /**
             * Max numbers of LightTaskTacker
             */
            private Integer maxLightweightTaskNum = 1024;
            /**
             * Max numbers of HeavyTaskTacker
             */
            private Integer maxHeavyweightTaskNum = 64;
            /**
             * Interval(s) of worker health report
             */
            private Integer healthReportInterval = 10;
    
        }
    

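    Worker defines enabled (default true), appName, akkaPort (deprecated; defaults to RemoteConstant.DEFAULT_WORKER_PORT, which the comment gives as 27777), port (an Integer with no default of its own), serverAddress (several ip:port entries separated by commas), protocol (default AKKA; HTTP is also supported), storeStrategy (default DISK, MEMORY also supported; it selects the storage mode of the local H2 database), maxResultLength (maximum length of the returned result, default 8192), enableTestMode (default false; intended for debugging when no server is deployed), maxAppendedWfContextLength (default 8192), tag, maxLightweightTaskNum (default 1024), maxHeavyweightTaskNum (default 64), and healthReportInterval (default 10 seconds).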
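
    Because serverAddress is a single comma-separated string, it has to be split into a list before it can be used. The following is a minimal sketch of that step, not PowerJob code: the class ServerAddressParsing and both method names are made up for illustration. It contrasts a plain String.split(",") with a hypothetical tolerant variant that trims whitespace around each entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ServerAddressParsing {

    // Plain split on commas: whitespace around an entry is kept as part of the entry.
    public static List<String> plainSplit(String serverAddress) {
        return Arrays.asList(serverAddress.split(","));
    }

    // Hypothetical tolerant variant (an assumption, not part of PowerJob):
    // trims each entry and drops entries that are empty after trimming.
    public static List<String> tolerantSplit(String serverAddress) {
        List<String> result = new ArrayList<>();
        for (String part : serverAddress.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String configured = "192.168.0.10:7700, 192.168.0.11:7700";
        System.out.println(plainSplit(configured));
        System.out.println(tolerantSplit(configured));
    }
}
```

    With a plain split, a value written with a space after the comma yields a second entry that starts with a space, so it is safest to write the list without spaces.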

    PowerJobWorkerConfig

    tech/powerjob/worker/common/PowerJobWorkerConfig.java

    @Getter
    @Setter
    public class PowerJobWorkerConfig {
        /**
         * AppName, recommend to use the name of this project
         * Applications should be registered by powerjob-console in advance to prevent error.
         */
        private String appName;
        /**
         * Worker port
         * Random port is enabled when port is set with non-positive number.
         */
        private int port = RemoteConstant.DEFAULT_WORKER_PORT;
        /**
         * Address of powerjob-server node(s)
         * Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://.
         */
        private List<String> serverAddress = Lists.newArrayList();
        /**
         * Protocol for communication between WORKER and server
         */
        private Protocol protocol = Protocol.AKKA;
        /**
         * Max length of response result. Result that is longer than the value will be truncated.
         * {@link ProcessResult} max length for #msg
         */
        private int maxResultLength = 8096;
        /**
         * User-defined context object, which is passed through to the TaskContext#userContext property
         * Usage Scenarios: The container Java processor needs to use the Spring bean of the host application, where you can pass in the ApplicationContext and get the bean in the Processor
         */
        private Object userContext;
        /**
         * Internal persistence method, DISK or MEMORY
         * Normally you don't need to care about this configuration
         */
        private StoreStrategy storeStrategy = StoreStrategy.DISK;
        /**
         * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
         * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
         */
        private boolean enableTestMode = false;
        /**
         * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
         * {@link WorkflowContext} max length for #appendedContextData
         */
        private int maxAppendedWfContextLength = 8192;
        /**
         * user-customized system metrics collector
         */
        private SystemMetricsCollector systemMetricsCollector;
        /**
         * Processor factory for custom logic, generally used for IOC framework processor bean injection that is not officially supported by PowerJob
         */
        private List<ProcessorFactory> processorFactoryList;
    
        private String tag;
        /**
         * Max numbers of LightTaskTacker
         */
        private Integer maxLightweightTaskNum = 1024;
        /**
         * Max numbers of HeavyTaskTacker
         */
        private Integer maxHeavyweightTaskNum = 64;
        /**
         * Interval(s) of worker health report
         */
        private Integer healthReportInterval = 10;
    
    }
    

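    PowerJobWorkerConfig largely mirrors PowerJobProperties.Worker, with a few differences: serverAddress is a List<String> rather than a comma-separated String, there is a single int port instead of akkaPort plus port, maxResultLength defaults to 8096 here (8192 in Worker), and it adds userContext, systemMetricsCollector, and processorFactoryList, which have no counterpart in the properties class.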

    PowerJobAutoConfiguration

    tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java

    @Configuration
    @EnableConfigurationProperties(PowerJobProperties.class)
    @ConditionalOnProperty(prefix = "powerjob.worker", name = "enabled", havingValue = "true", matchIfMissing = true)
    public class PowerJobAutoConfiguration {
    
        @Bean
        @ConditionalOnMissingBean
        public PowerJobSpringWorker initPowerJob(PowerJobProperties properties) {
    
            PowerJobProperties.Worker worker = properties.getWorker();
    
            /*
             * Address of PowerJob-server node(s). Do not mistake for ActorSystem port. Do not add
             * any prefix, i.e. http://.
             */
            CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty! " +
                "if you don't want to enable powerjob, please config program arguments: powerjob.worker.enabled=false");
            List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));
    
            /*
             * Create OhMyConfig object for setting properties.
             */
            PowerJobWorkerConfig config = new PowerJobWorkerConfig();
            /*
             * Configuration of worker port. Random port is enabled when port is set with non-positive number.
             */
            if (worker.getPort() != null) {
                config.setPort(worker.getPort());
            } else {
                int port = worker.getAkkaPort();
                if (port <= 0) {
                    port = NetUtils.getRandomPort();
                }
                config.setPort(port);
            }
            /*
             * appName, name of the application. Applications should be registered in advance to prevent
             * error. This property should be the same with what you entered for appName when getting
             * registered.
             */
            config.setAppName(worker.getAppName());
            config.setServerAddress(serverAddress);
            config.setProtocol(worker.getProtocol());
            /*
             * For non-Map/MapReduce tasks, {@code memory} is recommended for speeding up calculation.
             * Map/MapReduce tasks may produce batches of subtasks, which could lead to OutOfMemory
             * exception or error, {@code disk} should be applied.
             */
            config.setStoreStrategy(worker.getStoreStrategy());
            /*
             * When enabledTestMode is set as true, PowerJob-worker no longer connects to PowerJob-server
             * or validate appName.
             */
            config.setEnableTestMode(worker.isEnableTestMode());
            /*
             * Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignored.
             */
            config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());
    
            config.setTag(worker.getTag());
    
            config.setMaxHeavyweightTaskNum(worker.getMaxHeavyweightTaskNum());
    
            config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum());
    
            config.setHealthReportInterval(worker.getHealthReportInterval());
            /*
             * Create PowerJobSpringWorker object and set properties.
             */
            return new PowerJobSpringWorker(config);
        }
    
    }
    

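    PowerJobAutoConfiguration enables PowerJobProperties and is applied automatically unless powerjob.worker.enabled is set to false. It then registers a PowerJobSpringWorker bean. The method name initPowerJob is not a great choice, because the bean ends up being named initPowerJob. The method mainly converts the PowerJobProperties.Worker settings into a PowerJobWorkerConfig: if port is set it is used as is, otherwise akkaPort is used, and a random port is picked when akkaPort is less than or equal to 0. Finally it creates the PowerJobSpringWorker from the PowerJobWorkerConfig.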
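
    The port selection in initPowerJob can be isolated into a small function. This is a sketch under stated assumptions: PortResolution, resolvePort, and randomPort are illustrative names, and randomPort is only a stand-in for NetUtils.getRandomPort(), whose implementation is not shown in this article.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortResolution {

    // Stand-in for NetUtils.getRandomPort() (assumption): ask the OS for a free port.
    public static int randomPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port available", e);
        }
    }

    // Mirrors the branch in initPowerJob: an explicit port wins and is passed through
    // unchanged; otherwise the deprecated akkaPort is used, with a random port
    // substituted when akkaPort is not positive.
    public static int resolvePort(Integer port, int akkaPort) {
        if (port != null) {
            return port;
        }
        return akkaPort > 0 ? akkaPort : randomPort();
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(8080, 27777)); // explicit port
        System.out.println(resolvePort(null, 27777)); // falls back to akkaPort
        System.out.println(resolvePort(null, 0));     // random free port
    }
}
```

    Note that the random-port substitution in this method only happens on the akkaPort branch. An explicit non-positive port is handed to PowerJobWorkerConfig unchanged; whether the worker replaces it with a random port later is not visible in the listing above.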

    PowerJobSpringWorker

    tech/powerjob/worker/PowerJobSpringWorker.java

    public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
    
        /**
         * Composition over inheritance: hold a PowerJobWorker and reset the ProcessorFactory internally, which is more elegant
         */
        private PowerJobWorker powerJobWorker;
        private final PowerJobWorkerConfig config;
    
        public PowerJobSpringWorker(PowerJobWorkerConfig config) {
            this.config = config;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            powerJobWorker = new PowerJobWorker(config);
            powerJobWorker.init();
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
    
            BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);
            // append BuiltInSpringProcessorFactory
    
            List<ProcessorFactory> processorFactories = Lists.newArrayList(
                    Optional.ofNullable(config.getProcessorFactoryList())
                            .orElse(Collections.emptyList()));
            processorFactories.add(springProcessorFactory);
            processorFactories.add(springMethodProcessorFactory);
            config.setProcessorFactoryList(processorFactories);
        }
    
        @Override
        public void destroy() throws Exception {
            powerJobWorker.destroy();
        }
    }
    

    PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, and DisposableBean. Its afterPropertiesSet method creates the PowerJobWorker and calls init on it, and its destroy method calls powerJobWorker.destroy(). Its setApplicationContext method builds the processorFactories list, adds springProcessorFactory and springMethodProcessorFactory to it, and finally sets the list on config.
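
    This arrangement relies on Spring calling setApplicationContext before afterPropertiesSet, so the Spring-aware factories are already in the config when the worker is created. The sketch below simulates that ordering in plain Java without any Spring dependency; Config, SpringWorker, and run are stand-ins invented for illustration, and the factories are represented by their names only.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleOrderSketch {

    // Stand-in for PowerJobWorkerConfig: only the processor factory list matters here.
    public static class Config {
        List<String> processorFactories = new ArrayList<>();
    }

    // Stand-in for PowerJobSpringWorker, with both callbacks reduced to plain methods.
    public static class SpringWorker {
        final Config config;
        List<String> factoriesSeenAtInit;

        SpringWorker(Config config) {
            this.config = config;
        }

        // Plays the role of setApplicationContext: append the Spring-aware factories.
        void setApplicationContext() {
            List<String> factories = new ArrayList<>(config.processorFactories);
            factories.add("BuiltInSpringProcessorFactory");
            factories.add("BuildInSpringMethodProcessorFactory");
            config.processorFactories = factories;
        }

        // Plays the role of afterPropertiesSet: the worker sees the config as it is now.
        void afterPropertiesSet() {
            factoriesSeenAtInit = new ArrayList<>(config.processorFactories);
        }
    }

    // Invokes the callbacks in the order Spring uses: aware callback first, then init.
    public static List<String> run() {
        SpringWorker worker = new SpringWorker(new Config());
        worker.setApplicationContext();
        worker.afterPropertiesSet();
        return worker.factoriesSeenAtInit;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```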

    PowerJobWorker

    tech/powerjob/worker/PowerJobWorker.java

    @Slf4j
    public class PowerJobWorker {
        private final RemoteEngine remoteEngine;
        protected final WorkerRuntime workerRuntime;
        private final AtomicBoolean initialized = new AtomicBoolean(false);
    
        public PowerJobWorker(PowerJobWorkerConfig config) {
            this.workerRuntime = new WorkerRuntime();
            this.remoteEngine = new PowerJobRemoteEngine();
            workerRuntime.setWorkerConfig(config);
        }
    
        //......
    }    
    

    PowerJobWorker defines the remoteEngine, workerRuntime, and initialized fields; its constructor sets the PowerJobWorkerConfig on workerRuntime.

    init

        public void init() throws Exception {
    
            if (!initialized.compareAndSet(false, true)) {
                log.warn("[PowerJobWorker] please do not repeat the initialization");
                return;
            }
    
            Stopwatch stopwatch = Stopwatch.createStarted();
            log.info("[PowerJobWorker] start to initialize PowerJobWorker...");
    
            PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
            CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");
    
            try {
                PowerBannerPrinter.print();
                // validate appName
                if (!config.isEnableTestMode()) {
                    assertAppName();
                } else {
                    log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
                }
    
                // initialize metadata
                String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
                workerRuntime.setWorkerAddress(workerAddress);
    
                // initialize thread pools
                final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
                workerRuntime.setExecutorManager(executorManager);
    
                // initialize ProcessorLoader
                ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
                workerRuntime.setProcessorLoader(processorLoader);
    
                // initialize actors
                TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
                ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
                WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);
    
                // initialize the communication engine
                EngineConfig engineConfig = new EngineConfig()
                        .setType(config.getProtocol().name())
                        .setServerType(ServerType.WORKER)
                        .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
                        .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
    
                EngineOutput engineOutput = remoteEngine.start(engineConfig);
                workerRuntime.setTransporter(engineOutput.getTransporter());
    
                // connect to the server
                ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
    
                serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
                workerRuntime.setServerDiscoveryService(serverDiscoveryService);
    
                log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
    
                // initialize the logging system
                OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
                workerRuntime.setOmsLogHandler(omsLogHandler);
    
                // initialize storage
                TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
                taskPersistenceService.init();
                workerRuntime.setTaskPersistenceService(taskPersistenceService);
                log.info("[PowerJobWorker] local storage initialized successfully.");
    
    
                // initialize scheduled tasks
                workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
                workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
    
                log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
            }catch (Exception e) {
                log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
                throw e;
            }
        }
    

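    init first guards against repeated initialization with initialized.compareAndSet(false, true), then prints the banner via PowerBannerPrinter.print(). Unless test mode is enabled, it runs assertAppName. After that it sets workerAddress, executorManager, and processorLoader on workerRuntime, creates taskTrackerActor, processorTrackerActor, and workerActor, and passes them to remoteEngine.start(engineConfig), storing the resulting transporter on workerRuntime. It then starts serverDiscoveryService, sets omsLogHandler, initializes taskPersistenceService, and schedules WorkerHealthReporter (every healthReportInterval seconds) and omsLogHandler.logSubmitter (with a 5-second delay between runs).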
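
    The guard at the top of init is a common run-once idiom built on AtomicBoolean.compareAndSet. A minimal sketch of the idiom on its own follows; InitOnceSketch and its counter are illustrative and not part of PowerJob.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InitOnceSketch {

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private int initCount = 0;

    // Returns true if this call performed the initialization, false if it was a repeat.
    public boolean init() {
        // Only the first caller flips the flag from false to true; later callers return early.
        if (!initialized.compareAndSet(false, true)) {
            return false;
        }
        initCount++;
        return true;
    }

    public int getInitCount() {
        return initCount;
    }

    public static void main(String[] args) {
        InitOnceSketch worker = new InitOnceSketch();
        System.out.println(worker.init());
        System.out.println(worker.init());
        System.out.println(worker.getInitCount());
    }
}
```

    One consequence visible in the init listing: the flag is set before the try block and is not reset in the catch block, so after a failed initialization a second call to init on the same instance is treated as a repeat and returns without doing anything.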

    assertAppName

        private void assertAppName() {
    
            PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
            String appName = config.getAppName();
            Objects.requireNonNull(appName, "appName can't be empty!");
    
            String url = "http://%s/server/assert?appName=%s";
            for (String server : config.getServerAddress()) {
                String realUrl = String.format(url, server, appName);
                try {
                    String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
                    ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
                    if (resultDTO.isSuccess()) {
                        Long appId = Long.valueOf(resultDTO.getData().toString());
                        log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
                        workerRuntime.setAppId(appId);
                        return;
                    }else {
                        log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
                        throw new PowerJobException(resultDTO.getMessage());
                    }
                }catch (PowerJobException oe) {
                    throw oe;
                }catch (Exception ignore) {
                    log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
                }
            }
            log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
            throw new PowerJobException("no server available!");
        }
    

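    assertAppName iterates over the configured servers and calls http://%s/server/assert?appName=%s on each one to obtain the appId for the appName, then stores it on workerRuntime; it returns as soon as one server succeeds. If a server answers that the appName is invalid, a PowerJobException is thrown immediately. Any other failure (for example an unreachable server) is logged as a warning and the next server is tried. If no server succeeds, it throws PowerJobException("no server available!").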
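
    The control flow of assertAppName (try each server in turn, fail fast on a definitive rejection, skip servers that cannot be reached) can be sketched without any HTTP code. In this sketch the lookup function stands in for the HTTP call plus JSON parsing, and RejectedException is a hypothetical stand-in for PowerJobException; neither is PowerJob's actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class AssertAppNameSketch {

    // Hypothetical stand-in for PowerJobException: a reachable server rejected the appName.
    public static class RejectedException extends RuntimeException {
        public RejectedException(String message) {
            super(message);
        }
    }

    // Same URL template as in assertAppName.
    public static String buildUrl(String server, String appName) {
        return String.format("http://%s/server/assert?appName=%s", server, appName);
    }

    // lookup returns the appId for a URL, throws RejectedException when the server says
    // the appName is invalid, or any other RuntimeException when the call itself fails.
    public static long assertAppName(List<String> servers, String appName,
                                     Function<String, Long> lookup) {
        for (String server : servers) {
            String url = buildUrl(server, appName);
            try {
                return lookup.apply(url); // first success wins
            } catch (RejectedException rejected) {
                throw rejected; // definitive answer: do not try other servers
            } catch (RuntimeException callFailed) {
                // this server could not be used, fall through to the next one
            }
        }
        throw new IllegalStateException("no server available!");
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:7700", "10.0.0.2:7700");
        // Simulate the first server being down and the second one answering with appId 42.
        long appId = assertAppName(servers, "demo", url -> {
            if (url.contains("10.0.0.1")) {
                throw new RuntimeException("connection refused");
            }
            return 42L;
        });
        System.out.println(appId);
    }
}
```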

    buildProcessorLoader

        private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
            List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
            List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);
    
            // append the 2 built-in system processor factories at the end
            finalPF.add(new BuiltInDefaultProcessorFactory());
            finalPF.add(new JarContainerProcessorFactory(runtime));
    
            return new PowerJobProcessorLoader(finalPF);
        }
    

    buildProcessorLoader takes the existing processorFactoryList (which, under Spring, already contains BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory) and appends BuiltInDefaultProcessorFactory and JarContainerProcessorFactory.
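
    Putting setApplicationContext and buildProcessorLoader together gives the final order of the factory list. The sketch below reproduces only that ordering, with factories represented by their class names; FactoryOrderSketch and finalOrder are illustrative names. How PowerJobProcessorLoader consults the list is not shown in this article; if it tries factories in list order (an assumption), user-supplied factories get the first chance and the two system factories act as fallbacks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FactoryOrderSketch {

    // Builds the final factory order for a Spring-managed worker, by name only.
    public static List<String> finalOrder(List<String> userFactories) {
        List<String> factories = new ArrayList<>(userFactories);
        // Appended by PowerJobSpringWorker.setApplicationContext.
        factories.add("BuiltInSpringProcessorFactory");
        factories.add("BuildInSpringMethodProcessorFactory");
        // Appended by PowerJobWorker.buildProcessorLoader.
        factories.add("BuiltInDefaultProcessorFactory");
        factories.add("JarContainerProcessorFactory");
        return factories;
    }

    public static void main(String[] args) {
        // "MyProcessorFactory" is a made-up name for a user-supplied factory.
        System.out.println(finalOrder(Arrays.asList("MyProcessorFactory")));
    }
}
```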

    destroy

        public void destroy() throws Exception {
            workerRuntime.getExecutorManager().shutdown();
            remoteEngine.close();
        }
    

    The destroy method calls workerRuntime.getExecutorManager().shutdown() and remoteEngine.close().

    Summary

    PowerJobAutoConfiguration creates a PowerJobSpringWorker from the PowerJobProperties.Worker settings, and PowerJobSpringWorker in turn brings PowerJobWorker into the Spring container. Its setApplicationContext method adds BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory to the config's processorFactoryList. PowerJobWorker's init method validates the appName, then initializes the thread pools, ProcessorLoader, actors, remoteEngine, serverDiscoveryService, omsLogHandler, and taskPersistenceService, and schedules WorkerHealthReporter and logSubmitter.

    Article link: https://www.haomeiwen.com/subject/khutndtx.html