本文将从以下几个方面展开介绍elastic-job
简介
Elastic-job是由当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-job-lite:
定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
仅关注点分布式调度、协调以及分片等核心功能
Elastic-Job-Cloud :
提供一体化私有云服务,将分布式调度、作业部署、资源分配、监控、日志处理等提供完善的解决方案。
功能更加完善,但使用复杂度较高。使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务
功能特点:
功能 | Elastic-job-lite | Elastic-Job-Cloud |
---|---|---|
分布式调度协调 | 有 | 有 |
弹性扩容缩容 | 有 | 有 |
失效转移 | 有 | 有 |
错过执行作业重触发 | 有 | 有 |
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例 | 有 | 有 |
支持并行调度 | 有 | 有 |
支持作业生命周期操作 | 有 | 有 |
丰富的作业类型 | 有 | 有 |
Spring整合以及命名空间提供 | 有 | 有 |
运维平台 | 有 | 有 |
自诊断并修复分布式不稳定造成的问题 | 有 | |
应用自动分发 | 有 | |
基于Fenzo的弹性资源分配 | 有 | |
基于Docker的进程隔离(TBD) | 有 |
开发指南
基于目前流行的springboot框架来讲解如何在springboot中集成elastic-job,所以在进行下面的操作之前我们先得创建一个springboot项目,至于如何创建springboot项目这里就不做介绍了,访问这个网站:https://start.spring.io/
-
引入maven依赖
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency>
-
作业开发
Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。
2.1.Simple类型作业
意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
public class CourseSimpleJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
这里的
context.getShardingItem()
为当前分片序号,最大值为分片总数减1(n-1)。例如:分片总数为10,那么context.getShardingItem()
的最大值则为9。2.2.Dataflow类型作业
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
public class SpringDataflowJob implements DataflowJob<Foo> { @Override public List<Foo> fetchData(ShardingContext context) { switch (context.getShardingItem()) { case 0: List<Foo> data = // get data from database by sharding item 0 return data; case 1: List<Foo> data = // get data from database by sharding item 1 return data; case 2: List<Foo> data = // get data from database by sharding item 2 return data; // case n: ... } } @Override public void processData(ShardingContext shardingContext, List<Foo> data) { // process data // ... } }
流式处理
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。
2.3.Script类型作业
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
#!/bin/bash echo sharding execution context is $*
-
作业配置
3.1.zookeeper配置
新建RegistryCenterConfig类
@Configuration @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class RegistryCenterConfig { private final Logger log = LoggerFactory.getLogger(RegistryCenterConfig.class); @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { log.info("---------regCenter.serverList: {}---------", serverList); return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
在配置文件application.yml中添加这两个属性
regCenter: serverList: 127.0.0.1:2181 #zookeeper地址 namespace: elastic-job-lite-springboot #命名空间
3.2.配置事件追踪
关于事件追踪这里只做代码展示,具体详细请参考:http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/
新建JobEventConfig类:
@Configuration public class JobEventConfig { @Resource private DataSource dataSource; @Bean public JobEventConfiguration jobEventConfiguration() { return new JobEventRdbConfiguration(dataSource); } }
在配置文件application.yml中添加数据源相关的配置
spring: datasource: url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false driver-class-name: com.mysql.jdbc.Driver username: root password: 123456 tomcat: max-wait: 10000 min-idle: 0 initial-size: 25 validation-query: SELECT 1 test-on-borrow: false test-while-idle: true time-between-eviction-runs-millis: 18800 remove-abandoned: true remove-abandoned-timeout: 180
这里是在demo中这么配置数据源,一般在现成的项目中都会有相应的数据源配置,无需更改,只需直接将DataSource注入JobEventConfig中即可。
官方配置
这里我们先看看官方给的通用配置:
// 定义作业核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName()); // 定义Lite作业根配置 JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); // 定义作业核心配置 JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build(); // 定义DATAFLOW类型配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true); // 定义Lite作业根配置 JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build(); // 定义作业核心配置配置 JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build(); // 定义SCRIPT类型配置 ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh"); // 定义Lite作业根配置 JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();
这里官方只是告诉你他的api是这么用的,但是实际工作中我们肯定不能生搬硬套,不然每一个job都要写一个配置,那多麻烦,有时候还会忘记。因为在实际工作中肯定是有很多个job,其实我们完全可以写一个通用的配置类,然后将job配置的相关参数提取到配置文件中或者直接在job类中添加自定义注解,然后在注解中添加相关配置,最后在配置类中去扫描这些job的相关配置并遍历它生成对应的job配置实例,这样作业的配置代码只需要编写一次即可。
3.3.Simple类型作业配置
这里以Simple类型作业为例来简单的编写一个Simple类型通用的作业配置
@Configuration public class SimpleJobConfig { private static final String JOB_TYPE = "simpleJob"; public static final String CRON = "cron"; public static final String SHARDING_TOTAL_COUNT = "shardingTotalCount"; public static final String SHARDING_ITEM_PARAMETERS = "shardingItemParameters"; private final Environment env; @Resource private ZookeeperRegistryCenter regCenter; @Resource private JobEventConfiguration jobEventConfiguration; @Autowired public SimpleJobConfig(Environment env) { this.env = env; } @Bean public List<JobScheduler> simpleJobScheduler() { List<SimpleJob> simpleJobs = schedulers(); List<JobScheduler> jobSchedulers = new ArrayList<>(); Joiner joiner = Joiner.on("."); JobScheduler scheduler; for (SimpleJob job : simpleJobs) { String propertiesPrefix = joiner.join(JOB_TYPE, job.getClass().getCanonicalName()); String cron = env.getProperty(joiner.join(propertiesPrefix, CRON)); int shardingTotalCount = Integer.valueOf(env.getProperty(joiner.join(propertiesPrefix, SHARDING_TOTAL_COUNT))); String shardingItemParameters = env.getProperty(joiner.join(propertiesPrefix, SHARDING_ITEM_PARAMETERS)); if (shardingTotalCount > 0 && StringUtils.isNoneBlank(cron) && StringUtils.isNoneBlank(shardingItemParameters) ) { scheduler = new SpringJobScheduler(job, regCenter, getLiteJobConfiguration(job.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration); scheduler.init(); jobSchedulers.add(scheduler); } } return jobSchedulers; } private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder( jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build(); } private List<SimpleJob> schedulers() { List<SimpleJob> jobList = new ArrayList<>(ApplicationContextProvider.getBeansOfType(SimpleJob.class).values()); jobList.forEach(job -> System.out.println(job.getClass().getCanonicalName())); return jobList; } }
在application.yml配置文件中添加以下配置:
simpleJob: com.dangdang.ddframe.job.example.job.simple: #job的包名 CourseSimpleJob: #job的类名 cron: 0/30 * * * * ? shardingTotalCount: 1 shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou SpringSimpleJob: #job的类名 cron: 0/25 * * * * ? shardingTotalCount: 3 shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
很显然这里的做法是通过将job的相关配置参数集中到一个文件里面,好处是配置参数可以统一起来集中管理。
3.4.Dataflow类型作业配置
@Configuration public class DataflowJobConfig { @Resource private ZookeeperRegistryCenter regCenter; @Resource private JobEventConfiguration jobEventConfiguration; @Bean public DataflowJob dataflowJob() { return new SpringDataflowJob(); } @Bean(initMethod = "init") public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount, @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) { return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration); } private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder( jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build(); } }
这里在
dataflowJobScheduler
方法上的@Bean
注解中有一个initMethod = "init"
,这表示在创建JobScheduler
的实例之后会调用JobScheduler
的init
方法。而在前面2.3.3.Simple类型作业配置中我们是直接显示的调用init
方法的。3.5.Script类型作业配置
脚本类型的作业配置这里就不做展开了,直接参考上面的官方配置即可,或者根据上面的经验进行简单的修改。
-
作业运行
正常地启动springboot项目即可,作业会根据
cron
参数的配置自动执行。
部署
正常部署应用即可
- 启动Elastic-Job-Lite指定注册中心的Zookeeper。
- 运行包含Elastic-Job-Lite和业务代码的jar文件。不限与jar或war的启动方式。
运维平台(可选)
-
解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。
-
打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。
elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。
最后附上官方文档大全地址:http://elasticjob.io/docs/elastic-job-lite/00-overview/
网友评论