美文网首页
Elastic job 移除leader 源码分析

Elastic job 移除leader 源码分析

作者: pcgreat | 来源:发表于2018-08-14 11:05 被阅读77次

    Elastic job 移除选举方法 leaderService.removeLeader(); 判断是否存在leader ,移除 zk /leader/election/instance node

        /**
         * 删除主节点供重新选举.
         */
        public void removeLeader() {
            jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
        }
    
    

    leaderService.removeLeader() 被以下方法调用


    image.png

    LeaderAbdicationJobListener dataChanged 方法调用了 移除选举方法

     class LeaderAbdicationJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
                    leaderService.removeLeader();
                }
            }
            
            private boolean isLocalServerDisabled(final String path, final String data) {
                return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
            }
        }
    

    简单说 在当前实例是leader 情况下 , 且状态被 disable 的情况下 , 当前leader 会remove leader , 然后 会激活 被动选举 。 (如果在job执行过程 disable leader 节点 , 那么isLeaderUntilBlock 也会执行吧 ,等待leader选举后 再执行job )

    isLocalServerDisabled方法 判断 event path 是否是当前servers 路径本地节点 , 且 event type 是否 disable 都满足 返回true

    JobShutdownHookPlugin shutdown() 方法调用了 移除选举方法

    public final class JobShutdownHookPlugin extends ShutdownHookPlugin {
        
        private String jobName;
        
        @Override
        public void initialize(final String name, final Scheduler scheduler, final ClassLoadHelper classLoadHelper) throws SchedulerException {
            super.initialize(name, scheduler, classLoadHelper);
            jobName = scheduler.getSchedulerName();
        }
        
        @Override
        public void shutdown() {
            CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
            if (null == regCenter) {
                return;
            }
            LeaderService leaderService = new LeaderService(regCenter, jobName);
            if (leaderService.isLeader()) {
                leaderService.removeLeader();
            }
            new InstanceService(regCenter, jobName).removeInstance();
        }
    }
    

    JobShutdownHookPlugin 会在 ctrl c , kill 或者 JVM terminating (idea 手动那个不行的)时候 激活 该job 的 shutdown 方法
    首先判断 注册中心 是否有 该jobname , 如果有 判断当前是否是leader , 如果是 remove leader , 最后会remove 该job 的 instances 节点 如 instances/192.168.200.151@-@7100 节点

    ShutdownListenerManager.InstanceShutdownStatusJobListener 在 job 没有shutdown ,没有 paused ,在 instances下的当前节点 被移除 , 判断当前节点没有运行的情况下 会 执行 schedulerFacade.shutdownInstance(); 然后 会判断当前节点是否leader ,如果是 ,执行 leaderService.removeLeader() 方法 。

    class InstanceShutdownStatusJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
                        && isRemoveInstance(path, eventType) && !isReconnectedRegistryCenter()) {
                    schedulerFacade.shutdownInstance();
                }
            }
            
            private boolean isRemoveInstance(final String path, final Type eventType) {
                return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
            }
            
            private boolean isReconnectedRegistryCenter() {
                return instanceService.isLocalJobInstanceExisted();
            }
        }
    
        /**
         * 终止作业调度.
         */
        public void shutdownInstance() {
            if (leaderService.isLeader()) {
                leaderService.removeLeader();
            }
            monitorService.close();
            if (reconcileService.isRunning()) {
                reconcileService.stopAsync();
            }
            JobRegistry.getInstance().shutdown(jobName);
        }
    
    

    相关文章

      网友评论

          本文标题:Elastic job 移除leader 源码分析

          本文链接:https://www.haomeiwen.com/subject/rzgybftx.html