美文网首页
elastic-job分布式任务

elastic-job分布式任务

作者: 离别刀 | 来源:发表于2019-06-17 17:46 被阅读0次

    项目依赖

        Spring Boot项目依赖如下
        <!--框架核心jar包-->
        <dependency>
            <groupId>com.github.kuhn-he</groupId>
            <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
            <version>2.1.5</version>
        </dependency>
        
    一般spring项目参见如下      
        <dependency>
          <groupId>com.dangdang</groupId>
          <artifactId>elastic-job-lite-core</artifactId>
          <version>2.1.2</version>
        </dependency>
        <dependency>
          <groupId>com.dangdang</groupId>
          <artifactId>elastic-job-lite-spring</artifactId>
          <version>2.1.2</version>
        </dependency>
        
    可以参见此文 
    https://blog.csdn.net/u010889990/article/details/80000012
    或者
    https://blog.csdn.net/xvshu/article/details/80755988 
    

    配置文件

    esjob.serverList=127.0.0.2:2181,127.0.0.3:2181,127.0.0.6:2181
    esjob.namespace=esjob
    esjob.userJob.cron=0/5 * * * * ?
    esjob.userJob.shardingTotalCount=3
    esjob.userJob.shardingItemParameters=0=0,1=1,2=2
    

    Zookeeper配置中心

    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * ZooKeeper-based registry-center configuration for elastic-job.
     *
     * <p>The {@code esjob.serverList} and {@code esjob.namespace} properties are bound
     * onto this class through {@code @ConfigurationProperties("esjob")}. Binding uses
     * JavaBean property accessors, so the setters below are required; without them
     * both fields would remain {@code null} when the beans are created.
     */
    @Configuration
    @ConfigurationProperties("esjob")
    public class ElasticJobConfig {

        /** Value of {@code esjob.serverList}; passed as the first ZookeeperConfiguration argument. */
        private String serverList;

        /** Value of {@code esjob.namespace}; passed as the second ZookeeperConfiguration argument. */
        private String namespace;

        public String getServerList() {
            return serverList;
        }

        public void setServerList(String serverList) {
            this.serverList = serverList;
        }

        public String getNamespace() {
            return namespace;
        }

        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }

        /**
         * Builds the ZooKeeper connection settings from the bound properties.
         *
         * @return a configuration created from {@code serverList} and {@code namespace}
         */
        @Bean
        public ZookeeperConfiguration zkConfig() {
            return new ZookeeperConfiguration(serverList, namespace);
        }

        /**
         * Creates the registry center; Spring calls its {@code init} method after
         * construction (see {@code initMethod}).
         *
         * @param config the ZooKeeper configuration bean
         * @return the registry center wrapping {@code config}
         */
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
            return new ZookeeperRegistryCenter(config);
        }
    }
    

    SimpleJobConfig配置

    import cn.fastcampus.scheduler.DemoSimpleJob;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    @Configuration
    public class SimpleJobConfig {
        @Value("${esjob.userJob.cron}")
        private final String cron;
        @Value("${esjob.userJob.shardingTotalCount}") 
        private final int shardingTotalCount
        @Value("${esjob.userJob.shardingItemParameters}") 
        private final String shardingItemParameters
    
        @Resource
        private ZookeeperRegistryCenter regCenter;
        
        @Bean
        public DemoSimpleJob simpleJob(){
            return new DemoSimpleJob();
        }
    
        @Bean(initMethod = "init")
        public JobScheduler simpleJobScheduler(final DemoSimpleJob simpleJob, String cron,int shardingTotalCount,String shardingItemParameters){
            return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
        }
    
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
        }
    }
    

    定时任务DemoSimpleJob

    
    import cn.cicada.common.metrics.MetricsTimer;
    import cn.cicada.common.metrics.MetricsUtil;
    import cn.cicada.common.utils.StopWatch;
    import com.dangdang.ddframe.job.api.ShardingContext;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.chain.impl.ContextBase;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.util.CollectionUtils;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Demo elastic-job simple job that loads the users belonging to the current shard.
     *
     * <p>The logger is supplied by Lombok's {@code @Slf4j} (already imported by this
     * file) instead of a hand-written {@code Logger} field whose types were never imported.
     */
    @Slf4j
    public class DemoSimpleJob implements SimpleJob {

        @Autowired
        private UserMapper userMapper;

        /**
         * Runs one shard of the job: logs the sharding context, then queries the users
         * whose id modulo the total shard count equals this shard's parameter.
         *
         * @param shardingContext sharding information supplied by elastic-job
         * @throws NumberFormatException if the sharding parameter is not an integer
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final int totalCount = shardingContext.getShardingTotalCount();
            // The original referenced an undefined `logFlag`; the job name is used as the tag.
            final String logFlag = shardingContext.getJobName();

            log.info("[logFLag:{}] sharding context [Item:{}][parameter:{}],", logFlag, shardingContext.getShardingItem(), shardingContext.getShardingParameter());

            // parseInt avoids the needless boxing of Integer.valueOf.
            final int residue = Integer.parseInt(shardingContext.getShardingParameter());
            final List<User> users = userMapper.getUsersByShardingMode(totalCount, residue);
            if (CollectionUtils.isEmpty(users)) {
                return;
            }
            log.info("[logFLag:{}] loaded {} users for sharding item {}", logFlag, users.size(), shardingContext.getShardingItem());

            // Process `users` here.
        }

    }
    
    /**
     * Mapper for reading users from {@code t_user} by shard.
     */
    public interface UserMapper {

        /**
         * Selects the rows of {@code t_user} with {@code status = 0} whose
         * {@code id % mode} equals {@code residue}.
         *
         * @param mode    the divisor applied to {@code id}
         * @param residue the remainder a row's {@code id} must have to be selected
         * @return the matching users
         */
        @Select("select * from t_user where id % #{mode} = #{residue} and status = 0")
        List<User> getUsersByShardingMode(
                @Param("mode") int mode,
                @Param("residue") int residue);
    }
    
    

    相关文章

      网友评论

          本文标题:elastic-job分布式任务

          本文链接:https://www.haomeiwen.com/subject/ogdzfctx.html