项目说明
当前工程是基于 spring-boot 构建的 spring-cloud 工程下的一个 job工程;
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR4</spring-cloud.version>
<spring-cloud-alibaba.version>2.1.2.RELEASE</spring-cloud-alibaba.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
pom 依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
<!-- 解决esjob curator 跟 springcloud 冲突问题 -->
<exclusions>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 解决esjob curator 跟 springcloud 冲突问题 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
Java 配置类,配置 任务注册中心 及 任务事件配置
package com.itcast.cupid.esjob.config;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import javax.sql.DataSource;
/**
* @Author G_Y
* @Date 2020/8/15 8:08
* @Description: // ElasticJob 框架配置类
**/
@Configuration
@RefreshScope
@Order(0)
public class EsJobConfig {
// zk 连接信息
@Value("${esjob.zkServerlists}")
private String zkServerList;
// esjob 命名空间
@Value("${esjob.zkNamespace}")
private String zkNamespace;
@Autowired
private DataSource dataSource; //数据源已经存在,直接引入
/**
* zk配置
* @return
*/
@Bean("zookeeperConfiguration")
public ZookeeperConfiguration zookeeperConfiguration() {
return new ZookeeperConfiguration(zkServerList, zkNamespace);
}
// 初始化zk 任务注册中心
@Bean(initMethod = "init")
public CoordinatorRegistryCenter esjobRegistryCenter(@Qualifier("zookeeperConfiguration") ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
/**
* 将作业运行的痕迹进行持久化到DB
*
* @return
*/
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}
具体配置信息
image.png配置 esjob 监听器,用于具体任务
/**
* @Author G_Y
* @Date 2020/8/15 8:36
* @Description: // esjob 监听器
* 接下来实现一个分布式的任务监听器,
* 如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。
* 监听器在之前的ElasticJobConfig已经注册到了Spring容器之中
**/
@Component
public class MyElasticJobListener extends AbstractDistributeOnceElasticJobListener {
public MyElasticJobListener(@Value("${esjob.startedTimeoutMilliseconds}") long startedTimeoutMilliseconds,
@Value("${esjob.completedTimeoutMilliseconds}") long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
System.out.println("----doBeforeJobExecutedAtLastStarted--");
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
System.out.println("-----doAfterJobExecutedAtLastCompleted----");
}
}
定义具体任务
/**
* @Author G_Y
* @Date 2020/8/19 16:17
* @Description: // 简单任务
**/
@Component
@Slf4j
public class SimpleTestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("作业分片:"+shardingContext.getShardingItem());
//分片参数,(0=aaa,1=bbb,2=ccc,3=ddd,参数就是aaa、bbb...)
String jobParameter = shardingContext.getJobParameter();
String shardingParameter = shardingContext.getShardingParameter();
//打印出任务相关信息,JobParameter用于传递任务的ID
log.info("任务名:{}, 片数:{}, shardingParameter={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
shardingParameter);
System.out.println("==================="+jobParameter+"===================");
System.out.println("=================="+shardingParameter+"===================");
// 业务操作
System.out.println("==============业务操作===============");
// Feign 调度 service 执行具体业务处理 逻辑
}
}
配置具体任务的调度方案并开启调度执行
@Configuration
@Order(10)
public class EsJobTaskConfig {
// 如下 都是 已经 提前 配置过的 job任务 需要 用到 的组件
@Autowired
private CoordinatorRegistryCenter regCenter;
@Autowired
private MyElasticJobListener myElasticJobListener;
@Autowired
private JobEventConfiguration jobEventConfiguration;
//每个bean代表了一个定时任务执行策略的添加
@Bean(initMethod = "init")
public JobScheduler createSimpleTestJobShedule(@Qualifier("simpleTestJob") final SimpleTestJob job) {
// final String jobName = "[-test es job 1-]testSimpleJob";
final String cron = "*/20 * * * * ?";//每20s执行一次
int testJob1ShardingTotalCount = 2;
final String shardingItemParameters = "0=aaa,1=bbb";
LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),
cron, testJob1ShardingTotalCount, shardingItemParameters, "testSimpleJob2");
return new SpringJobScheduler(job, regCenter, jobConfiguration, jobEventConfiguration, myElasticJobListener);
}
/**
* 配置任务详细信息
* @param jobClass 任务执行类
* @param cron 执行策略
* @param shardingTotalCount 分片数量
* @param shardingItemParameters 分片个性化参数
* @return
*/
/**
* @Author G_Y
* @Description: : 构建 执行 任务 参数配置 的方法
* @Date 2020/8/15 8:58
**/
private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters){
//JobCoreConfigurationBuilder
JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
//设置shardingItemParameters
jobCoreConfigurationBuilder.failover(true);// 故障转移
jobCoreConfigurationBuilder.jobParameter(jobParameters); // 还可以针对job设置条件参数
if(StringUtils.isNoneEmpty(shardingItemParameters)){
jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();
//创建SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
//创建LiteJobConfiguration
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
.monitorPort(9888)//设置dump端口
.build();
return liteJobConfiguration;
}
}
启动测试
单台机器
image.png
多台机器 具体 分片则会 分散到各机器执行
优化版本 及 ui控制界面 待续
网友评论