美文网首页
es-job 使用详解-1

es-job 使用详解-1

作者: 大黑跟小白的日常 | 来源:发表于2020-08-19 17:22 被阅读0次

项目说明

当前工程是基于 spring-boot 构建的 spring-cloud 工程下的一个 job工程;

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
        <spring-cloud-alibaba.version>2.1.2.RELEASE</spring-cloud-alibaba.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

pom 依赖

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
            <!--    解决esjob curator 跟 springcloud 冲突问题    -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-recipes</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-framework</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-client</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--    解决esjob curator 跟 springcloud 冲突问题    -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.12.0</version>
        </dependency>

Java 配置类,配置 任务注册中心 及 任务事件配置

package com.itcast.cupid.esjob.config;

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

import javax.sql.DataSource;

/**
 * @Author G_Y
 * @Date 2020/8/15 8:08
 * @Description: // ElasticJob 框架配置类
 **/
@Configuration
@RefreshScope
@Order(0)
public class EsJobConfig {
    // zk 连接信息
    @Value("${esjob.zkServerlists}")
    private String zkServerList;

    // esjob 命名空间
    @Value("${esjob.zkNamespace}")
    private String zkNamespace;

    @Autowired
    private DataSource dataSource; //数据源已经存在,直接引入

    /**
     * zk配置
     * @return
     */
    @Bean("zookeeperConfiguration")
    public ZookeeperConfiguration zookeeperConfiguration() {
        return new ZookeeperConfiguration(zkServerList, zkNamespace);
    }

    // 初始化zk 任务注册中心
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter esjobRegistryCenter(@Qualifier("zookeeperConfiguration") ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }

    /**
     * 将作业运行的痕迹进行持久化到DB
     *
     * @return
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }

}

具体配置信息

image.png

配置 esjob 监听器,用于具体任务

/**
 * @Author G_Y
 * @Date 2020/8/15 8:36
 * @Description: // esjob 监听器
 * 接下来实现一个分布式的任务监听器,
 * 如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。
 * 监听器在之前的ElasticJobConfig已经注册到了Spring容器之中
 **/
@Component
public class MyElasticJobListener extends AbstractDistributeOnceElasticJobListener {

    public MyElasticJobListener(@Value("${esjob.startedTimeoutMilliseconds}") long startedTimeoutMilliseconds,
                                @Value("${esjob.completedTimeoutMilliseconds}")  long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("----doBeforeJobExecutedAtLastStarted--");
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("-----doAfterJobExecutedAtLastCompleted----");
    }
}

定义具体任务

/**
 * @Author G_Y
 * @Date 2020/8/19 16:17
 * @Description: // 简单任务
 **/
@Component
@Slf4j
public class SimpleTestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("作业分片:"+shardingContext.getShardingItem());
        //分片参数,(0=aaa,1=bbb,2=ccc,3=ddd,参数就是aaa、bbb...)
        String jobParameter = shardingContext.getJobParameter();
        String shardingParameter = shardingContext.getShardingParameter();

        //打印出任务相关信息,JobParameter用于传递任务的ID
        log.info("任务名:{}, 片数:{}, shardingParameter={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingParameter);

        System.out.println("==================="+jobParameter+"===================");
        System.out.println("=================="+shardingParameter+"===================");

        // 业务操作

        System.out.println("==============业务操作===============");
        // Feign 调度 service 执行具体业务处理 逻辑
    }
}

配置具体任务的调度方案并开启调度执行

@Configuration
@Order(10)
public class EsJobTaskConfig {
    // 如下 都是 已经 提前 配置过的  job任务 需要 用到 的组件
    @Autowired
    private CoordinatorRegistryCenter regCenter;

    @Autowired
    private MyElasticJobListener myElasticJobListener;

    @Autowired
    private JobEventConfiguration jobEventConfiguration;

    //每个bean代表了一个定时任务执行策略的添加
    @Bean(initMethod = "init")
    public JobScheduler createSimpleTestJobShedule(@Qualifier("simpleTestJob") final SimpleTestJob job) {
        // final String jobName = "[-test es job 1-]testSimpleJob";
        final String cron = "*/20 * * * * ?";//每20s执行一次
        int testJob1ShardingTotalCount = 2;
        final String shardingItemParameters = "0=aaa,1=bbb";
        LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),
                cron, testJob1ShardingTotalCount, shardingItemParameters, "testSimpleJob2");
        return new SpringJobScheduler(job, regCenter, jobConfiguration, jobEventConfiguration, myElasticJobListener);
    }

    /**
     * 配置任务详细信息
     * @param jobClass 任务执行类
     * @param cron  执行策略
     * @param shardingTotalCount 分片数量
     * @param shardingItemParameters 分片个性化参数
     * @return
     */
     /**
      * @Author G_Y
      * @Description: : 构建 执行 任务 参数配置 的方法
      * @Date 2020/8/15 8:58
      **/
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters,
                                                        final String jobParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //设置shardingItemParameters
        jobCoreConfigurationBuilder.failover(true);// 故障转移
        jobCoreConfigurationBuilder.jobParameter(jobParameters); // 还可以针对job设置条件参数
        if(StringUtils.isNoneEmpty(shardingItemParameters)){
            jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();
        //创建SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                .monitorPort(9888)//设置dump端口
                .build();
        return liteJobConfiguration;
    }
}

启动测试

单台机器


image.png

多台机器 具体 分片则会 分散到各机器执行

优化版本 及 ui控制界面 待续

相关文章

网友评论

      本文标题:es-job 使用详解-1

      本文链接:https://www.haomeiwen.com/subject/qaaidktx.html