美文网首页
ElasticJob任务启动流程

ElasticJob任务启动流程

作者: 圣村的希望 | 来源:发表于2020-09-13 15:32 被阅读0次

   在springboot中配置一个简单的elastic job任务:

  1. 定义一个自己的简单任务,实现SimpleJob接口,编写自己任务的实际业务流程:
public class MySimpleJob implements SimpleJob {

    Logger logger = LoggerFactory.getLogger(MySimpleJob.class);

    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));

    }
}
  1. zookeeper配置,主要是做任务之间的协调:
@Configuration
public class JobRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                             @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
  1. 作业的配置,这里是作业启动流程的核心起始地,这里创建SpringJobScheduler,并且调用JobScheduler的init方法:
@Configuration
public class MyJobConfig {

    private final String cron = "0/3 * * * * ?";
    private final int shardingTotalCount = 3;
    private final String shardingItemParameters = "0=A,1=B,2=C";
    private final String jobParameters = "parameter";

    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean
    public SimpleJob stockJob() {
        return new MySimpleJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(),
                cron, shardingTotalCount, shardingItemParameters, jobParameters));
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters,
                                                         final String jobParameters) {
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).
                shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return simpleJobRootConfig;

    }

}

   上面三个步骤即完成了作业任务的配置,启动springboot应用程序,作业任务随即开始启动,下面梳理分析下任务的启动流程:

  1. 在作业任务配置的时候,会创建SpringJobScheduler实例,并且调用他的init初始化方法,这里是作业启动流程的起始点
//创建SpringJobScheduler实例,并且调用该实例的init方法
@Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(),
                cron, shardingTotalCount, shardingItemParameters, jobParameters));
    }
/**
 * 基于Spring的作业启动器.
 */
public final class SpringJobScheduler extends JobScheduler {
    
    private final ElasticJob elasticJob;
    
    public SpringJobScheduler(final ElasticJob elasticJob, final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
        super(regCenter, jobConfig, getTargetElasticJobListeners(elasticJobListeners));
        this.elasticJob = elasticJob;
}
/**
 * 作业调度器.
 */
public class JobScheduler {
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        //添加作业运行实例,这里会在zk中创建instances节点,节点值是job的ip和进程号
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        //为调度器提供内部服务,这里面都是一些service(配置服务,选举服务和分片服务等)
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        //为作业提供内部服务
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
}

   这里的SpringJobScheduler是基于spring的作业启动器,这里是封装了JobScheduler,把spring启动和JobScheduler的启动链接起来了,并且调用JobScheduler的init初始化作业。

  1. 作业的初始化流程:
/**
 * 作业调度器.
 */
public class JobScheduler {
    /**
     * 初始化作业.
     */
    public void init() {
        //更新作业配置信息,在zk中会生成config节点信息,里面会存放作业的基本配置信息
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        //设置作业分片缓存
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), 
        liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        //创建作业调度控制器,这里是对quartz调度的封装,里面是依赖quartz的单机任务调度
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        //添加作业调度控制器至本地缓存
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        //注册作业启动信息,这里也是关键步骤,是作业启动的核心
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        //作业调度控制器开始调度作业
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
}
  • 更新作业配置信息,在zk下面会有个config节点信息,里面的值就是当前作业的配置信息
  • 设置作业分片缓存信息,这里主要是设置了当前作业任务的总分片信息
  • 创建作业调度控制器,这里是对quartz调度的封装,里面是依赖quartz的单机任务调度
  • 添加作业调度控制至本地缓存
  • 注册作业启动信息,这里是作业启动的核心,下文简单梳理分析
  • 作业调度控制器开始调度作业
  1. 注册作业启动信息
public final class SchedulerFacade {
    /**
     * 注册作业启动信息.
     * 
     * @param enabled 作业是否启用
     */
    public void registerStartUpInfo(final boolean enabled) {
        //开启作业监听器
        listenerManager.startAllListeners();
        //进行leader选举,会阻塞至leader选举完成
        leaderService.electLeader();
        //持久化作业服务器信息
        serverService.persistOnline(enabled);
        //持久化作业运行实例信息
        instanceService.persistOnline();
        //设置重新分片标记,表示需要进行分片
        shardingService.setReshardingFlag();
        //初始化作业监听服务
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
}

   elastic job任务启动的大致流程就是这样,里面每个步骤的具体细节和作用,在后面的学习中再细致梳理分析。

相关文章

网友评论

      本文标题:ElasticJob任务启动流程

      本文链接:https://www.haomeiwen.com/subject/bxbnektx.html