Elastic-Job implemented with the Java API
1. Import the dependency
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
2. Develop the job
package com.elasticjob;
import java.util.Date;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.time.DateFormatUtils;
/**
* @Author: chao.zhu
* @description:
* @CreateDate: 2018/08/16
* @Version: 1.0
*/
public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        ZkClient zkClient = null;
        try {
            System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + ": scheduled job starting");
            // ZooKeeper connection string
            String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
            zkClient = new ZkClient(ZK_CONNECTION, 5000);
            // Read the current data of the /name node, then append to it
            String v = zkClient.readData("/name", false);
            System.out.println("/name content: " + v);
            zkClient.writeData("/name", v + "$");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
3. Test class
package com.elasticjob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
* @Author: chao.zhu
* @description:
* @CreateDate: 2018/08/16
* @Version: 1.0
*/
public class JobTest2 {
    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        // Create the job configuration
        // 1. Define the core job configuration
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/30 * * * * ?", 1).build();
        // 2. Define the SIMPLE job type configuration
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // 3. Define the Lite job root configuration
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }
}
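The demo above uses a single shard (shardingTotalCount = 1). As a minimal sketch of how sharding is typically used (the job name and parameter values here are only illustrative), the core configuration can declare several shards with per-shard parameters, and the job then branches on the shard item it receives:
// Hypothetical sharded configuration: 3 shards, each mapped to a parameter
JobCoreConfiguration shardedCoreConfig = JobCoreConfiguration
        .newBuilder("demoShardedJob", "0/30 * * * * ?", 3)
        .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
        .build();

// Inside MyElasticJob.execute(ShardingContext shardingContext):
//     shardingContext.getShardingItem()      -> 0, 1 or 2 on this instance
//     shardingContext.getShardingParameter() -> "Beijing", "Shanghai" or "Guangzhou"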
Pitfalls encountered
1. The job configuration is registered as a persistent ZooKeeper node, and its data is not updated afterwards. In other words, once the cron expression has been registered the first time, later changes to the cron do not take effect. You can confirm this by inspecting the node data in ZooKeeper:
[zk: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183(CONNECTED) 11] ls /elastic-job-demo/demoSimpleJob
[leader, servers, config, instances, sharding]
[zk: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183(CONNECTED) 12] get /elastic-job-demo/demoSimpleJob/config
{"jobName":"demoSimpleJob","jobClass":"com.elasticjob.MyElasticJob","jobType":"SIMPLE","cron":"0/30 * * * * ?","shardingTotalCount":1,"shardingItemParameters":"","jobParameter":"","failover":false,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":false}
cZxid = 0xa00000433
ctime = Thu Aug 16 11:01:26 CST 2018
mZxid = 0xa00000433
mtime = Thu Aug 16 11:01:26 CST 2018
pZxid = 0xa00000433
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 613
numChildren = 0
Anyone familiar with ZooKeeper will see that all of our scheduled-job nodes live under a common root: when connecting to ZooKeeper we created a namespace ("elastic-job-demo" above), and every subsequent job node is created under that namespace.
Node | Meaning |
---|---|
leader | Stores the master node chosen by leader election |
servers | IPs of the cluster nodes that run the job |
config | Holds the configured cron expression, job name and other configuration data |
instances | Each server that starts up registers an instance under this node |
sharding | Sharding information |
LiteJobConfiguration has an overwrite flag that lets the local job configuration overwrite what is already stored in ZooKeeper, as shown below.
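A minimal sketch of enabling it, reusing simpleJobConfig from the test class above; with overwrite(true) the locally configured cron and other settings are written to the config node on startup, so changes do take effect:
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration
        .newBuilder(simpleJobConfig)
        .overwrite(true)   // overwrite the /config node in ZooKeeper with the local configuration
        .build();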
Elastic-Job configured with the Spring namespace
Import the dependency
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.6-SNAPSHOT</version>
</dependency>
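A rough sketch of the corresponding Spring XML follows (bean ids and attribute values are illustrative, based on the Elastic-Job-Lite 2.x Spring namespace; exact attribute names may vary between versions). It declares the registry center and a simple job bound to it:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.dangdang.com/schema/ddframe/reg
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                           http://www.dangdang.com/schema/ddframe/job
                           http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <!-- Registry center: the same ZooKeeper cluster and namespace as in the Java example -->
    <reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
                   namespace="elastic-job-demo" base-sleep-time-milliseconds="1000"
                   max-sleep-time-milliseconds="3000" max-retries="3"/>

    <!-- Simple job bound to the registry center -->
    <job:simple id="demoSimpleJob" class="com.elasticjob.MyElasticJob" registry-center-ref="regCenter"
                cron="0/30 * * * * ?" sharding-total-count="1" overwrite="true"/>
</beans>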
Elastic-Job monitoring console
Starting the console
1. Build and package elastic-job-lite-console with mvn install
2. The build produces elastic-job-lite-console-2.1.6-SNAPSHOT.tar.gz
3. Extract the archive and run start.sh
4. Log in at http://localhost:8899/
5. Add the registry center
Elastic-Job source code analysis
// Create the job's core configuration
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0 0/1 * * * ?", 1).build();
// Wrap the core configuration in a simple-job configuration. Besides SimpleJob there are two other job types (dataflow and script)
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
// Define the Lite job root configuration
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
// Define the registry center: ZooKeeper
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
regCenter.init();
// Hand everything to the JobScheduler
new JobScheduler(regCenter, simpleJobRootConfig).init();
Programmatic job configuration uses the decorator pattern: JobCoreConfiguration is wrapped by SimpleJobConfiguration, which is in turn wrapped by LiteJobConfiguration.
- Job initialization
/**
 * Initialize the job.
 */
public void init() {
    // Creates the namespace node, the job node and the job's config node in ZooKeeper
    // (config content: the same JSON as shown for the /config node above)
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
    // Sharding info: store the job name as key and the sharding total count as value in a map
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    // Create the objects Quartz needs
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(),
            createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),
            liteJobConfigFromRegCenter.getJobName());
    // 1. Map the job name to jobScheduleController
    // 2. Map the job name to the registry center regCenter
    // 3. Write the job node name into the local TreeCache (not yet written to ZooKeeper)
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
    // Register and start the job: this method does the heavy lifting
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    // Start the scheduled job with the underlying Quartz scheduler
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
- Analysis:
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
/**
 * Update the job configuration.
 *
 * @param liteJobConfig job configuration
 * @return updated job configuration
 */
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
    // Persist the job configuration: write liteJobConfig to ZooKeeper
    configService.persist(liteJobConfig);
    // Then read the effective configuration back from ZooKeeper
    return configService.load(false);
}
This is where the overwrite pitfall above comes from: persist only writes the config node when it does not exist yet or when overwrite is true, so without overwrite the configuration read back from ZooKeeper wins over whatever was changed locally.
- Analysis:
schedulerFacade.registerStartUpInfo
/**
 * Register job startup information.
 * SchedulerFacade is a facade for operating on ZooKeeper; the service classes below all use ZooKeeper plus the job name to do their work.
 * Keep in mind that this method should be read with a distributed deployment in mind.
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    // Important: start all listeners; they watch the job's state in ZooKeeper
    listenerManager.startAllListeners();
    // Elect the master node, using Curator's LeaderLatch
    leaderService.electLeader();
    // Write this server's IP to ZooKeeper under jobName/servers/ip
    serverService.persistOnline(enabled);
    // Write the running instance to ZooKeeper under jobName/instances/ip@-@pid
    instanceService.persistOnline();
    // Set the resharding flag (sharding node: jobName/sharding)
    shardingService.setReshardingFlag();
    // Listener for the monitor service; it only listens if a monitor port is configured, e.g. jobConfig.setMonitorPort(9888)
    monitorService.listen();
    // Self-diagnosing reconcile service
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
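As the comment on monitorService.listen() notes, the monitor service only starts when a monitor port has been configured. With the Java API used earlier this can be sketched roughly as follows (port 9888 is only an example value):
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration
        .newBuilder(simpleJobConfig)
        .overwrite(true)
        .monitorPort(9888)   // monitorService.listen() only starts when a port is configured
        .build();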