Elastic-Job official site: http://elasticjob.io/index_zh.html
Elastic-Job-Lite official documentation: http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/
Pros and cons of Elastic-Job
Pros: integrates with Spring, supports distributed execution and clustering
Cons: a dynamically added job cannot be executed by the other machines in the cluster
Environment setup
Maven, MySQL, ZooKeeper
Starting a ZooKeeper cluster
1. Download ZooKeeper
2. Rename zoo_sample.cfg to zoo.cfg
3. Make three copies of the ZooKeeper installation
4. Edit zoo.cfg in each copy (see the sample configuration after this list)
5. Create the myid files
Create a tmp directory in the root of the drive, create three directories zookeeper-1, zookeeper-2 and zookeeper-3 under it, and in each directory create a myid file whose content is 1, 2 and 3 respectively
6. Run zkServer.cmd in each of the three copies to start the cluster
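The original post showed the zoo.cfg contents only as a screenshot. Below is a typical sketch for the first copy, assuming the tmp/zookeeper-1..3 directories from step 5 and client ports 2181/2182/2183; the peer ports and paths are illustrative assumptions, and only dataDir and clientPort differ between the three copies:
tickTime=2000
initLimit=10
syncLimit=5
# data directory of this copy; the myid file from step 5 lives here
dataDir=/tmp/zookeeper-1
# client port: 2181 for copy 1, 2182 for copy 2, 2183 for copy 3
clientPort=2181
# cluster members: server.<myid>=<host>:<quorum port>:<leader election port>
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890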
Job types
1. simple jobs
2. dataflow jobs (streaming jobs)
3. script jobs
The concept of sharding
A job is split into several independent shard items, and each server instance is assigned one or more of them. For example, with a sharding total count of 4 and two instances, each instance typically handles two shard items.
Job high availability
Configure the job with a single shard and run it on more than one server: one primary and n standbys. Only the instance holding the shard executes the job.
When the primary goes down, a standby takes over, as sketched below.
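A sketch, not from the original post, of the core configuration for such a setup. The job name and cron are placeholders; failover(true) is the flag on Elastic-Job's JobCoreConfiguration builder that re-dispatches a shard to a live instance when its current holder crashes mid-execution:
// a single shard shared by several instances; only the instance assigned the shard runs the job
JobCoreConfiguration haCoreConfig = JobCoreConfiguration
        .newBuilder("haJob", "0/10 * * * * ?", 1)
        .failover(true)   // hand the shard to another live instance if its holder goes down mid-run
        .build();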
Quick start
simple job
1. A minimal implementation of a scheduled task: just implement the execute method
2. Provides elastic scaling and sharding
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("current shard item: " + shardingContext.getShardingItem()
                + ", total shard count: " + shardingContext.getShardingTotalCount());
    }
}
public static void main(String[] args) {
    new JobScheduler(zkCenter(), jobConfig()).init();
}

/**
 * Registry center
 * @return
 */
public static CoordinatorRegistryCenter zkCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "simple-job");
    CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
    crc.init();
    return crc;
}

public static LiteJobConfiguration jobConfig() {
    String jobName = "testJob";
    String cron = "0/10 * * * * ?";
    // number of shards
    int shardingTotalCount = 2;
    // core job parameters
    JobCoreConfiguration jcc = JobCoreConfiguration
            .newBuilder(jobName, cron, shardingTotalCount).build();
    // job type configuration
    JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());
    // root job configuration
    LiteJobConfiguration ljc = LiteJobConfiguration
            .newBuilder(jtc)
            .overwrite(true)
            .build();
    return ljc;
}
dataflow job
public class MyDataflowJob implements DataflowJob<Order> {
    private List<Order> orders = new ArrayList<Order>(100);

    {
        // seed 100 unprocessed orders for the demo
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setStatus(0);
            orders.add(order);
        }
    }

    /**
     * Fetch data
     */
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // status is "unprocessed" and orderId % total shard count == current shard item
        List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0).filter(
                o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(Collectors.toList());
        List<Order> subList = null;
        if (orderList != null && orderList.size() > 0) {
            // process at most 10 records per fetch
            subList = orderList.subList(0, Math.min(10, orderList.size()));
        }
        // simulate a slow operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        System.out.println("fetchData, current shard item " + shardingContext.getShardingItem() + " ===> " + subList);
        return subList;
    }

    /**
     * Process data
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        data.forEach(o -> o.setStatus(1));
        // simulate a slow operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        System.out.println("processData, current shard item " + shardingContext.getShardingItem());
    }
}
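The Order entity used by MyDataflowJob is not included in the original post; a minimal sketch, assuming only an id and a status field (0 = unprocessed, 1 = processed):
public class Order {
    private int orderId;
    // 0 = unprocessed, 1 = processed
    private int status;

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "Order{orderId=" + orderId + ", status=" + status + "}";
    }
}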
public static void main(String[] args) {
    new JobScheduler(zkCenter(), jobConfig()).init();
}

/**
 * Registry center
 * @return
 */
public static CoordinatorRegistryCenter zkCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "dataflow-job");
    CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
    crc.init();
    return crc;
}

public static LiteJobConfiguration jobConfig() {
    String jobName = "testJob";
    String cron = "0/10 * * * * ?";
    // number of shards
    int shardingTotalCount = 2;
    // core job parameters
    JobCoreConfiguration jcc = JobCoreConfiguration
            .newBuilder(jobName, cron, shardingTotalCount).build();
    // whether to run as a streaming job: when true, fetchData/processData keep looping
    // within one trigger until fetchData returns null or an empty list
    boolean streamingProcess = true;
    // job type configuration
    JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), streamingProcess);
    // root job configuration
    LiteJobConfiguration ljc = LiteJobConfiguration
            .newBuilder(jtc)
            .overwrite(true)
            .build();
    return ljc;
}
script job
public static void main(String[] args) {
    new JobScheduler(zkCenter(), jobConfig()).init();
}

/**
 * Registry center
 * @return
 */
public static CoordinatorRegistryCenter zkCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "script-job");
    CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
    crc.init();
    return crc;
}

public static LiteJobConfiguration jobConfig() {
    String jobName = "scriptJob";
    String cron = "0/10 * * * * ?";
    // number of shards
    int shardingTotalCount = 2;
    // core job parameters; misfire(false) means missed triggers are not re-executed
    JobCoreConfiguration jcc = JobCoreConfiguration
            .newBuilder(jobName, cron, shardingTotalCount).misfire(false).build();
    // job type configuration: the path of the script to execute
    String scriptAddress = "C:\\java\\workdata\\test.cmd";
    JobTypeConfiguration jtc = new ScriptJobConfiguration(jcc, scriptAddress);
    // root job configuration
    LiteJobConfiguration ljc = LiteJobConfiguration
            .newBuilder(jtc)
            .overwrite(true)
            .build();
    return ljc;
}
Contents of test.cmd
echo sharding execution context is %1
Error: JobConfigurationException: Execute script failure.
I was not able to resolve this error.
Integration via the Spring schema
pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
web.xml
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
<display-name>spring-job</display-name>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:spring-config.xml</param-value>
</context-param>
<!-- 配置监听器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
</web-app>
public class MySimpleJob implements SimpleJob {
    public void execute(ShardingContext shardingContext) {
        System.out.println("shard item ===> " + shardingContext.getShardingItem());
    }
}
spring-config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!-- 注册中心配置 -->
<reg:zookeeper namespace="spring-job" server-lists="localhost:2181" id="zkCenter"/>
<!-- simple作业配置 -->
<job:simple id="mySimpleJob1" overwrite="true" sharding-total-count="1" cron="*/5 * * * * ?" registry-center-ref="zkCenter" class="com.fuiou.job.MySimpleJob"></job:simple>
</beans>
Note: the job:simple element must have an id, and overwrite="true" is required for the local configuration to overwrite what is already stored in the registry center.
Integrating a simple job via the Java API
Same as the quick start above.
Integrating a simple job with Spring Boot
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
application.properties
# elasticJob
elasticjob.zookeeper.server-list=localhost:2181
elasticjob.zookeeper.namespace=spring-job
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@Getter
@Setter
public class ZookeeperProperties {
    // ZooKeeper server list
    private String serverList;
    // ZooKeeper namespace
    private String namespace;
}
@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {
    @Autowired
    private ZookeeperProperties zookeeperProperties;

    /**
     * Registry center
     *
     * @return
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter() {
        String serverList = zookeeperProperties.getServerList();
        String namespace = zookeeperProperties.getNamespace();
        ZookeeperConfiguration zc = new ZookeeperConfiguration(serverList, namespace);
        CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        return crc;
    }
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {
    String jobName() default "";
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default false;
}
@Configuration
// only active if a ZooKeeper registry center has been configured
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @PostConstruct
    public void initSimpleJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] instances = instance.getClass().getInterfaces();
            for (Class<?> superInstance : instances) {
                // only beans that implement the SimpleJob interface
                if (superInstance == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    // core job parameters
                    JobCoreConfiguration jcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount).build();
                    // job type configuration
                    JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                    // root job configuration
                    LiteJobConfiguration ljc = LiteJobConfiguration
                            .newBuilder(jtc)
                            .overwrite(overwrite)
                            .build();
                    // register with a SpringJobScheduler
                    new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
                }
            }
        }
    }
}
@ElasticSimpleJob(jobName = "mySimpleJob", cron = "*/10 * * * * ?", shardingTotalCount = 2, overwrite = true)
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("shard item ==> " + shardingContext.getShardingItem()
                + " === total shard count ===> " + shardingContext.getShardingTotalCount());
    }
}
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.config.ZookeeperAutoConfig,com.config.SimpleJobAutoConfig
Note: SimpleJobAutoConfig uses @AutoConfigureAfter to guarantee that it is loaded after ZookeeperAutoConfig.
@AutoConfigureAfter has no effect on its own; you must create a META-INF directory under resources and add a spring.factories file there.
The classes listed in spring.factories must be moved into a package that the application's component scan does NOT cover, otherwise the @AutoConfigureAfter ordering is not applied to them.
Integrating a dataflow job
Basically the same as the simple job; just don't forget the streamingProcess property. A sketch of the dataflow wiring follows below.
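A sketch of what the dataflow wiring could look like, mirroring SimpleJobAutoConfig above. The @ElasticDataflowJob annotation and the class name DataflowJobAutoConfig are assumptions, not from the original, and the imports match the classes shown earlier; the only real difference from the simple job is DataflowJobConfiguration plus the streamingProcess flag:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticDataflowJob {
    String jobName() default "";
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default false;
    // the extra attribute compared with @ElasticSimpleJob
    boolean streamingProcess() default false;
}

@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class DataflowJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @PostConstruct
    public void initDataflowJob() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class);
        for (Object instance : beans.values()) {
            // only beans that implement the DataflowJob interface
            if (!(instance instanceof DataflowJob)) {
                continue;
            }
            ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class);
            JobCoreConfiguration jcc = JobCoreConfiguration
                    .newBuilder(annotation.jobName(), annotation.cron(), annotation.shardingTotalCount())
                    .build();
            // the only difference from the simple job: DataflowJobConfiguration + streamingProcess
            JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc,
                    instance.getClass().getCanonicalName(), annotation.streamingProcess());
            LiteJobConfiguration ljc = LiteJobConfiguration
                    .newBuilder(jtc)
                    .overwrite(annotation.overwrite())
                    .build();
            new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
        }
    }
}
Remember to also register DataflowJobAutoConfig in spring.factories, next to the two existing entries.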
Afterword
By printing Thread.currentThread().getName() I found that different jobs run on different threads, different shard items of the same job also run on different threads, and the same shard item of the same job always runs on the same thread. However, different shard items of the same job still block each other even though they are on different threads, which I don't fully understand.
Conclusion: different jobs do not block each other, but executions of the same job do. @EnableAsync + @Async can be used to execute the work asynchronously.
In general, estimate how long one run of a job takes, so that the next trigger time does not arrive before the previous run has finished.
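A plain-Java sketch of the asynchronous idea mentioned above (an illustration, not the author's code): the job hands the actual work to its own thread pool, so the scheduler thread returns immediately and the next trigger of the same job is not blocked. The trade-off is that runs of the same shard item can now overlap, so the work must be safe to execute concurrently.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyAsyncSimpleJob implements SimpleJob {
    // shared worker pool; the size 4 is an arbitrary choice for illustration
    private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(4);

    @Override
    public void execute(ShardingContext shardingContext) {
        final int item = shardingContext.getShardingItem();
        // submit the long-running work and return at once, freeing the scheduler thread
        WORKER_POOL.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " is processing shard item " + item);
            // long-running work goes here
        });
    }
}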