美文网首页
定时器(elastic-job 二)

定时器(elastic-job 二)

作者: 寂静的春天1988 | 来源:发表于2020-06-17 17:58 被阅读0次

    分片策略

    平均分片策略:举例10个分片,3台服务。第一台服务分片0,1,2,9。第二台服务分片3,4,5。第三台服务分片6,7,8。

    作业名的哈希值奇偶数决定IP升降序算法的分片策略:
    举例10个分片,3台服务。作业名哈希值为奇数时按IP升序排列服务器,结果和平均分配一致。如果是偶数则按IP降序排列,第三台服务分片0,1,2,9。第二台服务分片3,4,5。第一台服务分片6,7,8。

    作业名哈希值对服务器列表进行轮转:作业名的哈希值对服务器总数取余,余数作为起点。
    举例10个分片,3台服务。用作业名的哈希值对3取余:如果余0,那么和平均分配一样;如果余1,那么第二台服务器就是起点,第二台服务分片0,1,2,9。第三台服务分片3,4,5。第一台服务分片6,7,8。

    个人总结:说到底这三种策略还是平均分配策略,不过是起点的服务器不一样而已。

    自定义分片策略

    /**
     * Marks a class as an Elastic-Job simple job and carries its scheduling settings.
     *
     * <p>The annotation is itself meta-annotated with {@code @Component}, and
     * {@code SimpleJobAutoConfig} looks annotated beans up through
     * {@code getBeansWithAnnotation(ElasticSimpleJob.class)} to read these attributes.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface ElasticSimpleJob {
    
        /** Job name handed to {@code JobCoreConfiguration.newBuilder}; defaults to the empty string. */
        String jobName() default "";
    
        /** Cron expression handed to {@code JobCoreConfiguration.newBuilder}; defaults to the empty string. */
        String cron() default "";
    
        /** Total number of sharding items handed to {@code JobCoreConfiguration.newBuilder}; defaults to 1. */
        int shardingTotalCount() default 1;
    
        /**
         * Value forwarded to {@code LiteJobConfiguration.Builder#overwrite}; defaults to {@code false}.
         * NOTE(review): presumably decides whether the local configuration replaces the one stored
         * in the registry center - confirm against the Elastic-Job documentation.
         */
        boolean overwrite() default false;
        
        /**
         * Sharding strategy class; its name is forwarded to
         * {@code LiteJobConfiguration.Builder#jobShardingStrategyClass}.
         * Defaults to {@code AverageAllocationJobShardingStrategy}.
         */
        Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
    
    }
    
    /**
     * Custom sharding strategy that deals sharding items out to the job instances round-robin:
     * item {@code k} goes to the instance at position {@code k % jobInstances.size()}.
     */
    public class MyShardingStrategy implements JobShardingStrategy {
    
        /**
         * Assigns the sharding items {@code 0 .. shardingTotalCount - 1} to the given instances.
         *
         * @param jobInstances       instances that can take sharding items, in assignment order
         * @param jobName            name of the job being sharded (not used by this strategy)
         * @param shardingTotalCount total number of sharding items to distribute
         * @return a mutable map from each instance that received at least one item to its items in
         *         ascending order; empty when there are no instances or no items
         */
        @Override
        public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName,
                int shardingTotalCount) {
            Map<JobInstance, List<Integer>> assignment = new HashMap<JobInstance, List<Integer>>();
            // With no instances there is nobody to take an item. The previous queue-draining loop
            // never terminated in this case because nothing ever removed an item from the queue.
            if (jobInstances == null || jobInstances.isEmpty()) {
                return assignment;
            }
    
            int instanceCount = jobInstances.size();
            for (int item = 0; item < shardingTotalCount; item++) {
                // Walking the instance list cyclically is the same as repeatedly dealing one item
                // to every instance in turn until the items run out.
                JobInstance owner = jobInstances.get(item % instanceCount);
                List<Integer> ownedItems = assignment.get(owner);
                if (ownedItems == null) {
                    ownedItems = new ArrayList<Integer>();
                    assignment.put(owner, ownedItems);
                }
                ownedItems.add(item);
            }
            return assignment;
        }
    
    }
    
    /**
     * Registers every Spring bean annotated with {@link ElasticSimpleJob} as an Elastic-Job simple job.
     *
     * <p>For each annotated bean that is a {@link SimpleJob}, the annotation attributes are turned
     * into a {@link LiteJobConfiguration} and a {@link SpringJobScheduler} is initialised against
     * the injected registry center.
     */
    @Configuration
    // Only applies when a registry center bean (ZooKeeper) has been configured
    @ConditionalOnBean(CoordinatorRegistryCenter.class)
    @AutoConfigureAfter(ZookeeperAutoConfig.class)
    public class SimpleJobAutoConfig {
        @Autowired
        private ApplicationContext applicationContext;
    
        @Autowired
        private CoordinatorRegistryCenter zkCenter;
        
        /**
         * Scans the application context for {@link ElasticSimpleJob} beans and schedules each one.
         */
        @PostConstruct
        public void initSimpleJob() {
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
            for (Map.Entry<String, Object> entry : beans.entrySet()) {
                Object instance = entry.getValue();
                // instanceof also matches proxied beans and jobs that inherit SimpleJob from a
                // superclass; comparing getInterfaces() entries only saw directly declared interfaces.
                if (!(instance instanceof SimpleJob)) {
                    continue;
                }
                // Resolve the annotation through the bean factory so it is still found when the
                // bean instance is a proxy whose runtime class does not carry the annotation.
                ElasticSimpleJob annotation =
                        applicationContext.findAnnotationOnBean(entry.getKey(), ElasticSimpleJob.class);
                if (annotation == null) {
                    continue;
                }
                String jobName = annotation.jobName();
                String cron = annotation.cron();
                int shardingTotalCount = annotation.shardingTotalCount();
                boolean overwrite = annotation.overwrite();
                Class<?> jobStrategy = annotation.jobStrategy();
                // Core job parameters
                JobCoreConfiguration jcc = JobCoreConfiguration
                        .newBuilder(jobName, cron, shardingTotalCount).build();
                // Job type configuration. getName() yields the binary class name, which stays
                // loadable for nested classes; getCanonicalName() does not (and may be null).
                JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getName());
                // Root (lite) job configuration
                LiteJobConfiguration ljc = LiteJobConfiguration
                        .newBuilder(jtc)
                        .jobShardingStrategyClass(jobStrategy.getName())
                        .overwrite(overwrite)
                        .build();
                // Hand the job over to the scheduler
                new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
            }
        }
    }
    
    /**
     * Sample simple job that prints the sharding item it was invoked for.
     * Its schedule, shard count and sharding strategy come from the {@link ElasticSimpleJob} attributes.
     */
    @ElasticSimpleJob(
            jobName = "mySimpleJob1",
            cron = "*/3 * * * * ?",
            shardingTotalCount = 10,
            overwrite = true,
            jobStrategy = MyShardingStrategy.class)
    public class MySimpleJob implements SimpleJob {

        @Autowired
        private TestService testService;

        /**
         * Writes the current sharding item to standard output.
         *
         * @param shardingContext context of the sharding item being executed
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final String message = "当前分片项==》" + shardingContext.getShardingItem();
            System.out.println(message);
        }

    }
    

    事件追踪

    1、配置好datasource(略)
    2、修改配置,开启事件追踪

    /**
     * Registers every Spring bean annotated with {@link ElasticSimpleJob} as an Elastic-Job simple
     * job, optionally with RDB-backed event tracing.
     *
     * <p>When the annotation's {@code jobEevent} attribute is {@code true}, the scheduler is created
     * with a {@link JobEventRdbConfiguration} built from the injected {@link DataSource}.
     */
    @AutoConfigureAfter(ZookeeperAutoConfig.class)
    public class SimpleJobAutoConfig {
        @Autowired
        private ApplicationContext applicationContext;
    
        @Autowired
        private CoordinatorRegistryCenter zkCenter;
        
        @Autowired
        private DataSource dataSource;
        
        /**
         * Scans the application context for {@link ElasticSimpleJob} beans and schedules each one.
         */
        @PostConstruct
        public void initSimpleJob() {
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
            for (Map.Entry<String, Object> entry : beans.entrySet()) {
                Object instance = entry.getValue();
                // instanceof also matches proxied beans and jobs that inherit SimpleJob from a
                // superclass; comparing getInterfaces() entries only saw directly declared interfaces.
                if (!(instance instanceof SimpleJob)) {
                    continue;
                }
                // Resolve the annotation through the bean factory so it is still found when the
                // bean instance is a proxy whose runtime class does not carry the annotation.
                ElasticSimpleJob annotation =
                        applicationContext.findAnnotationOnBean(entry.getKey(), ElasticSimpleJob.class);
                if (annotation == null) {
                    continue;
                }
                String jobName = annotation.jobName();
                String cron = annotation.cron();
                int shardingTotalCount = annotation.shardingTotalCount();
                boolean overwrite = annotation.overwrite();
                Class<?> jobStrategy = annotation.jobStrategy();
                boolean jobEvent = annotation.jobEevent();

                // Core job parameters
                JobCoreConfiguration jcc = JobCoreConfiguration
                        .newBuilder(jobName, cron, shardingTotalCount).build();
                // Job type configuration. getName() yields the binary class name, which stays
                // loadable for nested classes; getCanonicalName() does not (and may be null).
                JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getName());
                // Root (lite) job configuration
                LiteJobConfiguration ljc = LiteJobConfiguration
                        .newBuilder(jtc)
                        .jobShardingStrategyClass(jobStrategy.getName())
                        .overwrite(overwrite)
                        .build();

                if (jobEvent) {
                    // The event configuration is only built when tracing is actually requested
                    JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
                    new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc, jec).init();
                } else {
                    new SpringJobScheduler((ElasticJob) instance, zkCenter, ljc).init();
                }
            }
        }
    }
    
    /**
     * Marks a class as an Elastic-Job simple job and carries its scheduling settings,
     * including whether event tracing is switched on for the job.
     *
     * <p>The annotation is itself meta-annotated with {@code @Component}, and
     * {@code SimpleJobAutoConfig} looks annotated beans up through
     * {@code getBeansWithAnnotation(ElasticSimpleJob.class)} to read these attributes.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface ElasticSimpleJob {
    
        /** Job name handed to {@code JobCoreConfiguration.newBuilder}; defaults to the empty string. */
        String jobName() default "";
    
        /** Cron expression handed to {@code JobCoreConfiguration.newBuilder}; defaults to the empty string. */
        String cron() default "";
    
        /** Total number of sharding items handed to {@code JobCoreConfiguration.newBuilder}; defaults to 1. */
        int shardingTotalCount() default 1;
    
        /**
         * Value forwarded to {@code LiteJobConfiguration.Builder#overwrite}; defaults to {@code false}.
         * NOTE(review): presumably decides whether the local configuration replaces the one stored
         * in the registry center - confirm against the Elastic-Job documentation.
         */
        boolean overwrite() default false;
        
        /**
         * Sharding strategy class; its name is forwarded to
         * {@code LiteJobConfiguration.Builder#jobShardingStrategyClass}.
         * Defaults to {@code AverageAllocationJobShardingStrategy}.
         */
        Class<?> jobStrategy() default AverageAllocationJobShardingStrategy.class;
        
        /**
         * When {@code true}, {@code SimpleJobAutoConfig} creates the scheduler with a
         * {@code JobEventRdbConfiguration}; defaults to {@code false}.
         */
        boolean jobEevent() default false;
    }
    
    /**
     * Sample simple job with event tracing enabled; it prints the sharding item it was invoked for.
     * Its schedule, shard count and sharding strategy come from the {@link ElasticSimpleJob} attributes.
     */
    @ElasticSimpleJob(
            jobName = "mySimpleJob1",
            cron = "*/3 * * * * ?",
            shardingTotalCount = 10,
            overwrite = true,
            jobStrategy = MyShardingStrategy.class,
            jobEevent = true)
    public class MySimpleJob implements SimpleJob {

        @Autowired
        private TestService testService;

        /**
         * Writes the current sharding item to standard output.
         *
         * @param shardingContext context of the sharding item being executed
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final String message = "当前分片项==》" + shardingContext.getShardingItem();
            System.out.println(message);
        }

    }
    

    注意开启事件追踪后,会自动增加两张表job_execution_log和job_status_trace_log。记录定时器相关内容。

    作业监听器

    监听作业执行前和作业执行后

    作业监听器类型:
    1、每个作业节点均执行,无需考虑分布式(推荐)
    2、仅单个作业节点执行

    类型一:

    /**
     * Job listener that prints a line to standard output before and after each job execution.
     */
    public class MyNormalListener implements ElasticJobListener {

        /**
         * Prints the job name followed by the "before" marker.
         *
         * @param shardingContexts sharding contexts of the job about to run
         */
        @Override
        public void beforeJobExecuted(ShardingContexts shardingContexts) {
            final String line = shardingContexts.getJobName() + ",方法前!";
            System.out.println(line);
        }

        /**
         * Prints the job name followed by the "after" marker.
         *
         * @param shardingContexts sharding contexts of the job that has just run
         */
        @Override
        public void afterJobExecuted(ShardingContexts shardingContexts) {
            final String line = shardingContexts.getJobName() + ",方法后!";
            System.out.println(line);
        }

    }
    
    /**
     * Registers beans annotated with ElasticSimpleJob as Elastic-Job simple jobs, attaching the
     * listeners named in the annotation and, when requested, RDB-backed event tracing.
     *
     * NOTE(review): this listing stops inside initSimpleJob(); the closing braces of the loops,
     * the method and the class are not part of the visible text.
     */
    @Configuration
    // Only applies when a registry center bean (ZooKeeper) has been configured
    @ConditionalOnBean(CoordinatorRegistryCenter.class)
    @AutoConfigureAfter(ZookeeperAutoConfig.class)
    public class SimpleJobAutoConfig {
        @Autowired
        private ApplicationContext applicationContext;
    
        @Autowired
        private CoordinatorRegistryCenter zkCenter;
        
        @Autowired
        private DataSource dataSource;
        
        /**
         * Scans the application context for ElasticSimpleJob beans and schedules each one that
         * directly implements SimpleJob.
         *
         * The declared exceptions come from instantiating the listener classes reflectively
         * through their no-argument constructors.
         */
        @PostConstruct
        public void initSimpleJob() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
            for (Map.Entry<String, Object> entry : beans.entrySet()) {
                Object instance = entry.getValue();
                Class<?>[] instances = instance.getClass().getInterfaces();
                for (Class<?> superInstance : instances) {
                    // Only beans whose class directly declares the SimpleJob interface are scheduled
                    if (superInstance == SimpleJob.class) {
                        ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                        String jobName = annotation.jobName();
                        String cron = annotation.cron();
                        int shardingTotalCount = annotation.shardingTotalCount();
                        boolean overwrite = annotation.overwrite();
                        Class<?> jobStrategy = annotation.jobStrategy();
                        boolean jobEvent=annotation.jobEevent();
                        // NOTE(review): jobListner() is not declared in the ElasticSimpleJob listings
                        // shown alongside this class - confirm the annotation defines it.
                        Class<? extends ElasticJobListener>[] jobListners=annotation.jobListner();
                        // Create one listener instance per configured listener class
                        ElasticJobListener[] listnerInstances=new ElasticJobListener[jobListners.length];
                        for (int i = 0; i < jobListners.length; i++) {
                            Class<? extends ElasticJobListener> listner =jobListners[i];
                            ElasticJobListener elasticJobListener =listner.getDeclaredConstructor().newInstance();
                            listnerInstances[i]=elasticJobListener;
                        }
                        
                        
                        
                        // Core job parameters
                        JobCoreConfiguration jcc=JobCoreConfiguration
                                .newBuilder(jobName, cron, shardingTotalCount).build();
                        // Job type configuration
                        JobTypeConfiguration jtc=new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());
                        // Root (lite) job configuration
                        LiteJobConfiguration ljc= LiteJobConfiguration
                                .newBuilder(jtc)
                                .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                                .overwrite(overwrite)
                                .build();
                        
                        // Event tracing configuration backed by the injected DataSource
                        JobEventConfiguration jec=new JobEventRdbConfiguration(dataSource);
                        if(jobEvent) {
                            // Schedule the job with event tracing and the configured listeners
                            new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,jec,listnerInstances).init();
                        }else {
                            // Schedule the job with the configured listeners only
                            // NOTE(review): the local below is not referenced in the visible code
                            MyNormalListener listener=new MyNormalListener();
                            new SpringJobScheduler((ElasticJob)instance, zkCenter, ljc,listnerInstances).init();
                        }
                        
                    }
    
    /**
     * Sample simple job with event tracing and a job listener attached; it prints the sharding
     * item it was invoked for. All settings come from the {@link ElasticSimpleJob} attributes.
     */
    @ElasticSimpleJob(
            jobName = "mySimpleJob1",
            cron = "*/3 * * * * ?",
            shardingTotalCount = 10,
            overwrite = true,
            jobStrategy = MyShardingStrategy.class,
            jobEevent = true,
            jobListner = MyNormalListener.class)
    public class MySimpleJob implements SimpleJob {

        @Autowired
        private TestService testService;

        /**
         * Writes the current sharding item to standard output.
         *
         * @param shardingContext context of the sharding item being executed
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final String message = "当前分片项==》" + shardingContext.getShardingItem();
            System.out.println(message);
        }

    }
    

    注意作业监听器和分片无关,多个分片也只会监听一次,只和作业节点和任务有关。

    运维平台

    略(有时间补)

    相关文章

      网友评论

          本文标题:定时器(elastic-job 二)

          本文链接:https://www.haomeiwen.com/subject/ryklxktx.html