1.1RegistryCenter
注册中心<bean id="regCenter" class="com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter" init-method="init">
<constructor-arg>
<bean class="com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration">
<property name="serverLists" value="${elasticjob.zk.url}" />
<property name="namespace" value="${elasticjob.namespace}" />
<property name="baseSleepTimeMilliseconds" value="${elasticjob.baseSleepTimeMilliseconds}" />
<property name="maxSleepTimeMilliseconds" value="${elasticjob.maxSleepTimeMilliseconds}" />
<property name="maxRetries" value="${elasticjob.maxRetries}" />
</bean>
</constructor-arg>
</bean>
ZookeeperRegistryCenter
内部通过ZookeeperConfiguration
的配置信息,使用curator来连接zookeeper服务器,并向外提供操作节点的方法。
注册中心的作用是让所有的job都注册到这里,然后统一管理job的配置信息,当前的运行节点信息,然后可以分片以及弹性扩容等。目前只支持zookeeper,未来会增加。
每一个Job节点的结构如下:
下面讲每一个service时会讲到对应的节点。
1.2Job
<job:bean id="simpleTask" class="packageName.SimpleTask" regCenter="regCenter" cron="0 0 0 1 * ?" shardingTotalCount="1" overwrite="false" />
上面的配置是1.0.8及之前的版本,目前2.X版本的Spring标签以及改为simple等
ElasticJob通过自定义的标签来配置job,每一个job都会注入上面配置的注册中心。
具体使用的类如下:
每一个<job:bean>都会构造一个SpringJobScheduler类型的bean,然后构造JobScheduler,包括了如下的属性和方法:
至此,所有的SpringJobScheduler都已经被注册为bean。与此同时这个Scheduler对应的Task也被扫描装配为bean。
当SpringJobScheduler根据crontab表达式到达执行的时候,也是会通过SpringJobFactory
来找到与自己对应的Task bean,这个Task bean也就是quartz的Job类型,然后通过quartz来执行。
1.3JobScheduler
看一下关键的JobScheduler的初始化
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
做了如下的事情:
- 到注册中心更新当前Job的配置信息。
- 在JobRegistry中注册该Job,包括了JobScheduleController以及Job相关连的RegCenter以及Job的分片信息。
- 注册Job启动信息
- 根据Cron表达式调度Job
值得一提的是,这里从JobScheduler到facade到facade中的所有service都是和每一个Job相关的,同时也都关联到了注册这个Job的注册中心。
1.4SchedulerFacade
SchedulerFacade
是为调度类提供内部服务的门面类,包括如下属性
主要方法有更新Job的config信息,以及注册Job启动信息。
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
configService.persist(liteJobConfig);
return configService.load(false);
}
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
1.4.1ConfigService
节点 :config
位置:/namespace/jobName/config
值:LiteJobConfiguration
类的json字符串。(具体的配置信息见这里)
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
使用:
持久化config信息的时候,要看config节点是否存在,以及是否可以覆盖,以防启动时配置的默认config覆盖了正在使用的配置信息。而在取出config信息的时候可以选择从本地缓存(Curator的TreeCache)中或者直接从注册中心中取出config值。(如果获取之前的配置的时候,可以从缓存获取,如要分片的时候判断当前是否还有正在执行的分片;而要获取最新分片的配置的时候,就要从注册中心取)
监听:
RescheduleListenerManager
注册一个监听config节点的TreeCacheListener,如果收到config节点内容更新的事件通知,然后就根据新的config节点内容重新调度Job。
1.4.2ListenerManager
统一注册上文以及后面各个节点相关的listener。
1.4.3LeaderService
节点:leader
位置:
选举路径 /namespace/jobName/leader/election/latch
instance路径 /namespace/jobName/leader/election/instance
值:
选举路径没有值,选举时存在临时的子节点;instance节点为当前leader的instanceId(ip@-@pid),这是一个临时节点,当断开连接以后节点会清除。
使用:
使用curator的LeaderLatch来进行leader选举,成为leader后会创建instance节点的值写为自己的instanceId;对于leader节点的判断或者删除都是对instance节点的操作。
这里值得注意的是对LeaderLatch的使用,当一个节点被选为leader以后,他会创建instance节点并写为自己,然后退出LeaderLatch,那么剩下参与竞选的节点就会成为leader,但当新leader发现instance节点已经存在之后,他什么都不会做,然后退出LeaderLatch,接着所有剩下的节点会依次成为leader,同样什么都不会做,只有第一个成为leader的会创建instance节点。
所以Ejob中只有在竞选的时候,选举路径才会有临时节点,当leader选定之后,选举路径为空,instance节点存在。
监听:
ElectionListenerManager
会注册两个Listener
-
LeaderAbdicationJobListener
leader禅让监听
监听server节点下的自己的ip,如果自己是leader,并且servers节点下的自己ip被disable,那么就删除leader/instance,然后下次调度会重新选举。 -
LeaderElectionJobListener
leader选举监听
如果当前Job仍在调度中,监听到servers节点下自己的ip没有被disable,且当前没有leader,那么开始选举;
或者监听到leader节点被移除,且自己是可用的服务器,那么开始选举。
1.4.4ServerService
节点:servers
位置: /namespace/jobName/servers/ip
值:ip节点的值为空时表示可用,为disable时表示不可用(可以配置,用于统一开启调度)
使用:
创建ip节点,ip节点是永久节点,每次开始运行时会更新节点里的值;
判断是否存在可用的服务器,包括ip节点没有被disable且有instance子节点。
1.4.5InstanceService
节点:instances
位置: /namespace/jobName/instances/instanceId
值:空
使用:
创建instance节点,临时节点;删除作业是移除节点。
1.4.6ShardingService
节点:sharding
位置: /namespace/jobName/sharding
使用:建立/leader/sharding/necessary子节点,表示这个Job需要重新分片;
AbstractElasticJobExecutor在每次执行Task前的准备工作中就包括了如下的分片流程。
分片流程
- 如果当前节点是leader节点,并且这个Job需要分片,等待这个Job所有正在RUNNING的分片执行完成(/sharding/index/running),然后进入下一步;如果不是leader,那么等待分片完成。
- 从中心获取最新分片配置,写/leader/sharding/processing临时节点,清空原来的/sharding/index/instance节点(包括多余的/sharding/index),写最新的/sharding/index节点;
- 根据配置的策略进行分片,获取JobInstance到分片List的映射
- 通过curator transaction来写/sharding/index/instance节点(值为instanceId),删除necessary和processing节点
监听:
1.ShardingTotalCountChangedJobListener
监听到config节点有shardingtotalCount配置修改,和之前不同的话,写necessary节点 -
ListenServersChangedJobListener
监听到instances子节点中有变动(删除或者新增),servers子节点中有新增\删除\更新,写necessary节点。
JobSchdulerController
由quartz的StdSchduler来根据cron调度,当开始执行的任务的时候,触发LiteJob->AbstractElasticJobExecutor来execute,其中会使用的jobFacade来在这个基类中执行
jobFacade.registerJobBegin(shardingContexts);
jobFacade.registerJobCompleted(shardingContexts);
这里处理RUNNING节点的新增以及remove
AbstractElasticJobExecutor
它代表了任务执行的基本流程,其中做的工作非常多,包括了上面提到的分片,leader来分片,非leader等待分片完成;然后无论是leader还是非leader都会获取分配给本实例的分片列表;
然后包装成本实例的shardingContexts,执行前注册listener。
如果没有获得分片,那么不执行;如果有分片,处理本实例相关的分片的/sharding/index/running临时节点;
当分片为1时,直接调用用户实现的Task开始执行;如果分片大于1,那么使用一个线程池开始所有分片的task执行,然后开启一个CountDownLatch开始等待所有分片完成。所有分片完成后会有一些清理工作。
网友评论