Elastic Job removes the leader through leaderService.removeLeader(). The method checks whether a leader node exists and, if so, deletes the ZooKeeper node /leader/election/instance.
/**
 * Remove the leader node so that a new election can take place.
 */
public void removeLeader() {
    jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}
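The "remove only if it exists" behaviour can be illustrated with a minimal in-memory sketch. This is not Elastic Job's implementation: the class LeaderNodeSketch and its map are hypothetical stand-ins for the ZooKeeper tree, and storing the instance ID as the node's data is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LeaderNodeSketch {
    // Path of the leader node, as named in the text above.
    public static final String LEADER_INSTANCE = "/leader/election/instance";

    // Hypothetical stand-in for the ZooKeeper tree: node path -> node data.
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    // Pretend an election finished and this instance won (assumed data format).
    public void electLeader(String instanceId) {
        nodes.put(LEADER_INSTANCE, instanceId);
    }

    public boolean hasLeader() {
        return nodes.containsKey(LEADER_INSTANCE);
    }

    // Remove the leader node only if it exists; a missing node is not an error.
    // Returns true when a node was actually removed.
    public boolean removeLeaderIfExisted() {
        return nodes.remove(LEADER_INSTANCE) != null;
    }

    public static void main(String[] args) {
        LeaderNodeSketch zk = new LeaderNodeSketch();
        zk.electLeader("192.168.200.151@-@7100");
        System.out.println(zk.removeLeaderIfExisted()); // true: node removed
        System.out.println(zk.removeLeaderIfExisted()); // false: nothing left to remove
        System.out.println(zk.hasLeader());             // false
    }
}
```

Calling the removal twice is harmless, which is why the callers described in this post do not need to coordinate with each other beyond checking isLeader().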
leaderService.removeLeader() is called from the following methods:
LeaderAbdicationJobListener.dataChanged calls removeLeader
class LeaderAbdicationJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
            leaderService.removeLeader();
        }
    }

    private boolean isLocalServerDisabled(final String path, final String data) {
        return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
    }
}
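In short: if the current instance is the leader and its server has been set to DISABLED, it removes the leader node, which in turn triggers a passive re-election. (If the leader is disabled while a job is executing, isLeaderUntilBlock presumably runs as well and waits for the election to finish before the job executes.)
isLocalServerDisabled checks whether the event path is the local server's node under servers and whether the event data is DISABLED. It returns true only when both conditions hold.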
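The abdication condition is a pure boolean check, so it can be sketched without ZooKeeper. The class AbdicationCheckSketch, the path /myJob/servers/192.168.200.151, and the empty string used for a non-disabled server are assumptions for illustration only; the real listener obtains these values from serverNode and leaderService.

```java
public class AbdicationCheckSketch {
    // Mirrors ServerStatus.DISABLED.name() from the listener above.
    public static final String DISABLED = "DISABLED";

    // True when the event path is this server's node and its new data is DISABLED.
    public static boolean isLocalServerDisabled(String localServerPath, String eventPath, String data) {
        return localServerPath.equals(eventPath) && DISABLED.equals(data);
    }

    // The leader abdicates only if it is the leader AND its own server was disabled.
    public static boolean shouldAbdicate(boolean isLeader, String localServerPath,
                                         String eventPath, String data) {
        return isLeader && isLocalServerDisabled(localServerPath, eventPath, data);
    }

    public static void main(String[] args) {
        String local = "/myJob/servers/192.168.200.151"; // hypothetical local server path
        String other = "/myJob/servers/192.168.200.152"; // hypothetical other server path
        System.out.println(shouldAbdicate(true, local, local, "DISABLED"));  // true
        System.out.println(shouldAbdicate(false, local, local, "DISABLED")); // false: not leader
        System.out.println(shouldAbdicate(true, local, other, "DISABLED"));  // false: another server
        System.out.println(shouldAbdicate(true, local, local, ""));          // false: not disabled
    }
}
```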
JobShutdownHookPlugin.shutdown() calls removeLeader
public final class JobShutdownHookPlugin extends ShutdownHookPlugin {

    private String jobName;

    @Override
    public void initialize(final String name, final Scheduler scheduler, final ClassLoadHelper classLoadHelper) throws SchedulerException {
        super.initialize(name, scheduler, classLoadHelper);
        jobName = scheduler.getSchedulerName();
    }

    @Override
    public void shutdown() {
        CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
        if (null == regCenter) {
            return;
        }
        LeaderService leaderService = new LeaderService(regCenter, jobName);
        if (leaderService.isLeader()) {
            leaderService.removeLeader();
        }
        new InstanceService(regCenter, jobName).removeInstance();
    }
}
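JobShutdownHookPlugin triggers the job's shutdown method on Ctrl+C, on a plain kill (SIGTERM, not kill -9), or when the JVM is terminating. Stopping the process manually from IDEA does not trigger it.
The method first looks up the registry center registered for the job name and returns immediately if there is none. Otherwise it checks whether the current instance is the leader and, if so, removes the leader node. Finally it removes the job's node under instances, for example instances/192.168.200.151@-@7100.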
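The order of the cleanup steps can be sketched with a plain JVM shutdown hook. Runtime.getRuntime().addShutdownHook is standard Java; everything else here (the class ShutdownOrderSketch, the cleanup method, and the step names recorded as strings) is hypothetical and only mirrors the control flow of shutdown() described above.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {
    // Returns the cleanup steps in the order they would run.
    public static List<String> cleanup(boolean registryKnown, boolean isLeader) {
        List<String> steps = new ArrayList<>();
        if (!registryKnown) {
            return steps; // no registry center for this job: nothing to clean up
        }
        if (isLeader) {
            steps.add("removeLeader"); // give up leadership first
        }
        steps.add("removeInstance"); // then drop this instance's node
        return steps;
    }

    public static void main(String[] args) {
        // The hook runs when the JVM exits normally or receives SIGTERM / Ctrl+C.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("cleanup steps: " + cleanup(true, true))));
        System.out.println("main finished");
    }
}
```

Running main prints "main finished" followed by "cleanup steps: [removeLeader, removeInstance]". Removing the leader node before the instance node means the remaining instances can start a new election without waiting for the departing instance's ZooKeeper session to expire.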
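ShutdownListenerManager.InstanceShutdownStatusJobListener handles the case where the current instance's node under instances is removed. If the job has not been shut down, is not paused, and the local instance node has not been re-created (that is, the event is not just a registry-center reconnect), it calls schedulerFacade.shutdownInstance(). That method checks whether the current node is the leader and, if so, calls leaderService.removeLeader().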
class InstanceShutdownStatusJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
                && isRemoveInstance(path, eventType) && !isReconnectedRegistryCenter()) {
            schedulerFacade.shutdownInstance();
        }
    }

    private boolean isRemoveInstance(final String path, final Type eventType) {
        return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
    }

    private boolean isReconnectedRegistryCenter() {
        return instanceService.isLocalJobInstanceExisted();
    }
}
/**
 * Shut down job scheduling for this instance.
 */
public void shutdownInstance() {
    if (leaderService.isLeader()) {
        leaderService.removeLeader();
    }
    monitorService.close();
    if (reconcileService.isRunning()) {
        reconcileService.stopAsync();
    }
    JobRegistry.getInstance().shutdown(jobName);
}
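The four conditions guarding shutdownInstance() can be written as a single predicate. The sketch below is a simplified illustration, not the listener's real code: the class InstanceShutdownCheckSketch, its EventType enum, and the path /myJob/instances/192.168.200.151@-@7100 are hypothetical names standing in for the values the listener reads from JobRegistry, instanceNode and instanceService.

```java
public class InstanceShutdownCheckSketch {
    // Simplified stand-in for the listener's event type.
    public enum EventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

    public static boolean shouldShutdownInstance(boolean jobShutdown, boolean paused,
                                                 String localInstancePath, String eventPath,
                                                 EventType eventType, boolean localInstanceNodeExists) {
        // The event must be the removal of this instance's own node.
        boolean removedLocalInstance =
                localInstancePath.equals(eventPath) && EventType.NODE_REMOVED == eventType;
        // If the node already exists again, treat the event as a registry-center reconnect.
        boolean reconnected = localInstanceNodeExists;
        return !jobShutdown && !paused && removedLocalInstance && !reconnected;
    }

    public static void main(String[] args) {
        String local = "/myJob/instances/192.168.200.151@-@7100"; // hypothetical path
        System.out.println(shouldShutdownInstance(false, false, local, local,
                EventType.NODE_REMOVED, false)); // true: node removed for good
        System.out.println(shouldShutdownInstance(false, false, local, local,
                EventType.NODE_REMOVED, true));  // false: node re-created after reconnect
        System.out.println(shouldShutdownInstance(false, true, local, local,
                EventType.NODE_REMOVED, false)); // false: job is paused
    }
}
```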