Distributed Scheduled Tasks with elastic-job (Part 2)


Author: 后来丶_a24d | Published 2021-05-29 09:51


    Part of the distributed scheduled task series.


    Execution

    Creating the executor

    Fields of AbstractElasticJobExecutor
    (Figure: fields of AbstractElasticJobExecutor)
    • JobFacade: the job facade object
    • JobRootConfiguration: the job configuration
    • ExecutorService: the thread pool that runs the job
    • JobExceptionHandler: the job exception handler
    Constructor
    • The constructor:
    protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        this.jobFacade = jobFacade;
        // Load the job configuration
        jobRootConfig = jobFacade.loadJobRootConfiguration(true);
        jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
        // Get the thread pool
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
        // Get the exception handler
        jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
        // Map for per-sharding-item error messages
        itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
    }
    
    • Taking a simple job as an example, SimpleJobExecutor initializes its parent class AbstractElasticJobExecutor:
    public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
        /**
         * The SimpleJob implementation
         */
        private final SimpleJob simpleJob;
        
        public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
            super(jobFacade);
            this.simpleJob = simpleJob;
        }
    }
    
    Obtaining the thread pool in the constructor
    • When one server is assigned multiple sharding items, multiple threads are needed to speed up execution; and to keep jobs isolated from each other, each job gets its own thread pool.
    // AbstractElasticJobExecutor
    protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        // part of the code omitted
        // Get the thread pool
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    }
    
    // AbstractElasticJobExecutor.getHandler
    private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
        // Instantiate the configured ExecutorServiceHandler implementation class via reflection; this is what allows an SPI-style custom thread pool
        String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
        try {
            Class<?> handlerClass = Class.forName(handlerClassName);
            if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
                return handlerClass.newInstance();
            }
            return getDefaultHandler(jobPropertiesEnum, handlerClassName);
        } catch (final ReflectiveOperationException ex) {
            return getDefaultHandler(jobPropertiesEnum, handlerClassName);
        }
    }
    
    // ExecutorServiceHandlerRegistry
    // Map<String, ExecutorService> REGISTRY = new HashMap<>()
    public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
        // Create the thread pool if it is not cached yet; the key is jobName
        if (!REGISTRY.containsKey(jobName)) {
            REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
        }
        return REGISTRY.get(jobName);
    }
    
    // ExecutorServiceHandler is an interface, so custom thread pools can be plugged in
    ExecutorService createExecutorService(final String jobName);
    
    • getHandler in the constructor is what makes the thread pool customizable: it instantiates the configured ExecutorServiceHandler implementation class via reflection, giving SPI-style custom thread pools.
    • The custom thread pool and the custom exception handler work exactly the same way; a minimal sketch of such a handler follows the configuration snippet below.
    private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                             int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                             boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
    
        // Define the core job configuration
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .misfire(true)
                .failover(failover)
                .jobParameter(jobParameter)
                .shardingItemParameters(shardingItemParameters)
                // Custom exception handler; a custom thread pool can be configured the same way
                .jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
                .build();
    
        // other code omitted
    
        return liteJobConfiguration;
    }
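    
    • For reference, a handler like the MyJobExceptionHandler referenced above only needs to implement the JobExceptionHandler interface. A minimal sketch, assuming a body the article's demo does not show:
    // Hypothetical custom exception handler, instantiated reflectively from the
    // job_exception_handler property configured above.
    public class MyJobExceptionHandler implements JobExceptionHandler {
        
        @Override
        public void handleException(final String jobName, final Throwable cause) {
            // Log the failure; a real implementation might also push an alert here (assumption).
            System.err.println(String.format("Job '%s' failed: %s", jobName, cause.getMessage()));
        }
    }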
    
    • If no custom thread pool is configured, DefaultExecutorServiceHandler is used. It builds the pool with a Guava helper that registers a JVM shutdown hook and waits up to 120 seconds for a graceful exit (similar to handling kill -15). That helper also turns the pool's threads into daemon threads, which is a poor fit when the job still needs to send MQ messages or hit the database during shutdown: once the JVM is exiting, the Spring context is closing as well, and MQ sending usually depends on Spring, so those sends can fail. One possible workaround is sketched after the snippet below.
    // DefaultExecutorServiceHandler
    public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler { 
        @Override
        public ExecutorService createExecutorService(final String jobName) {
            return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
        }
    }
    
    // ExecutorServiceObject
    public ExecutorService createExecutorService() {
        // Guava's helper wraps the pool and automatically registers a JVM exit hook for it
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }
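    
    • If the daemon threads are a problem, one possible workaround (my assumption, not something the article's demo shows) is a custom ExecutorServiceHandler whose pool uses ordinary non-daemon threads and is shut down explicitly by the application:
    // Hypothetical custom handler, configured via the executor_service_handler job property.
    // (java.util.concurrent imports omitted, matching the style of the other snippets.)
    public class MyExecutorServiceHandler implements ExecutorServiceHandler {
        
        @Override
        public ExecutorService createExecutorService(final String jobName) {
            return Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors() * 2,
                    runnable -> {
                        Thread thread = new Thread(runnable, "my-job-" + jobName);
                        thread.setDaemon(false); // keep threads alive until an explicit shutdown
                        return thread;
                    });
        }
    }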
    
    Obtaining the exception handler in the constructor
    • The approach mirrors the thread pool: it is also a customizable SPI-style interface with a default implementation.
    public final class DefaultJobExceptionHandler implements JobExceptionHandler {
        
        @Override
        public void handleException(final String jobName, final Throwable cause) {
            log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
        }
    }
    

    Running the executor

    • Sequence diagram

      (Figure: execution sequence diagram)
    1.1 Check the job execution environment
    • Source:
    // AbstractElasticJobExecutor
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        // part of the code omitted
    }
    
    // LiteJobFacade
    public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
        configService.checkMaxTimeDiffSecondsTolerable();
    }
    // ConfigService
    public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
        // Check whether the clock drift between this server and the registry center is within the allowed range; the registry center's time is obtained by writing a test node to ZooKeeper and reading its modification time
        int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
        if (-1  == maxTimeDiffSeconds) {
            return;
        }
        long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
        }
    }
    
    1.2 Get the current sharding contexts
    • Analyzed in the sharding chapter
    1.3, 1.5 Publish job status trace events
    • Analyzed in the event tracing chapter
    1.4 Skip misfired jobs that are still running
    • This is defensive programming to keep a misfired run from executing while a previous run is still in flight. Misfired work flows like this: after the misfire flags are cleared in 1.9 (when 1.8 decides the misfired items should run), the same execute method used for the normal run in 1.7 is invoked again in 1.10. In other words, misfired work runs immediately after the normal run finishes, unlike failover work, which is not picked up right after the normal run but on the next trigger.
    • If any of the sharding items assigned to this instance is still running, the items are marked as misfired and this trigger is skipped; running them now could execute the same sharding item concurrently. Detecting running items relies on monitorExecution being true, which records the running state in the registry.
    // LiteJobFacade.java
    @Override
    public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
       return executionService.misfireIfHasRunningItems(shardingItems);
    }
    
    // ExecutionService.java
    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
       if (!hasRunningItems(items)) {
           return false;
       }
       setMisfire(items);
       return true;
    }
    
    public boolean hasRunningItems(final Collection<Integer> items) {
       LiteJobConfiguration jobConfig = configService.load(true);
       if (null == jobConfig || !jobConfig.isMonitorExecution()) {
           return false;
       }
       for (int each : items) {
           if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
               return true;
           }
       }
       return false;
    }
    
    1.6 The before-job-execution hook
    • Analyzed in the listener chapter; source:
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        for (ElasticJobListener each : elasticJobListeners) {
            each.beforeJobExecuted(shardingContexts);
        }
    }
    
    Executing the normal run
    • 1.7 The execute method used for the normal run
    // Misfired, normal and failover executions all call this method; only the ExecutionSource differs
    // AbstractElasticJobExecutor.java
    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
       // If there are no sharding items, publish an event and return
       if (shardingContexts.getShardingItemParameters().isEmpty()) {
           if (shardingContexts.isAllowSendJobEvent()) {
               jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
           }
           return;
       }
       // Register that the job has started
       jobFacade.registerJobBegin(shardingContexts);
       // Publish a job status trace event
       String taskId = shardingContexts.getTaskId();
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
       }
       try {
           // Core processing
           process(shardingContexts, executionSource);
       } finally {
            // Register that the job has completed
            jobFacade.registerJobCompleted(shardingContexts);
            // Publish a status trace event depending on whether any sharding item failed
           if (itemErrorMessages.isEmpty()) {
               if (shardingContexts.isAllowSendJobEvent()) {
                   jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
               }
           } else {
               if (shardingContexts.isAllowSendJobEvent()) {
                   jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
               }
           }
       }
    }
    
    • Registering the job start info
    // LiteJobFacade
    public void registerJobBegin(final ShardingContexts shardingContexts) {
       executionService.registerJobBegin(shardingContexts);
    }
    
    // ExecutionService
    public void registerJobBegin(final ShardingContexts shardingContexts) {
        // Tell JobRegistry that a task for this jobName is now running
        JobRegistry.getInstance().setJobRunning(jobName, true);
        if (!configService.load(true).isMonitorExecution()) {
            return;
        }
        // The running state is only recorded when monitorExecution is enabled (an ephemeral running node per sharding item)
        for (int each : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
        }
    }
    
    • Registering the job completion info works the other way around: it removes the running markers, and if failover is enabled it also calls FailoverService.updateFailoverComplete(), which is analyzed in the failover chapter. The completion logic is sketched below.
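    • A sketch of that completion logic, mirroring registerJobBegin (reconstructed from memory of elastic-job-lite 2.x, so treat it as illustrative rather than authoritative):
    // ExecutionService (sketch)
    public void registerJobCompleted(final ShardingContexts shardingContexts) {
        // Tell JobRegistry the job is no longer running
        JobRegistry.getInstance().setJobRunning(jobName, false);
        if (!configService.load(true).isMonitorExecution()) {
            return;
        }
        // Remove the per-item running markers written by registerJobBegin
        for (int each : shardingContexts.getShardingItemParameters().keySet()) {
            jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
        }
    }
    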
    • The core process method: a single sharding item runs directly in the calling thread, while multiple items are dispatched to the thread pool.
    // AbstractElasticJobExecutor
    private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        // A single sharding item is executed directly in the current thread
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            // Execute a single sharding item
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        // Multiple sharding items are executed on the thread pool
        final CountDownLatch latch = new CountDownLatch(items.size());
        for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        // Execute a single sharding item
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
    // AbstractElasticJobExecutor
    private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        // Publish the start event for this sharding item
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(startEvent);
        }
        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
        JobExecutionEvent completeEvent;
        try {
            // Abstract method implemented by subclasses
            process(new ShardingContext(shardingContexts, item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(completeEvent);
            }
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // Publish the failure event for this sharding item
            completeEvent = startEvent.executionFailure(cause);
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtil.transform(cause));
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
    // Taking SimpleJobExecutor as an example
    protected void process(final ShardingContext shardingContext) {
        // Invoke the user-implemented job class
        simpleJob.execute(shardingContext);
    }
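    
    • For completeness, the user-side SimpleJob is just the business callback invoked above. A hypothetical example (the ShardingContext accessor names are recalled from memory, so verify them against the source):
    // Hypothetical user job: each invocation handles one sharding item.
    public class MyDemoJob implements SimpleJob {
        
        @Override
        public void execute(final ShardingContext shardingContext) {
            // A common pattern is to process the data slice where id % shardingTotalCount == shardingItem.
            System.out.println("item=" + shardingContext.getShardingItem()
                    + ", parameter=" + shardingContext.getShardingParameter());
        }
    }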
    
    Executing misfired work
    • 1.10 The execute method for misfired items, invoked when 1.8 decides the misfired work should run
    • Marking items as misfired
    // JobScheduler.java
    private Scheduler createScheduler() {
       Scheduler result;
       // part of the code omitted
       result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
       return result;
    }
    
    private Properties getBaseQuartzProperties() {
       // part of the code omitted
       // Being more than 1 millisecond late is enough for a sharding item to be treated as misfired
       result.put("org.quartz.jobStore.misfireThreshold", "1");
       return result;
    }
    
    // JobScheduleController.class
    private CronTrigger createTrigger(final String cron) {
       return TriggerBuilder.newTrigger()
               .withIdentity(triggerIdentity)
               .withSchedule(CronScheduleBuilder.cronSchedule(cron)
                 // When Quartz detects a misfire it does nothing itself and hands handling over to elastic-job
               .withMisfireHandlingInstructionDoNothing())
               .build();
    }
     
    // Handled by a TriggerListener
    // JobTriggerListener.java
    public final class JobTriggerListener extends TriggerListenerSupport {
        
        @Override
        public void triggerMisfired(final Trigger trigger) {
            if (null != trigger.getPreviousFireTime()) {
                executionService.setMisfire(shardingService.getLocalShardingItems());
            }
        }
    }
    
    // ExecutionService sets the misfire flag
    public void setMisfire(final Collection<Integer> items) {
       for (int each : items) {
           jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
       }
    }
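    
    • Roughly, both flags live under the job's sharding nodes in ZooKeeper (paths recalled from elastic-job-lite 2.x, so verify against ShardingNode in the source):
    // /{namespace}/{jobName}/sharding/{item}/running  -> ephemeral, written by registerJobBegin
    // /{namespace}/{jobName}/sharding/{item}/misfire  -> persistent, written by setMisfire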
    
    • Execution: the misfire check is made against in-memory (cached) data, while the data itself is updated asynchronously via ZooKeeper, so because of network delays the cache may not have been refreshed yet. The while loop is defensive programming that keeps running until the cached state has been consumed; the business code must be idempotent, because it may be invoked repeatedly. The two facade calls used below are sketched after the snippet.
    public final void execute() {
       // .... part of the code omitted
       // Decide whether to execute misfired items (continuing from 1.7 in the sequence diagram)
       while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
           // Clear the misfire flags
           jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
           // Execute the misfired items with the same logic as the normal run above
           execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
       }
    }
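    
    • The two facade calls above look roughly like this in LiteJobFacade (reconstructed from memory of elastic-job-lite 2.x, so treat it as a sketch):
    // LiteJobFacade (sketch): misfired items run only if the job is eligible to run,
    // the misfire switch is on, and at least one assigned item carries the misfire flag.
    public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
        return isEligibleForJobRunning()
                && configService.load(true).getTypeConfig().getCoreConfig().isMisfire()
                && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
    }
    
    public void clearMisfire(final Collection<Integer> shardingItems) {
        executionService.clearMisfire(shardingItems);
    }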
    
    1.11 Executing failover items
    • Analyzed in the failover chapter
    1.12 The after-job-execution hook
    • Analyzed in the listener chapter; source:
    // LiteJobFacade
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        for (ElasticJobListener each : elasticJobListeners) {
            each.afterJobExecuted(shardingContexts);
        }
    }
    

    Registry center

    JobNodeStorage

    • JobNodeStorage: the data-access class for job nodes
    • Class diagram of the ZooKeeper registry center: the RegistryCenter interface defines simple create/read/update/delete operations on registry data plus a query for the registry time; CoordinatorRegistryCenter is the registry center for coordinating distributed services, providing persistent, ephemeral, persistent-sequential and ephemeral-sequential nodes and so on; ZookeeperRegistryCenter is the ZooKeeper implementation. A sketch of these interfaces follows the figure placeholder below.


      (Figure: registry center class diagram)
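    • As a rough reminder of what those interfaces contain, based on the description above (the method list is non-exhaustive and the signatures are recalled from memory, so verify against the source):
    // RegistryCenter (sketch): basic CRUD on registry data plus the registry time.
    public interface RegistryCenter {
        String get(String key);
        boolean isExisted(String key);
        void persist(String key, String value);
        void update(String key, String value);
        void remove(String key);
        long getRegistryCenterTime(String key);
    }
    
    // CoordinatorRegistryCenter (sketch): adds coordination-oriented node types.
    public interface CoordinatorRegistryCenter extends RegistryCenter {
        void persistEphemeral(String key, String value);
        String persistSequential(String key, String value);
        void persistEphemeralSequential(String key);
        List<String> getChildrenKeys(String key);
    }
    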
    • JobNodeStorage depends on the RegistryCenter; the individual services depend on JobNodeStorage rather than on the RegistryCenter directly


      (Figure: JobNodeStorage dependencies)
    • ZookeeperRegistryCenter mainly operates ZooKeeper through the Curator framework; worth digging into if you are interested
    • A few of the more interesting methods, by way of example:
    Getting the current registry center time
    • Update a value, then take the modification time returned for that update as the registry center time.
    //ZookeeperRegistryCenter
    public long getRegistryCenterTime(final String key) {
       long result = 0L;
       try {
           persist(key, "");
           result = client.checkExists().forPath(key).getMtime();
       } catch (final Exception ex) {
           RegExceptionHandler.handleException(ex);
       }
       Preconditions.checkState(0L != result, "Cannot get registry center time.");
       return result;
    }
    

    Running an operation on the master node: a distributed lock

    • Curator implements two kinds of distributed locks on top of ZooKeeper, and LeaderLatch is one of them: after start, await blocks until the latch is acquired; when several threads contend, the others wait and acquire it in turn as the holder releases it. LeaderLatch implements Closeable, so the try-with-resources block closes it when it ends. Its concrete use is covered in the leader election chapter, and a minimal usage sketch follows the method below; the other kind of lock is worth a look if you are interested.
    // JobNodeStorage: a distributed lock plus a callback mechanism
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
            // Create a LeaderLatch on the ZooKeeper node; getClient comes from ZookeeperRegistryCenter
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        } catch (final Exception ex) {
            handleException(ex);
        }
    }
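    
    • A minimal usage sketch (the latch node path is assumed here for illustration; the real call sites are covered in the leader election chapter):
    // Only the instance that wins the latch executes the callback.
    jobNodeStorage.executeInLeader("leader/election/latch", new LeaderExecutionCallback() {
        
        @Override
        public void execute() {
            // e.g. persist this instance as the leader node (assumption)
        }
    });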
    

    Running an operation in a transaction

    • Open a transaction and invoke the TransactionExecutionCallback; a hypothetical usage sketch follows the method below
    // JobNodeStorage
    public void executeInTransaction(final TransactionExecutionCallback callback) {
        try {
            CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
            callback.execute(curatorTransactionFinal);
            curatorTransactionFinal.commit();
        } catch (final Exception ex) {
            RegExceptionHandler.handleException(ex);
        }
    }
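    
    • A hypothetical usage sketch (the node paths and values are made up for illustration; inside elastic-job the real caller is the sharding service):
    // Write two nodes atomically: either both are created or neither is.
    jobNodeStorage.executeInTransaction(new TransactionExecutionCallback() {
        
        @Override
        public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
            // Each operation is chained back into the transaction with and(); the paths are hypothetical.
            curatorTransactionFinal.create().forPath("/my-job/sharding/0/instance", "ip-a@-@1234".getBytes()).and();
            curatorTransactionFinal.create().forPath("/my-job/sharding/1/instance", "ip-b@-@5678".getBytes()).and();
        }
    });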
    

    Listeners

    • ListenerManager manages multiple listeners

      (Figure: ListenerManager)
    • Each manager field implements one listening concern
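    • As a sketch (the field list is reconstructed from the startAllListeners method shown later, not copied from the source):
    // ListenerManager fields (sketch): one manager per listening concern.
    private final ElectionListenerManager electionListenerManager;
    private final ShardingListenerManager shardingListenerManager;
    private final FailoverListenerManager failoverListenerManager;
    private final MonitorExecutionListenerManager monitorExecutionListenerManager;
    private final ShutdownListenerManager shutdownListenerManager;
    private final TriggerListenerManager triggerListenerManager;
    private final RescheduleListenerManager rescheduleListenerManager;
    private final GuaranteeListenerManager guaranteeListenerManager;
    private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;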

    Registry center listener

    • RegistryCenterConnectionStateListener inside ListenerManager implements Curator's ConnectionStateListener interface and listens to the registry center connection state

      (Figure: RegistryCenterConnectionStateListener)
    • Handling state changes
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        // Connection suspended or connection lost
        if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
            // Pause the job
            jobScheduleController.pauseJob();
        } else if (ConnectionState.RECONNECTED == newState) {
            // Persist the job server online info
            serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
            // Persist the job instance online info
            instanceService.persistOnline();
            // Clear the running markers of the locally assigned sharding items
            executionService.clearRunningInfo(shardingService.getLocalShardingItems());
            // Resume job scheduling
            jobScheduleController.resumeJob();
        }
    }
    
    // JobScheduleController
    public synchronized void pauseJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.pauseAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    public synchronized void resumeJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.resumeAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    

    Sharding listener

    • ShardingListenerManager inside ListenerManager manages the sharding listeners
    • All listeners are started when the job is initialized:
    // SchedulerFacade.java
    public void registerStartUpInfo(final boolean enabled) {
       // Start all listeners
       listenerManager.startAllListeners();
       // other methods omitted
    }
    // Start all listeners
    public void startAllListeners() {
        electionListenerManager.start();
        shardingListenerManager.start();
        failoverListenerManager.start();
        monitorExecutionListenerManager.start();
        shutdownListenerManager.start();
        triggerListenerManager.start();
        rescheduleListenerManager.start();
        guaranteeListenerManager.start();
    }
    
    • start invokes the corresponding methods in the parent class AbstractListenerManager


      (Figure: AbstractListenerManager)
    // AbstractListenerManager
    // Start the listener
    public abstract void start();
    // Add a registry data listener to the registry center's TreeCache
    protected void addDataListener(final TreeCacheListener listener) {
        jobNodeStorage.addDataListener(listener);
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
    }
    
    • The start method of ShardingListenerManager:
    public void start() {
        addDataListener(new ShardingTotalCountChangedJobListener());
        addDataListener(new ListenServersChangedJobListener());
    }
    
    • AbstractJobListener, the abstract listener class for the job registry center, implements Curator's TreeCacheListener
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        // Ignore events that carry no data
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    // Node changes are handled by subclasses
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
    
    • Taking ShardingListenerManager as an example: when a change is detected, the resharding flag is set
    // ShardingListenerManager
    class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }
    

    Job listener

    • ElasticJobListener, the job listener interface
    public interface ElasticJobListener {
        // Invoked before the job executes.
        void beforeJobExecuted(final ShardingContexts shardingContexts);
        // Invoked after the job executes.
        void afterJobExecuted(final ShardingContexts shardingContexts);
    }
    
    • Both callbacks are invoked in execute(), around the actual job run:
    // AbstractElasticJobExecutor.java
    public final void execute() {
       // code omitted
       
       // Invoke the before-job-execution hook
       try {
           jobFacade.beforeJobExecuted(shardingContexts);
       } catch (final Throwable cause) {
           jobExceptionHandler.handleException(jobName, cause);
       }
       // code omitted
       // Invoke the after-job-execution hook
       try {
           jobFacade.afterJobExecuted(shardingContexts);
       } catch (final Throwable cause) {
           jobExceptionHandler.handleException(jobName, cause);
       }
    }
    
    • A custom implementation can be registered in the configuration
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(/* parameters omitted */) {
    
        return new SpringJobScheduler(simpleJob,
                registryCenter,
                getLiteJobConfiguration(simpleJob.getClass(),
                        cron,
                        shardingTotalCount,
                        shardingItemParameters,
                        jobParameter,
                        failover,
                        monitorExecution,
                        monitorPort,
                        maxTimeDiffSeconds,
                        jobShardingStrategyClass),
                jobEventConfiguration,
                // The custom job listener
                new SimpleJobListener());
    
    }
    public class SimpleJobListener implements ElasticJobListener {
        @Override
        public void beforeJobExecuted(ShardingContexts shardingContexts) {
            System.out.println("Job started");
        }
    
        @Override
        public void afterJobExecuted(ShardingContexts shardingContexts) {
            System.out.println("Job finished");
        }
    }
    

