美文网首页
es-job 使用详解-1

es-job 使用详解-1

作者: 大黑跟小白的日常 | 来源:发表于2020-08-19 17:22 被阅读0次

    项目说明

    当前工程是基于 spring-boot 构建的 spring-cloud 工程下的一个 job 工程。

        <!-- Inherit build defaults from the Spring Boot starter parent (2.1.2.RELEASE). -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <!-- Shared properties: Java level, the Spring Cloud / Spring Cloud Alibaba versions
             referenced by the dependencyManagement imports, and UTF-8 build/report encodings. -->
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
            <spring-cloud-alibaba.version>2.1.2.RELEASE</spring-cloud-alibaba.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        </properties>
    
        <!-- Import the Spring Cloud and Spring Cloud Alibaba dependency POMs (type=pom, scope=import),
             using the versions declared in the spring-cloud.version and
             spring-cloud-alibaba.version properties. -->
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.cloud</groupId>
                    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                    <version>${spring-cloud-alibaba.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    

    pom 依赖

            <!-- Elastic-Job Lite Spring integration (2.1.5). -->
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
                <version>2.1.5</version>
                <!-- Works around the Curator conflict between esjob and Spring Cloud:
                     exclude the three Curator artifacts that elastic-job brings in. -->
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-recipes</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-framework</artifactId>
                    </exclusion>
                    <exclusion>
                        <artifactId>curator-client</artifactId>
                        <groupId>org.apache.curator</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- Works around the Curator conflict between esjob and Spring Cloud:
                 declare the three excluded Curator artifacts explicitly, all pinned to 2.12.0. -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>2.12.0</version>
            </dependency>
    

    Java 配置类,配置 任务注册中心 及 任务事件配置

    package com.itcast.cupid.esjob.config;
    
    import com.dangdang.ddframe.job.event.JobEventConfiguration;
    import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
    import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.annotation.Order;
    
    import javax.sql.DataSource;
    
    /**
     * @Author G_Y
     * @Date 2020/8/15 8:08
     * @Description: // ElasticJob framework configuration class.
     * Declares three beans: the ZooKeeper configuration, the ZooKeeper-backed job
     * registry center, and the RDB-backed job event configuration.
     **/
    @Configuration
    @RefreshScope
    @Order(0)
    public class EsJobConfig {

        /** ZooKeeper connection info, injected from the {@code esjob.zkServerlists} property. */
        @Value("${esjob.zkServerlists}")
        private String zkServerList;

        /** Namespace used by esjob, injected from the {@code esjob.zkNamespace} property. */
        @Value("${esjob.zkNamespace}")
        private String zkNamespace;

        /** The data source already exists in the application context; it is simply injected here. */
        @Autowired
        private DataSource dataSource;

        /**
         * Builds the ZooKeeper configuration from the injected server list and namespace.
         *
         * @return the configuration, published as bean {@code zookeeperConfiguration}
         */
        @Bean("zookeeperConfiguration")
        public ZookeeperConfiguration zookeeperConfiguration() {
            final ZookeeperConfiguration zkSettings = new ZookeeperConfiguration(zkServerList, zkNamespace);
            return zkSettings;
        }

        /**
         * Creates the ZooKeeper job registry center. The bean declares {@code init} as its
         * init method, so Spring invokes {@code init()} on the returned instance.
         *
         * @param config the {@code zookeeperConfiguration} bean
         * @return the registry center wrapping {@code config}
         */
        @Bean(initMethod = "init")
        public CoordinatorRegistryCenter esjobRegistryCenter(
                @Qualifier("zookeeperConfiguration") ZookeeperConfiguration config) {
            final CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(config);
            return registryCenter;
        }

        /**
         * Persists the traces of job executions to the database behind the injected data source.
         *
         * @return an RDB job event configuration built on {@code dataSource}
         */
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            final JobEventConfiguration rdbEventConfig = new JobEventRdbConfiguration(dataSource);
            return rdbEventConfig;
        }

    }
    

    具体配置信息

    image.png

    配置 esjob 监听器,用于具体任务

    /**
     * @Author G_Y
     * @Date 2020/8/15 8:36
     * @Description: // esjob listener
     * A distributed job listener. Per the original author's note, when a job is sharded the
     * distributed listener runs once before the whole job starts and once after it finishes.
     * The class is annotated with {@code @Component}, so Spring registers it as a bean; the two
     * timeouts come from the {@code esjob.startedTimeoutMilliseconds} and
     * {@code esjob.completedTimeoutMilliseconds} properties.
     **/
    @Component
    public class MyElasticJobListener extends AbstractDistributeOnceElasticJobListener {

        /** Line printed by the "before" hook. */
        private static final String BEFORE_MESSAGE = "----doBeforeJobExecutedAtLastStarted--";

        /** Line printed by the "after" hook. */
        private static final String AFTER_MESSAGE = "-----doAfterJobExecutedAtLastCompleted----";

        /**
         * Passes both timeouts straight through to the superclass constructor.
         *
         * @param startedTimeoutMilliseconds   value of {@code esjob.startedTimeoutMilliseconds}
         * @param completedTimeoutMilliseconds value of {@code esjob.completedTimeoutMilliseconds}
         */
        public MyElasticJobListener(
                @Value("${esjob.startedTimeoutMilliseconds}") long startedTimeoutMilliseconds,
                @Value("${esjob.completedTimeoutMilliseconds}") long completedTimeoutMilliseconds) {
            super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
        }

        /** Prints a marker line to standard output; the sharding contexts are not inspected. */
        @Override
        public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
            System.out.println(BEFORE_MESSAGE);
        }

        /** Prints a marker line to standard output; the sharding contexts are not inspected. */
        @Override
        public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
            System.out.println(AFTER_MESSAGE);
        }
    }
    

    定义具体任务

    /**
     * @Author G_Y
     * @Date 2020/8/19 16:17
     * @Description: // Simple job
     * Demo job that prints and logs the sharding information it receives; the real
     * business call is left as a placeholder.
     **/
    @Component
    @Slf4j
    public class SimpleTestJob implements SimpleJob {

        /**
         * Prints the sharding item, logs the job name / sharding count / sharding parameter,
         * then prints the job parameter, the sharding parameter and a business placeholder line.
         *
         * @param shardingContext the context handed to this execution
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final String shardingItemLine = "作业分片:" + shardingContext.getShardingItem();
            System.out.println(shardingItemLine);

            final String jobParameter = shardingContext.getJobParameter();
            // Sharding parameter: with item parameters such as 0=aaa,1=bbb,2=ccc,3=ddd
            // the value seen here is aaa, bbb, ...
            final String shardingParameter = shardingContext.getShardingParameter();

            // Log the job-related information (original author's note: JobParameter is used
            // to pass the ID of the task).
            log.info(
                    "任务名:{}, 片数:{}, shardingParameter={}",
                    shardingContext.getJobName(),
                    shardingContext.getShardingTotalCount(),
                    shardingParameter);

            final String jobParameterLine = "===================" + jobParameter + "===================";
            final String shardingParameterLine = "==================" + shardingParameter + "===================";
            System.out.println(jobParameterLine);
            System.out.println(shardingParameterLine);

            // Business operation placeholder: the service would be called through Feign
            // here to run the actual business logic.
            final String businessLine = "==============业务操作===============";
            System.out.println(businessLine);
        }
    }
    

    配置具体任务的调度方案并开启调度执行

    @Configuration
    @Order(10)
    public class EsJobTaskConfig {
        // 如下 都是 已经 提前 配置过的  job任务 需要 用到 的组件
        @Autowired
        private CoordinatorRegistryCenter regCenter;
    
        @Autowired
        private MyElasticJobListener myElasticJobListener;
    
        @Autowired
        private JobEventConfiguration jobEventConfiguration;
    
        //每个bean代表了一个定时任务执行策略的添加
        @Bean(initMethod = "init")
        public JobScheduler createSimpleTestJobShedule(@Qualifier("simpleTestJob") final SimpleTestJob job) {
            // final String jobName = "[-test es job 1-]testSimpleJob";
            final String cron = "*/20 * * * * ?";//每20s执行一次
            int testJob1ShardingTotalCount = 2;
            final String shardingItemParameters = "0=aaa,1=bbb";
            LiteJobConfiguration jobConfiguration = createJobConfiguration(job.getClass(),
                    cron, testJob1ShardingTotalCount, shardingItemParameters, "testSimpleJob2");
            return new SpringJobScheduler(job, regCenter, jobConfiguration, jobEventConfiguration, myElasticJobListener);
        }
    
        /**
         * 配置任务详细信息
         * @param jobClass 任务执行类
         * @param cron  执行策略
         * @param shardingTotalCount 分片数量
         * @param shardingItemParameters 分片个性化参数
         * @return
         */
         /**
          * @Author G_Y
          * @Description: : 构建 执行 任务 参数配置 的方法
          * @Date 2020/8/15 8:58
          **/
        private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                            final String cron,
                                                            final int shardingTotalCount,
                                                            final String shardingItemParameters,
                                                            final String jobParameters){
            //JobCoreConfigurationBuilder
            JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
            //设置shardingItemParameters
            jobCoreConfigurationBuilder.failover(true);// 故障转移
            jobCoreConfigurationBuilder.jobParameter(jobParameters); // 还可以针对job设置条件参数
            if(StringUtils.isNoneEmpty(shardingItemParameters)){
                jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
            }
            JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();
            //创建SimpleJobConfiguration
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
            //创建LiteJobConfiguration
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                    .monitorPort(9888)//设置dump端口
                    .build();
            return liteJobConfiguration;
        }
    }
    

    启动测试

    单台机器


    image.png

    多台机器时,各分片会分散到不同机器上执行

    优化版本及 UI 控制界面待续

    相关文章

      网友评论

          本文标题:es-job 使用详解-1

          本文链接:https://www.haomeiwen.com/subject/qaaidktx.html