Distributed Scheduled Tasks with elastic-job (Part 1)

Author: 后来丶_a24d | Published: 2021-05-29 09:47


    Configuration

    Core client configuration: SpringJobScheduler

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                           @Value("${simpleJob.cron}") final String cron,
                                           @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
                                           @Value("${simpleJob.jobParameter}") final String jobParameter,
                                           @Value("${simpleJob.failover}") final boolean failover,
                                           @Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
                                           @Value("${simpleJob.monitorPort}") final int monitorPort,
                                           @Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
                                           @Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {
        // SpringJobScheduler configuration; the initialization walkthrough starts here
        return new SpringJobScheduler(simpleJob,
                registryCenter,
                getLiteJobConfiguration(simpleJob.getClass(),
                        cron,
                        shardingTotalCount,
                        shardingItemParameters,
                        jobParameter,
                        failover,
                        monitorExecution,
                        monitorPort,
                        maxTimeDiffSeconds,
                        jobShardingStrategyClass),
                jobEventConfiguration,
                new SimpleJobListener());
    
    }
    
    
    private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                         int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                         boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
    
        // Define the core job configuration
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .misfire(true)
                .failover(failover)
                .jobParameter(jobParameter)
                .shardingItemParameters(shardingItemParameters)
                .jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
                .build();
    
        // Define the SIMPLE job type configuration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
    
        // Define the Lite job root configuration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
                .jobShardingStrategyClass(jobShardingStrategyClass)
                .monitorExecution(monitorExecution)
                .monitorPort(monitorPort)
                .maxTimeDiffSeconds(maxTimeDiffSeconds)
                .overwrite(true)
                .build();
    
        return liteJobConfiguration;
    }
    

    SpringJobScheduler

    • Initialization code
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }
        
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
    
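    • The parent class JobScheduler holds several fields: LiteJobConfiguration (the Lite job configuration), CoordinatorRegistryCenter (the registry center), SchedulerFacade, and JobFacade


      [Figure: SpringJobScheduler class diagram]
    LiteJobConfiguration
    • The job configuration class


      [Figure: job configuration class diagram]
    1. typeConfig: the job type configuration. There are three job types (simple, dataflow, script), and each type has its own configuration class


      [Figure: JobTypeConfiguration class diagram]
    2. monitorExecution: monitors the job's execution state. Not recommended when both the execution time and the interval between runs are short
    3. maxTimeDiffSeconds: the maximum tolerated clock difference, in seconds, between the local machine and the registry center
    4. monitorPort: the job monitoring port, which can be used to dump job information, e.g. echo "dump" | nc 127.0.0.1 9888
    5. jobShardingStrategyClass: the sharding strategy class; the strategy pattern lets different sharding strategies be plugged in
    6. reconcileIntervalMinutes: the scheduling interval, in minutes, of the service that repairs inconsistent job server state
    7. disabled: whether the job is disabled
    8. overwrite: whether the local job configuration overwrites the job configuration stored in the registry center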
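    To make the jobShardingStrategyClass item more concrete, here is a minimal sketch of an average-allocation strategy. It is a simplified re-implementation of the idea, not the library's code: the class name AverageShardingSketch and the method shard are hypothetical, and instances are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a sharding strategy; this is NOT the elastic-job interface.
class AverageShardingSketch {

    // Splits shard items 0..shardingTotalCount-1 as evenly as possible across the instances.
    static Map<String, List<Integer>> shard(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        for (int index = 0; index < instances.size(); index++) {
            List<Integer> items = new ArrayList<>();
            for (int item = index * perInstance; item < (index + 1) * perInstance; item++) {
                items.add(item);
            }
            result.put(instances.get(index), items);
        }
        // Items that do not divide evenly are handed out one each, starting from the first instance.
        int leftover = shardingTotalCount % instances.size();
        int nextItem = perInstance * instances.size();
        for (int i = 0; i < leftover; i++) {
            result.get(instances.get(i)).add(nextItem++);
        }
        return result;
    }
}
```

    With three instances and eight shards, the first two instances receive three items each and the last one receives two. A different strategy would only need to replace the body of shard, which is the point of making the strategy class configurable.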
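    • JobTypeConfiguration, using the most common one, SimpleJobConfiguration, as the example


      [Figure: SimpleJobConfiguration]
      [Figure: JobCoreConfiguration]
    1. jobName: the job name
    2. cron: the cron expression
    3. shardingTotalCount: the total number of shards for the job
    4. shardingItemParameters: parameters for the shard items, which act as aliases. Shard items are numbered 0, 1, 2, 3 and so on; giving them aliases improves readability
    5. jobParameter: a custom job parameter
    6. failover: whether failover is enabled. If a shard fails on another node in the cluster (for example, because that server went down), this node can run the failed shard. The shard is not necessarily run by the master node; it is run by whichever machine wins the distributed lock, which makes use of the whole cluster. (Each job may have a different master node: a master exists for a job when ZooKeeper holds /${JOB_NAME}/leader/election/instance, so different jobs can have different masters.)
    7. misfire: whether missed executions are re-run. If the trigger interval is shorter than the execution time, a scheduled run is missed; elastic-job can re-execute it
    8. description: a description of the job
    9. jobProperties: job properties. A custom exception handler and a custom thread pool handler can be set here; both are SPI interfaces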
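    The shardingItemParameters value is a comma-separated list of item=alias pairs such as 0=Beijing,1=Shanghai,2=Guangzhou. The sketch below shows one way such a string could be turned into a lookup table. The class ShardingItemParametersSketch and its parse method are hypothetical helpers written for illustration, and the city names are made-up sample values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of elastic-job.
class ShardingItemParametersSketch {

    // Parses "0=a,1=b" into {0 -> "a", 1 -> "b"}; an empty or null string yields an empty map.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected item=value but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }
}
```

    A job running shard item 1 would then look up its alias in the map and, for example, process only the records that belong to that alias.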
    CoordinatorRegistryCenter
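    • The registry center coordinates distributed services; the default implementation is ZookeeperRegistryCenter. ZookeeperRegistryCenter defines directory-service methods for persistent, ephemeral, persistent sequential, and ephemeral sequential nodes, and implicitly requires features such as transactions, distributed locks, and data subscription. It wraps Apache Curator, which makes connecting to ZooKeeper and setting nodes convenient. elastic-job uses Quartz as its scheduling core.


      [Figure: registry center configuration]
    • The init method of ZookeeperRegistryCenter creates the Curator client from the settings in ZookeeperConfiguration. ExponentialBackoffRetry computes the retry interval dynamically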
    public void init() {
        log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getServerLists())
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                .namespace(zkConfig.getNamespace());
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
            builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
                    .aclProvider(new ACLProvider() {
                    
                        @Override
                        public List<ACL> getDefaultAcl() {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    
                        @Override
                        public List<ACL> getAclForPath(final String path) {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    });
        }
        client = builder.build();
        client.start();
        try {
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
    // How ExponentialBackoffRetry computes the retry interval dynamically
    baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
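
    The expression above can be tried out in isolation. The following is a minimal sketch that reproduces the formula and adds a cap at the maximum sleep time, assuming the cap behaves like the maxSleepTimeMilliseconds argument passed in init. BackoffSketch is a hypothetical class written for this article; it does not use Curator.

```java
import java.util.Random;

// Hypothetical re-implementation of the backoff formula for illustration; not Curator's class.
class BackoffSketch {
    private final Random random = new Random();
    private final int baseSleepTimeMs;
    private final int maxSleepMs;

    BackoffSketch(int baseSleepTimeMs, int maxSleepMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    // Returns a randomized sleep time whose upper bound doubles with each retry, capped at maxSleepMs.
    long sleepTimeMs(int retryCount) {
        if (retryCount < 0 || retryCount > 29) {
            // 1 << (retryCount + 1) would overflow an int beyond this range.
            throw new IllegalArgumentException("retryCount must be between 0 and 29");
        }
        long sleepMs = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleepMs, maxSleepMs);
    }
}
```

    On the first retry the multiplier can only be 1, so the sleep equals the base time; on later retries the random multiplier spreads the reconnect attempts of different clients over time.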
    
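    • Caching is implemented with Curator TreeCache, which subscribes to and caches the data of an entire tree (a ZooKeeper directory)
    • The CRUD operations on ZooKeeper are fairly basic; see the source code for the details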
    SchedulerFacade
    • Created through the constructor chain
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }
        
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // some code omitted
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
    }
    
    // The SchedulerFacade constructor
    public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
        this.jobName = jobName;
        configService = new ConfigurationService(regCenter, jobName);
        leaderService = new LeaderService(regCenter, jobName);
        serverService = new ServerService(regCenter, jobName);
        instanceService = new InstanceService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        monitorService = new MonitorService(regCenter, jobName);
        reconcileService = new ReconcileService(regCenter, jobName);
        listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
    }
    
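    • A facade class that provides internal services to the scheduler; it contains the individual services


      [Figure: facade class that provides internal services to the scheduler]
    • The ElasticJobListener instances are managed by ListenerManager
    • Every service is initialized from regCenter and jobName
    JobFacade
    • Initialized through the constructor chain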
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                            final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // some code omitted
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
    // The LiteJobFacade constructor
    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        executionContextService = new ExecutionContextService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }
    
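    • A facade class that provides internal services to the job; it contains the individual services and the event bus


      [Figure: facade class that provides internal services to the job]
    • At initialization the configured JobEventConfiguration is passed in and wrapped with new JobEventBus(jobEventConfig)
    • The listeners are managed by LiteJobFacade itself
    • Every service is initialized from regCenter and jobName

    Event tracing configuration: JobEventConfiguration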

    @Autowired
    private DataSource dataSource;
    
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
    
    • The class being initialized is fairly simple
    public final class JobEventRdbConfiguration extends JobEventRdbIdentity implements JobEventConfiguration, Serializable {
        private static final long serialVersionUID = 3344410699286435226L;
        private final transient DataSource dataSource;
    
        public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
            try {
                return new JobEventRdbListener(this.dataSource);
            } catch (SQLException var2) {
                throw new JobEventListenerConfigurationException(var2);
            }
        }
        // Called during initialization
        @ConstructorProperties({"dataSource"})
        public JobEventRdbConfiguration(DataSource dataSource) {
            this.dataSource = dataSource;
        }
    
        public DataSource getDataSource() {
            return this.dataSource;
        }
    }
    

    JobRegistry: the job registry

    • When the constructor builds the SpringJobScheduler, the job instance is registered in JobRegistry
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }
        
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // some code omitted
    }
    // getInstance: the singleton accessor
    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }
    // addJobInstance
    public void addJobInstance(final String jobName, final JobInstance jobInstance) {
        jobInstanceMap.put(jobName, jobInstance);
    }
    
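    • JobRegistry is a singleton; the design is somewhat similar to the Spring IoC container


      [Figure: JobRegistry]
    • instance: the singleton, declared as private static volatile JobRegistry
    • The maps cache the data; ConcurrentHashMap makes each individual put thread-safe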
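    The two points above (a volatile singleton and maps that are only safe per operation) can be sketched as follows. RegistrySketch is a hypothetical class that stores plain strings instead of JobInstance objects, and addIfAbsent is an extra method added here to show an atomic check-then-act; it is not claimed to exist in JobRegistry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry for illustration; it mirrors the shape of JobRegistry, not its full API.
class RegistrySketch {
    // volatile prevents another thread from seeing a partially constructed instance.
    private static volatile RegistrySketch instance;

    private final Map<String, String> jobInstanceMap = new ConcurrentHashMap<>();

    private RegistrySketch() {
    }

    // Double-checked locking: the lock is only taken while the instance is still null.
    static RegistrySketch getInstance() {
        if (null == instance) {
            synchronized (RegistrySketch.class) {
                if (null == instance) {
                    instance = new RegistrySketch();
                }
            }
        }
        return instance;
    }

    // A single put is thread-safe on a ConcurrentHashMap.
    void addJobInstance(String jobName, String jobInstanceId) {
        jobInstanceMap.put(jobName, jobInstanceId);
    }

    // A separate containsKey followed by put would race; putIfAbsent does both atomically.
    String addIfAbsent(String jobName, String jobInstanceId) {
        String previous = jobInstanceMap.putIfAbsent(jobName, jobInstanceId);
        return previous == null ? jobInstanceId : previous;
    }
}
```

    The distinction matters whenever a caller needs to read and then write based on what it read: each map call is safe on its own, but the combination is not unless it is expressed as one atomic operation.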

    Initialization

    • After the constructor returns, the init method is called
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(/* parameters omitted */) {
        return new SpringJobScheduler(/* arguments omitted */);
    }
    
    • The init method
    public void init() {
        // Update the job configuration
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        // Record the total shard count
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        // Create the job schedule controller
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        // Register the job schedule controller
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        // Register the start-up information
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        // Schedule the job, based on Quartz
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
    

    Updating the job configuration

    • The SchedulerFacade creates configService with new. ConfigurationService depends on JobNodeStorage, which uses Curator to operate on ZooKeeper
    public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        // Persist the configuration
        configService.persist(liteJobConfig);
        // Load the configuration back
        return configService.load(false);
    }
    
    // ConfigurationService
    public void persist(final LiteJobConfiguration liteJobConfig) {
        checkConflictJob(liteJobConfig);
        if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
            jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
        }
    }
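
    The persist-then-load flow and the role of the overwrite flag can be sketched with an in-memory map standing in for ZooKeeper. ConfigStoreSketch is hypothetical, the configuration is reduced to a JSON string, and the node path /jobName/config is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the registry center; not the elastic-job implementation.
class ConfigStoreSketch {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    // Writes the local configuration only if none exists yet or overwrite is set,
    // then returns whatever the store holds afterwards.
    String updateJobConfiguration(String jobName, String localConfigJson, boolean overwrite) {
        String path = "/" + jobName + "/config"; // assumed node layout
        if (!nodes.containsKey(path) || overwrite) {
            nodes.put(path, localConfigJson);
        }
        return nodes.get(path);
    }
}
```

    The consequence is that with overwrite set to false, the configuration returned to init may differ from the one passed in: the copy already stored in the registry center wins.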
    

    Setting the job shard count

    • The init method calls JobRegistry.getInstance().setCurrentShardingTotalCount, which caches the total shard count per job name
    private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();
    
    public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
        currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
    }
    

    Creating the job schedule controller

    • JobScheduleController is the job schedule controller; it wraps Quartz
    JobScheduleController jobScheduleController = new JobScheduleController(
                    createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
    
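    createScheduler
    • org.quartz.threadPool.threadCount is set to 1, so Quartz runs the job on a single thread; each ElasticJob has its own JobScheduler. Failover calls the Quartz trigger and may do so several times; because there is only one worker thread, those executions run serially and cannot overlap.
    // Create the Quartz scheduler
    private Scheduler createScheduler() {
        Scheduler result;
        try {
            // Use Quartz to create the Scheduler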
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }
    
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        // Quartz uses 1 worker thread; one ElasticJob maps to one JobScheduler
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }
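
    The claim that a single worker thread serializes repeated triggers can be checked with a plain JDK executor. This is only an analogy: SerialTriggerSketch is a hypothetical class that uses a fixed thread pool of size 1 rather than the Quartz thread pool, and it measures the highest number of tasks that were ever running at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo using a JDK executor as a stand-in for a one-thread Quartz pool.
class SerialTriggerSketch {

    // Submits the given number of triggers to a one-thread pool and returns the peak concurrency observed.
    static int maxConcurrency(int triggers) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < triggers; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10); // simulate job work
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

    Extra triggers are queued behind the one that is running, which is why several failover triggers in a row do not execute concurrently.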
    
    createJobDetail
    • JobDetail is created with newJob(LiteJob.class), so a new LiteJob is instantiated for every execution. JobDetail.jobDataMap supplies the LiteJob's properties, so although the LiteJob is new each time, its properties are the same every time.
    private JobDetail createJobDetail(final String jobClass) {
        // Use Quartz to create the JobDetail
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        // Create the ElasticJob instance
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
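
    The "new job object per firing, same property values every time" behavior can be illustrated without Quartz. In the sketch below, JobDataMapSketch, Facade, and LiteJobLike are hypothetical stand-ins; the map plays the role of the JobDataMap and newJobForFire plays the role of the job factory that populates each new job instance from it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for JobDetail, JobFacade, and LiteJob; not the Quartz or elastic-job classes.
class JobDataMapSketch {

    static class Facade {
    }

    static class LiteJobLike {
        Facade jobFacade;
    }

    // Filled once at initialization, like JobDetail.getJobDataMap().put(...).
    private final Map<String, Object> jobDataMap = new HashMap<>();

    JobDataMapSketch(Facade facade) {
        jobDataMap.put("jobFacade", facade);
    }

    // Called once per trigger firing: a fresh job object, populated from the shared data map.
    LiteJobLike newJobForFire() {
        LiteJobLike job = new LiteJobLike();
        job.jobFacade = (Facade) jobDataMap.get("jobFacade");
        return job;
    }
}
```

    Because the facade is shared, any state it reaches (registry center, services) is shared across executions, while the job object itself carries nothing over from one firing to the next.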
    
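    Registering the job start-up information
    • SchedulerFacade registers the job start-up information
    public void registerStartUpInfo(final boolean enabled) {
        // Start all listeners
        listenerManager.startAllListeners();
        // Elect the leader
        leaderService.electLeader();
        // Persist the job server information to ZooKeeper
        serverService.persistOnline(enabled);
        // Persist the online information of the running job instance to ZooKeeper
        instanceService.persistOnline();
        // Set the resharding flag in ZooKeeper because a new instance came online; sharding is analyzed later
        shardingService.setReshardingFlag();
        // Initialize the job monitoring service; this comes up later with self-diagnosis and repair
        monitorService.listen();
        // Initialize the service that reconciles inconsistent job state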
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
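
    The leader election step can be pictured as "the first instance to create the leader node for a job wins". The sketch below models that with an in-memory map instead of ZooKeeper. LeaderElectionSketch is hypothetical, and the atomic putIfAbsent is a simplification: the real election goes through the registry center rather than a local map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of per-job leader election; not the elastic-job LeaderService.
class LeaderElectionSketch {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    // Returns true if this instance became the leader of the given job.
    boolean electLeader(String jobName, String instanceId) {
        String path = "/" + jobName + "/leader/election/instance";
        return nodes.putIfAbsent(path, instanceId) == null;
    }

    // Returns the current leader's instance id, or null if no leader has been elected.
    String leaderOf(String jobName) {
        return nodes.get("/" + jobName + "/leader/election/instance");
    }
}
```

    Since the node is keyed by job name, instance A can lead one job while instance B leads another, which is why different jobs may have different master nodes.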
    
    Scheduling the job
    public void init() {
        // earlier steps omitted
        // Schedule the job using Quartz
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
    

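    How initialization connects to execution

    A brief introduction to Quartz

    • There are four core elements
    1. Trigger: defines the timing rule, for example firing on a cron expression
    2. Job: the logic the task executes
    3. JobDetail: the task's details
    4. Scheduler: the scheduling controller
    • The figure is borrowed from reference 6


      [Figure: Quartz core elements]
    • Quartz has two kinds of threads: regular job-execution threads and a misfire-handling thread. When the trigger interval is shorter than the job's execution time, the missed firing is marked as misfired. elastic-job moves this misfire handling out of Quartz and processes it inside its own framework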
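    One way to picture misfire handling inside the framework: a firing that arrives while the job is still running is only recorded, and one catch-up run happens after the current run finishes. The sketch below models that idea under this assumption. MisfireSketch is a hypothetical class and does not reproduce the elastic-job implementation, which keeps its flags in the registry center.

```java
// Hypothetical model of "record the misfire, then run once more"; not the elastic-job implementation.
class MisfireSketch {
    private boolean running;
    private boolean misfired;
    private int executions;

    // A trigger firing. Returns true if the job starts now, false if the firing is recorded as a misfire.
    synchronized boolean fire() {
        if (running) {
            misfired = true;
            return false;
        }
        running = true;
        return true;
    }

    // Called when a run finishes. Returns true if one catch-up run was performed for missed firings.
    synchronized boolean complete() {
        executions++;
        running = false;
        if (misfired) {
            misfired = false;
            executions++; // a single catch-up run, however many firings were missed
            return true;
        }
        return false;
    }

    synchronized int executions() {
        return executions;
    }
}
```

    Note that several missed firings collapse into a single catch-up run in this model, so a long-running job does not build up an unbounded backlog.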

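    The link between initialization and execution

    • JobDetail is created during initialization with JobBuilder.newJob(LiteJob.class). A new LiteJob is created for every execution and its properties are set; the property values are the same instances every time
    private JobDetail createJobDetail(final String jobClass) {
        // Use Quartz to create the JobDetail
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        // remaining code omitted
        return result;
    }
    
    • After the trigger fires, LiteJob.execute is called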
    public final class LiteJob implements Job {
        // The elasticJob and jobFacade fields are populated from the JobDataMap (declarations omitted)
        @Override
        public void execute(final JobExecutionContext context) throws JobExecutionException {
            JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
        }
    }
    
    // JobExecutorFactory
    // This acts as the context of a strategy pattern: it returns a different executor depending on the input (script job, simple job, or dataflow job)
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
    
    • AbstractElasticJobExecutor is the parent class of these job executors; its execute method is the core code of elastic-job


      [Figure: subclasses of AbstractElasticJobExecutor]

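    References

    1. What is split-brain, and how does ZooKeeper solve it?
    2. Kafka research series: how does Kafka avoid split-brain, and how does it elect a leader?
    3. How to prevent split-brain in an ElasticSearch cluster
    4. The elastic-job scheduling model
    5. 芋道源码: elastic-job
    6. Quartz internals explained
    7. Technology selection for distributed scheduled-task systems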


          Article link: https://www.haomeiwen.com/subject/xudysltx.html