Timers (elastic-job, Part 1)

Author: 寂静的春天1988 | Published: 2020-06-16 09:26

Elastic-Job official site: http://elasticjob.io/index_zh.html
Elastic-Job-Lite official documentation: http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/

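Pros and cons of elastic-job

Pros: integrates with Spring, supports distributed execution, supports clustering.
Cons: a job that is added dynamically at runtime cannot be executed by the other machines in the distributed deployment.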

Environment setup

Maven, MySQL, ZooKeeper

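Starting the ZooKeeper cluster

1. Download ZooKeeper.
2. Rename zoo_sample.cfg to zoo.cfg.
3. Make 3 copies of the ZooKeeper installation.
4. Edit the zoo.cfg configuration file in each copy.
5. Create the myid files: in the root of the drive create a tmp directory, inside it create three directories named zookeeper-1, zookeeper-2 and zookeeper-3, and in each of them create a file named myid whose content is 1, 2 and 3 respectively.
6. Start each of the three ZooKeeper instances with zkServer.cmd.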
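
Step 5 can also be scripted. The following is a minimal sketch, assuming the tmp directory is passed as the first program argument; the class name MyIdFiles and its method are hypothetical helpers for illustration, not part of ZooKeeper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: creates base/zookeeper-1 .. base/zookeeper-N, each with a myid file.
final class MyIdFiles {

    /** Creates the data directories and returns the paths of the myid files written. */
    static List<Path> create(Path base, int count) {
        List<Path> written = new ArrayList<>();
        try {
            for (int id = 1; id <= count; id++) {
                Path dataDir = base.resolve("zookeeper-" + id);
                Files.createDirectories(dataDir);
                Path myid = dataDir.resolve("myid");
                // The file holds only the server number, e.g. "1"
                Files.write(myid, String.valueOf(id).getBytes(StandardCharsets.US_ASCII));
                written.add(myid);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        // Assumption: the tmp directory is given as the first argument, e.g. C:\tmp
        Path base = Paths.get(args.length > 0 ? args[0] : "tmp");
        create(base, 3).forEach(p -> System.out.println("wrote " + p));
    }
}
```

The dataDir setting in each copy's zoo.cfg then has to point at the matching zookeeper-N directory.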

Job types

1. Simple jobs
2. Dataflow jobs (streaming jobs)
3. Script jobs

The concept of sharding

A job is split into several independent sharding items, and each server is assigned one or more of them.
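
To make the idea concrete, here is a small plain-Java sketch of how one sharding item could pick its own slice of the data with a modulo rule. The class and method names are illustrative assumptions; elastic-job only supplies the sharding item and the total count, and the partitioning rule is up to the job.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: each sharding item handles the ids where id % total == item.
final class ShardingSketch {

    static List<Integer> idsForItem(List<Integer> ids, int shardingTotalCount, int shardingItem) {
        List<Integer> result = new ArrayList<>();
        for (int id : ids) {
            if (id % shardingTotalCount == shardingItem) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        // With 2 sharding items, item 0 gets the even ids and item 1 the odd ids.
        for (int item = 0; item < 2; item++) {
            System.out.println("item " + item + " -> " + idsForItem(ids, 2, item));
        }
    }
}
```

Because every id matches exactly one sharding item, no record is processed twice and none is skipped.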

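High availability of jobs

Set the sharding total count to 1 and run the job on more than one server. This gives one master and n standbys.
When the master goes down, a standby server takes over.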
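
The sketch below is a simplified model of an average-allocation rule, written to show why a single sharding item yields one active server and idle standbys. It is an assumption-laden illustration with hypothetical names, not the library's own strategy class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model: spread items evenly, then hand leftovers to the first servers.
final class AverageAllocationSketch {

    static Map<String, List<Integer>> allocate(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (servers.isEmpty()) {
            return result;
        }
        int perServer = shardingTotalCount / servers.size();
        int index = 0;
        for (String server : servers) {
            List<Integer> items = new ArrayList<>();
            for (int i = index * perServer; i < (index + 1) * perServer; i++) {
                items.add(i);
            }
            result.put(server, items);
            index++;
        }
        // Items that do not divide evenly go one by one to the servers at the front.
        int assigned = perServer * servers.size();
        int leftover = shardingTotalCount - assigned;
        index = 0;
        for (String server : servers) {
            if (index >= leftover) {
                break;
            }
            result.get(server).add(assigned + index);
            index++;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("server-a", "server-b", "server-c");
        // One sharding item: only the first server receives work.
        System.out.println(allocate(servers, 1));
        // After server-a goes down, the item moves to the next server.
        System.out.println(allocate(servers.subList(1, 3), 1));
    }
}
```

With one sharding item and three servers, only the first server in the list gets item 0. Once it disappears, re-running the allocation over the remaining servers moves item 0 to the next one, which is the failover behaviour described above.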

Quick start

Simple job

1. The simplest kind of scheduled job: you only need to implement the execute method.
2. It supports elastic scaling and sharding.

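import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        // Print which sharding item this invocation handles and how many items exist in total
        System.out.println("Current sharding item: " + shardingContext.getShardingItem()
                + ", total sharding items: " + shardingContext.getShardingTotalCount());
    }

}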

Startup code:

    public static void main(String[] args) {
        new JobScheduler(zkCenter(), jobConfig()).init();
    }

    /**
     * Registry center (ZooKeeper).
     *
     * @return an initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "simple-job");
        CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        crc.init();
        return crc;
    }

    public static LiteJobConfiguration jobConfig() {
        String jobName = "testJob";
        // Fire every 10 seconds
        String cron = "0/10 * * * * ?";
        // Total number of sharding items
        int shardingTotalCount = 2;
        // Core job parameters
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount).build();
        // Job type configuration
        JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());
        // Root job configuration; overwrite(true) lets the local settings replace those stored in the registry center
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();
        return ljc;
    }

Dataflow job

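import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

public class MyDataflowJob implements DataflowJob<Order> {

    // In-memory stand-in for a table of 100 unprocessed orders (status 0)
    private List<Order> orders = new ArrayList<Order>(100);
    {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setStatus(0);
            orders.add(order);
        }
    }

    /**
     * Fetch data.
     */
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // Unprocessed orders (status 0) whose orderId % total sharding count == current sharding item
        List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0).filter(
                o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(Collectors.toList());

        List<Order> subList = null;
        if (!orderList.isEmpty()) {
            // Handle at most 10 orders per fetch; Math.min avoids an IndexOutOfBoundsException on a short list
            subList = orderList.subList(0, Math.min(10, orderList.size()));
        }
        // Simulate a slow operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        }
        System.out.println("Fetched data, current sharding item " + shardingContext.getShardingItem() + " ===> " + subList);

        // A null or empty result means there is nothing left to process
        return subList;
    }

    /**
     * Process data.
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        // Mark every fetched order as processed
        data.forEach(o -> o.setStatus(1));
        // Simulate a slow operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Processed data, current sharding item " + shardingContext.getShardingItem());

    }

}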

Startup code:

    public static void main(String[] args) {
        new JobScheduler(zkCenter(), jobConfig()).init();
    }

    /**
     * Registry center (ZooKeeper).
     *
     * @return an initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "dataflow-job");
        CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        crc.init();
        return crc;
    }

    public static LiteJobConfiguration jobConfig() {
        String jobName = "testJob";
        // Fire every 10 seconds
        String cron = "0/10 * * * * ?";
        // Total number of sharding items
        int shardingTotalCount = 2;
        // Core job parameters
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount).build();
        // Whether to run as a streaming job
        boolean streamingProcess = true;
        // Job type configuration
        JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), streamingProcess);
        // Root job configuration
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();
        return ljc;
    }
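
The effect of streamingProcess is easier to see in isolation. The following plain-Java sketch is a simplified model of the fetch/process loop, based on the documented rule that a streaming job keeps fetching until fetchData returns null or an empty list. All names are hypothetical stand-ins, and the real executor applies additional checks that are omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of one trigger of a dataflow job; not the library's executor.
final class StreamingLoopSketch {

    private final Deque<Integer> pending = new ArrayDeque<>();
    int fetchCalls = 0;
    int processed = 0;

    StreamingLoopSketch(int itemCount) {
        for (int i = 0; i < itemCount; i++) {
            pending.add(i);
        }
    }

    /** Stand-in for fetchData: hands out at most batchSize pending items. */
    List<Integer> fetch(int batchSize) {
        fetchCalls++;
        List<Integer> batch = new ArrayList<>();
        while (batch.size() < batchSize && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        return batch;
    }

    /** Stand-in for processData: just counts the items. */
    void process(List<Integer> batch) {
        processed += batch.size();
    }

    /** Runs one trigger: a single fetch/process pass, or a loop when streaming. */
    void runOnce(boolean streamingProcess, int batchSize) {
        List<Integer> batch = fetch(batchSize);
        while (batch != null && !batch.isEmpty()) {
            process(batch);
            if (!streamingProcess) {
                break;
            }
            batch = fetch(batchSize);
        }
    }

    public static void main(String[] args) {
        StreamingLoopSketch streaming = new StreamingLoopSketch(50);
        streaming.runOnce(true, 10);
        System.out.println("streaming: processed=" + streaming.processed + ", fetches=" + streaming.fetchCalls);

        StreamingLoopSketch oneOff = new StreamingLoopSketch(50);
        oneOff.runOnce(false, 10);
        System.out.println("one-off:   processed=" + oneOff.processed + ", fetches=" + oneOff.fetchCalls);
    }
}
```

In this model a streaming trigger drains all 50 items in batches of 10, while a non-streaming trigger handles one batch and leaves the rest for the next cron firing.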

Script job

    public static void main(String[] args) {
        new JobScheduler(zkCenter(), jobConfig()).init();
    }

    /**
     * Registry center (ZooKeeper).
     *
     * @return an initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "script-job");
        CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        crc.init();
        return crc;
    }

    public static LiteJobConfiguration jobConfig() {
        String jobName = "scriptJob";
        // Fire every 10 seconds
        String cron = "0/10 * * * * ?";
        // Total number of sharding items
        int shardingTotalCount = 2;
        // Core job parameters; misfire(false) disables re-running missed triggers
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount).misfire(false).build();
        // Job type configuration: path of the script to execute
        String scriptAddress = "C:\\java\\workdata\\test.cmd";
        JobTypeConfiguration jtc = new ScriptJobConfiguration(jcc, scriptAddress);
        // Root job configuration
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();
        return ljc;
    }

Contents of the test.cmd script:
echo sharding execution context is %1

Running the job fails with: JobConfigurationException: Execute script failure.

I was not able to resolve this error.

Integration using the Spring schema

pom file

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.26</version>
        </dependency>
    </dependencies>

web.xml

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>spring-job</display-name>
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath*:spring-config.xml</param-value>
  </context-param>
   <!-- Register the Spring context loader listener -->
   <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
   </listener>
</web-app>

MySimpleJob.java

public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("I am sharding item ===> " + shardingContext.getShardingItem());
    }

}

spring-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd">
    
    <!-- Registry center configuration -->
    <reg:zookeeper namespace="spring-job" server-lists="localhost:2181" id="zkCenter"/>
    
    <!-- Simple job configuration -->
    <job:simple id="mySimpleJob1" overwrite="true" sharding-total-count="1" cron="*/5 * * * * ?" registry-center-ref="zkCenter" class="com.fuiou.job.MySimpleJob"></job:simple>
</beans>

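Note: job:simple must have an id. Also set overwrite="true" so that the local configuration overrides the one stored in the registry center.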

Integrating a simple job with the Java API

This is the same as the quick start above.

Integrating a simple job with Spring Boot

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

application.properties

# elastic-job
elasticjob.zookeeper.server-list=localhost:2181
elasticjob.zookeeper.namespace=spring-job

ZookeeperProperties.java

@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@Getter
@Setter
public class ZookeeperProperties {

    // ZooKeeper server address list
    private String serverList;

    // ZooKeeper namespace
    private String namespace;

}

ZookeeperAutoConfig.java

@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    /**
     * Registry center. Spring calls init() on the bean after creating it.
     *
     * @return the ZooKeeper registry center
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter() {
        String serverList = zookeeperProperties.getServerList();
        String namespace = zookeeperProperties.getNamespace();
        ZookeeperConfiguration zc = new ZookeeperConfiguration(serverList, namespace);
        CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        return crc;
    }

}

ElasticSimpleJob.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {

    String jobName() default "";

    String cron() default "";

    int shardingTotalCount() default 1;

    boolean overwrite() default false;

}

SimpleJobAutoConfig.java

@Configuration
// Only active when a ZooKeeper registry center bean has been configured
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @PostConstruct
    public void initSimpleJob() {
        // Find every bean annotated with @ElasticSimpleJob
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> implemented : interfaces) {
                // Only handle beans that directly implement the SimpleJob interface
                if (implemented == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();

                    // Core job parameters
                    JobCoreConfiguration jcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount).build();
                    // Job type configuration
                    JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                    // Root job configuration
                    LiteJobConfiguration ljc = LiteJobConfiguration
                            .newBuilder(jtc)
                            .overwrite(overwrite)
                            .build();
                    // Register the job with a scheduler and start it
                    new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
                }
            }

        }
    }
}

MySimpleJob.java

@ElasticSimpleJob(jobName = "mySimpleJob", cron = "*/10 * * * * ?", shardingTotalCount = 2, overwrite = true)
public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("I am sharding item ==> " + shardingContext.getShardingItem()
                + " === total sharding items ===> " + shardingContext.getShardingTotalCount());
    }

}
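
The annotation lookup in SimpleJobAutoConfig relies on plain Java reflection. Here is a minimal Spring-free sketch of that mechanism, assuming a hypothetical JobMeta annotation and Job interface that stand in for ElasticSimpleJob and SimpleJob.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Illustrative stand-ins only; none of these types come from elastic-job or Spring.
final class AnnotationScanSketch {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or reflection cannot see it
    @interface JobMeta {
        String jobName() default "";
        String cron() default "";
        int shardingTotalCount() default 1;
    }

    interface Job {
        void execute(int shardingItem);
    }

    @JobMeta(jobName = "demoJob", cron = "*/10 * * * * ?", shardingTotalCount = 2)
    static class DemoJob implements Job {
        @Override
        public void execute(int shardingItem) {
            System.out.println("running item " + shardingItem);
        }
    }

    /** Returns the job settings read from the annotation, or null if the bean is not an annotated Job. */
    static String describe(Object bean) {
        JobMeta meta = bean.getClass().getAnnotation(JobMeta.class);
        if (meta == null || !(bean instanceof Job)) {
            return null;
        }
        return meta.jobName() + " | " + meta.cron() + " | " + meta.shardingTotalCount();
    }

    public static void main(String[] args) {
        System.out.println(describe(new DemoJob()));
        System.out.println(describe("not a job"));
    }
}
```

The sketch uses instanceof rather than comparing the entries of getInterfaces(). That also matches classes that inherit the interface from a superclass, which a direct-interface comparison would miss.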

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.config.ZookeeperAutoConfig,com.config.SimpleJobAutoConfig

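Note: SimpleJobAutoConfig uses the @AutoConfigureAfter annotation to make sure that SimpleJobAutoConfig is always loaded after ZookeeperAutoConfig.
@AutoConfigureAfter does not take effect on its own: you need to create a META-INF directory under the resources folder and then create spring.factories inside it.
The classes listed in spring.factories must be moved to a package that the Application class's component scan does not reach. They must not be scanned!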

For details, see: https://blog.csdn.net/f641385712/article/details/105596178?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase

Dataflow integration

This is essentially the same as for the simple job. Just remember to set the streamingProcess property.

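Afterword

By printing the current thread name with Thread.currentThread().getName(), I found that different jobs run on different threads, and that different sharding items of the same job also run on different threads, while the same sharding item of the same job always runs on the same thread. Yet different sharding items of the same job still block each other even though they run on different threads, which I do not fully understand.

Conclusion: different jobs do not block each other, but runs of the same job do. @EnableAsync + @Async can be used to execute asynchronously.
In general, estimate how long a scheduled job takes, so that the next trigger time does not arrive while the previous run is still in progress.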
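
When the run time is hard to estimate, one alternative safeguard is to skip a trigger while the previous run is still active. This is a different technique from the @Async approach mentioned above and is not from the original post. The plain-Java sketch below uses a hypothetical OverlapGuardSketch class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: lets at most one run of a task be active at a time.
final class OverlapGuardSketch {

    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Runs the task unless a previous run is still in progress; returns whether it ran. */
    boolean tryRun(Runnable task) {
        // compareAndSet flips the flag atomically, so only one caller can win
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            running.set(false);
        }
    }

    public static void main(String[] args) {
        OverlapGuardSketch guard = new OverlapGuardSketch();
        boolean[] nestedRan = new boolean[1];
        // The nested call stands in for a trigger that arrives while the first run is active.
        boolean outerRan = guard.tryRun(() -> nestedRan[0] = guard.tryRun(() -> { }));
        System.out.println("outer ran: " + outerRan + ", overlapping run ran: " + nestedRan[0]);
        System.out.println("next run after completion: " + guard.tryRun(() -> { }));
    }
}
```

The guard only drops overlapping triggers within a single JVM. It does not replace sizing the cron interval to the expected run time.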


Article link: https://www.haomeiwen.com/subject/pihgtktx.html