在springboot中配置一个简单的elastic job任务:
- 定义一个自己的简单任务,实现SimpleJob接口,编写自己任务的实际业务流程:
public class MySimpleJob implements SimpleJob {
Logger logger = LoggerFactory.getLogger(MySimpleJob.class);
@Override
public void execute(ShardingContext shardingContext) {
logger.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
"当前分片项: %s.当前参数: %s," +
"作业名称: %s.作业自定义参数: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
- zookeeper配置,主要是做任务之间的协调:
@Configuration
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
- 作业的配置,这里是作业启动流程的核心起始地,这里创建SpringJobScheduler,并且调用JobScheduler的init方法:
@Configuration
public class MyJobConfig {
private final String cron = "0/3 * * * * ?";
private final int shardingTotalCount = 3;
private final String shardingItemParameters = "0=A,1=B,2=C";
private final String jobParameters = "parameter";
@Autowired
private ZookeeperRegistryCenter regCenter;
@Bean
public SimpleJob stockJob() {
return new MySimpleJob();
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(),
cron, shardingTotalCount, shardingItemParameters, jobParameters));
}
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).
shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return simpleJobRootConfig;
}
}
上面三个步骤即完成了作业任务的配置,启动springboot应用程序,作业任务随即开始启动,下面梳理分析下任务的启动流程:
-
在作业任务配置的时候,会创建SpringJobScheduler实例,并且调用他的init初始化方法,这里是作业启动流程的起始点
//创建SpringJobScheduler实例,并且调用该实例的init方法
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(),
cron, shardingTotalCount, shardingItemParameters, jobParameters));
}
/**
* 基于Spring的作业启动器.
*/
public final class SpringJobScheduler extends JobScheduler {
private final ElasticJob elasticJob;
public SpringJobScheduler(final ElasticJob elasticJob, final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
super(regCenter, jobConfig, getTargetElasticJobListeners(elasticJobListeners));
this.elasticJob = elasticJob;
}
/**
* 作业调度器.
*/
public class JobScheduler {
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
//添加作业运行实例,这里会在zk中创建instances节点,节点值是job的ip和进程号
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
//为调度器提供内部服务,这里面都是一些service(配置服务,选举服务和分片服务等)
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
//为作业提供内部服务
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
}
这里的SpringJobScheduler是基于spring的作业启动器,这里是封装了JobScheduler,把spring启动和JobScheduler的启动链接起来了,并且调用JobScheduler的init初始化作业。
-
作业的初始化流程:
/**
* 作业调度器.
*/
public class JobScheduler {
/**
* 初始化作业.
*/
public void init() {
//更新作业配置信息,在zk中会生成config节点信息,里面会存放作业的基本配置信息
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
//设置作业分片缓存
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(),
liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
//创建作业调度控制器,这里是对quartz调度的封装,里面是依赖quartz的单机任务调度
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
//添加作业调度控制器至本地缓存
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
//注册作业启动信息,这里也是关键步骤,是作业启动的核心
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
//作业调度控制器开始调度作业
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
}
- 更新作业配置信息,在zk下面会有个config节点信息,里面的值就是当前作业的配置信息
- 设置作业分片缓存信息,这里主要是设置了当前作业任务的总分片信息
- 创建作业调度控制器,这里是对quartz调度的封装,里面是依赖quartz的单机任务调度
- 添加作业调度控制至本地缓存
- 注册作业启动信息,这里是作业启动的核心,下文简单梳理分析
- 作业调度控制器开始调度作业
-
注册作业启动信息
public final class SchedulerFacade {
/**
* 注册作业启动信息.
*
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final boolean enabled) {
//开启作业监听器
listenerManager.startAllListeners();
//进行leader选举,会阻塞至leader选举完成
leaderService.electLeader();
//持久化作业服务器信息
serverService.persistOnline(enabled);
//持久化作业运行实例信息
instanceService.persistOnline();
//设置重新分片标记,表示需要进行分片
shardingService.setReshardingFlag();
//初始化作业监听服务
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
}
elastic job任务启动的大致流程就是这样,里面每个步骤的具体细节和作用,在后面的学习中再细致梳理分析。
网友评论