美文网首页
Elastic-job分布式任务调度框架

Elastic-job分布式任务调度框架

作者: keetone | 来源:发表于2019-04-24 18:11 被阅读0次

    本文将从以下几个方面展开介绍elastic-job

    简介

    Elastic-job是由当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

    Elastic-job-lite:
    定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
    仅关注点分布式调度、协调以及分片等核心功能

    Elastic-Job-Cloud :
    提供一体化私有云服务,将分布式调度、作业部署、资源分配、监控、日志处理等提供完善的解决方案。
    功能更加完善,但使用复杂度较高。使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务

    功能特点:

    功能 Elastic-job-lite Elastic-Job-Cloud
    分布式调度协调
    弹性扩容缩容
    失效转移
    错过执行作业重触发
    作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
    支持并行调度
    支持作业生命周期操作
    丰富的作业类型
    Spring整合以及命名空间提供
    运维平台
    自诊断并修复分布式不稳定造成的问题
    应用自动分发
    基于Fenzo的弹性资源分配
    基于Docker的进程隔离(TBD)

    开发指南

    基于目前流行的springboot框架来讲解如何在springboot中集成elastic-job,所以在进行下面的操作之前我们先得创建一个springboot项目,至于如何创建springboot项目这里就不做介绍了,访问这个网站:https://start.spring.io/

    1. 引入maven依赖

      <dependency> 
          <groupId>com.dangdang</groupId> 
          <artifactId>elastic-job-lite-core</artifactId> 
          <version>2.1.5</version> 
      </dependency>
      
    2. 作业开发

      Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

      2.1.Simple类型作业

      意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

      public class CourseSimpleJob implements SimpleJob {
          
          @Override
          public void execute(ShardingContext context) {
              switch (context.getShardingItem()) {
                  case 0: 
                      // do something by sharding item 0
                      break;
                  case 1: 
                      // do something by sharding item 1
                      break;
                  case 2: 
                      // do something by sharding item 2
                      break;
                  // case n: ...
              }
          }
      }
      

      这里的context.getShardingItem()为当前分片序号,最大值为分片总数减1(n-1)。例如:分片总数为10,那么context.getShardingItem()的最大值则为9。

      2.2.Dataflow类型作业

      Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

      public class SpringDataflowJob implements DataflowJob<Foo> {
      
          @Override
          public List<Foo> fetchData(ShardingContext context) {
              switch (context.getShardingItem()) {
                  case 0: 
                      List<Foo> data = // get data from database by sharding item 0
                      return data;
                  case 1: 
                      List<Foo> data = // get data from database by sharding item 1
                      return data;
                  case 2: 
                      List<Foo> data = // get data from database by sharding item 2
                      return data;
                  // case n: ...
              }
          }
          
          @Override
          public void processData(ShardingContext shardingContext, List<Foo> data) {
              // process data
              // ...
          }
      }
      

      流式处理

      可通过DataflowJobConfiguration配置是否流式处理。

      流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

      如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

      2.3.Script类型作业

      Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

      #!/bin/bash
      echo sharding execution context is $*
      
    3. 作业配置

      3.1.zookeeper配置

      新建RegistryCenterConfig类

      @Configuration
      @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
      public class RegistryCenterConfig {
      
          private final Logger log = LoggerFactory.getLogger(RegistryCenterConfig.class);
      
          @Bean(initMethod = "init")
          public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
              log.info("---------regCenter.serverList: {}---------", serverList);
              return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
          }
      }
      

      在配置文件application.yml中添加这两个属性

      regCenter:
          serverList: 127.0.0.1:2181 #zookeeper地址
          namespace: elastic-job-lite-springboot #命名空间
      

      3.2.配置事件追踪

      关于事件追踪这里只做代码展示,具体详细请参考:http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/

      新建JobEventConfig类:

      @Configuration
      public class JobEventConfig {
      
          @Resource
          private DataSource dataSource;
      
          @Bean
          public JobEventConfiguration jobEventConfiguration() {
              return new JobEventRdbConfiguration(dataSource);
          }
      }
      

      在配置文件application.yml中添加数据源相关的配置

      spring:
          datasource:
              url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
              driver-class-name: com.mysql.jdbc.Driver
              username: root
              password: 123456
              tomcat:
                  max-wait: 10000
                  min-idle: 0
                  initial-size: 25
                  validation-query: SELECT 1
                  test-on-borrow: false
                  test-while-idle: true
                  time-between-eviction-runs-millis: 18800
                  remove-abandoned: true
                  remove-abandoned-timeout: 180
      

      这里是在demo中这么配置数据源,一般在现成的项目中都会有相应的数据源配置,无需更改,只需直接将DataSource注入JobEventConfig中即可。

      官方配置

      这里我们先看看官方给的通用配置:

      // 定义作业核心配置
      JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
      // 定义SIMPLE类型配置
      SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
      // 定义Lite作业根配置
      JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
      
      // 定义作业核心配置
      JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();
      // 定义DATAFLOW类型配置
      DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);
      // 定义Lite作业根配置
      JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
      
      // 定义作业核心配置配置
      JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();
      // 定义SCRIPT类型配置
      ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");
      // 定义Lite作业根配置
      JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();
      

      这里官方只是告诉你他的api是这么用的,但是实际工作中我们肯定不能生搬硬套,不然每一个job都要写一个配置,那多麻烦,有时候还会忘记。因为在实际工作中肯定是有很多个job,其实我们完全可以写一个通用的配置类,然后将job配置的相关参数提取到配置文件中或者直接在job类中添加自定义注解,然后在注解中添加相关配置,最后在配置类中去扫描这些job的相关配置并遍历它生成对应的job配置实例,这样作业的配置代码只需要编写一次即可。

      3.3.Simple类型作业配置

      这里以Simple类型作业为例来简单的编写一个Simple类型通用的作业配置

      @Configuration
      public class SimpleJobConfig {
          private static final String      JOB_TYPE                 = "simpleJob";
          public static final  String      CRON                     = "cron";
          public static final  String      SHARDING_TOTAL_COUNT     = "shardingTotalCount";
          public static final  String      SHARDING_ITEM_PARAMETERS = "shardingItemParameters";
          private final        Environment env;
      
          @Resource
          private ZookeeperRegistryCenter regCenter;
      
          @Resource
          private JobEventConfiguration jobEventConfiguration;
      
          @Autowired
          public SimpleJobConfig(Environment env) {
              this.env = env;
          }
      
          @Bean
          public List<JobScheduler> simpleJobScheduler() {
              List<SimpleJob>    simpleJobs    = schedulers();
              List<JobScheduler> jobSchedulers = new ArrayList<>();
              Joiner             joiner        = Joiner.on(".");
              JobScheduler       scheduler;
              for (SimpleJob job : simpleJobs) {
                  String propertiesPrefix       = joiner.join(JOB_TYPE, job.getClass().getCanonicalName());
                  String cron                   = env.getProperty(joiner.join(propertiesPrefix, CRON));
                  int    shardingTotalCount     = Integer.valueOf(env.getProperty(joiner.join(propertiesPrefix, SHARDING_TOTAL_COUNT)));
                  String shardingItemParameters = env.getProperty(joiner.join(propertiesPrefix, SHARDING_ITEM_PARAMETERS));
      
                  if (shardingTotalCount > 0
                          && StringUtils.isNoneBlank(cron)
                          && StringUtils.isNoneBlank(shardingItemParameters)
                  ) {
                      scheduler = new SpringJobScheduler(job, regCenter, getLiteJobConfiguration(job.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
                      scheduler.init();
                      jobSchedulers.add(scheduler);
                  }
              }
              return jobSchedulers;
          }
      
          private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
              return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                      jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
          }
      
          private List<SimpleJob> schedulers() {
              List<SimpleJob> jobList = new ArrayList<>(ApplicationContextProvider.getBeansOfType(SimpleJob.class).values());
              jobList.forEach(job -> System.out.println(job.getClass().getCanonicalName()));
              return jobList;
          }
      }
      

      在application.yml配置文件中添加以下配置:

      simpleJob:
          com.dangdang.ddframe.job.example.job.simple: #job的包名
              CourseSimpleJob: #job的类名
                  cron: 0/30 * * * * ?
                  shardingTotalCount: 1
                  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
              SpringSimpleJob: #job的类名
                  cron: 0/25 * * * * ?
                  shardingTotalCount: 3
                  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      

      很显然这里的做法是通过将job的相关配置参数集中到一个文件里面,好处是配置参数可以统一起来集中管理。

      3.4.Dataflow类型作业配置

      @Configuration
      public class DataflowJobConfig {
          
          @Resource
          private ZookeeperRegistryCenter regCenter;
          
          @Resource
          private JobEventConfiguration jobEventConfiguration;
          
          @Bean
          public DataflowJob dataflowJob() {
              return new SpringDataflowJob(); 
          }
          
          @Bean(initMethod = "init")
          public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                              @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
              return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
          }
          
          private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
              return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
                      jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
          }
      }
      

      这里在dataflowJobScheduler方法上的@Bean注解中有一个initMethod = "init",这表示在创建JobScheduler的实例之后会调用JobSchedulerinit方法。而在前面2.3.3.Simple类型作业配置中我们是直接显示的调用init方法的。

      3.5.Script类型作业配置

      脚本类型的作业配置这里就不做展开了,直接参考上面的官方配置即可,或者根据上面的经验进行简单的修改。

    4. 作业运行

      正常地启动springboot项目即可,作业会根据cron参数的配置自动执行。

    部署

    正常部署应用即可

    1. 启动Elastic-Job-Lite指定注册中心的Zookeeper。
    2. 运行包含Elastic-Job-Lite和业务代码的jar文件。不限与jar或war的启动方式。

    运维平台(可选)

    1. 解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。

    2. 打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。

      elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。

    最后附上官方文档大全地址:http://elasticjob.io/docs/elastic-job-lite/00-overview/

    相关文章

      网友评论

          本文标题:Elastic-job分布式任务调度框架

          本文链接:https://www.haomeiwen.com/subject/khkwgqtx.html