美文网首页
源码分析

源码分析

作者: 飞盏 | 来源:发表于2018-10-25 17:06 被阅读0次

    5. 核心源码分析

    作业初始化流程

    5.1 核心入口:JobScheduler作业调度器

    /**
     * Job scheduler: wires one elastic job into its own dedicated Quartz scheduler instance.
     * 
     * @author zhangliang
     * @author caohao
     */
    public class JobScheduler {
        
        private static final String SCHEDULER_INSTANCE_NAME_SUFFIX = "Scheduler";
        
        private static final String CRON_TRIGGER_IDENTITY_SUFFIX = "Trigger";
        
        // Bootstraps the job against the registry center before scheduling.
        private final JobExecutor jobExecutor;
        
        public JobScheduler(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
            jobExecutor = new JobExecutor(regCenter, jobConfig, elasticJobListeners);
        }
        
        /**
         * Initialize the job: register startup info, build the Quartz job detail,
         * and schedule it with the configured cron expression.
         */
        public void init() {
            // Register this job's startup information with the registry center first.
            jobExecutor.init();
            // Build the Quartz JobDetail, keyed by job name.
            JobDetail detail = JobBuilder.newJob(LiteJob.class).withIdentity(jobExecutor.getJobName()).build();
            // Hand the elastic job instance to Quartz through the job data map.
            detail.getJobDataMap().put("elasticJob", jobExecutor.getElasticJob());
            JobScheduleController controller;
            try {
                // Create the schedule controller around a freshly built scheduler.
                controller = new JobScheduleController(
                        initializeScheduler(detail.getKey().toString()), detail, jobExecutor.getSchedulerFacade(), Joiner.on("_").join(jobExecutor.getJobName(), CRON_TRIGGER_IDENTITY_SUFFIX));
                controller.scheduleJob(jobExecutor.getSchedulerFacade().getCron());
            } catch (final SchedulerException ex) {
                throw new JobException(ex);
            }
            // Publish the controller in the global job registry for later lookup.
            JobRegistry.getInstance().addJobScheduleController(jobExecutor.getJobName(), controller);
        }
        
        private Scheduler initializeScheduler(final String jobName) throws SchedulerException {
            // Build a dedicated Quartz Scheduler through the standard factory.
            StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
            schedulerFactory.initialize(getBaseQuartzProperties(jobName));
            Scheduler scheduler = schedulerFactory.getScheduler();
            // Attach the trigger listener supplied by the scheduler facade.
            scheduler.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
            return scheduler;
        }
        
        private Properties getBaseQuartzProperties(final String jobName) {
            Properties props = new Properties();
            props.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
            // A single worker thread serializes executions so a long-running job
            // run cannot overlap with the next trigger firing.
            props.put("org.quartz.threadPool.threadCount", "1");
            props.put("org.quartz.scheduler.instanceName", Joiner.on("_").join(jobName, SCHEDULER_INSTANCE_NAME_SUFFIX));
            if (!jobExecutor.getSchedulerFacade().isMisfire()) {
                // Misfire disabled: a 1ms threshold makes Quartz treat almost any
                // delayed firing as a misfire and skip it.
                props.put("org.quartz.jobStore.misfireThreshold", "1");
            }
            prepareEnvironments(props);
            return props;
        }
        
        // Hook method for subclasses to add or override Quartz properties.
        protected void prepareEnvironments(final Properties props) {
        }
    }
    

    5.2 作业启动器的init方法

    /** 
         * JobExecutor#init
         * Initialize the job: clear stale server status, register the job's
         * registry-center cache node, then publish the startup information.
         */
        public void init() {
            log.debug("Elastic job: job controller init, job name is: {}.", jobName);
            // Clear server status left over from a previous scheduler run.
            schedulerFacade.clearPreviousServerStatus();
            // Register a cache for this job's node ("/<jobName>") in the registry center.
            regCenter.addCacheData("/" + jobName);
            // Delegate the concrete startup bookkeeping to the facade.
            schedulerFacade.registerStartUpInfo();
        }
    
    /**
         * SchedulerFacade#registerStartUpInfo
         * Register Elastic-Job startup information.
         */
        public void registerStartUpInfo() {
            // Start every registered listener first.
            listenerManager.startAllListeners();
            // Force a leader election so a master node exists.
            leaderElectionService.leaderForceElection();
            // Persist the distributed job configuration.
            configService.persistJobConfiguration();
            // Persist the job server's online information.
            serverService.persistServerOnline();
            // Clear any leftover "paused" flag for this job.
            serverService.clearJobPausedStatus();
            if (JobType.DATA_FLOW == configService.getJobType()) {
                // Data-flow jobs additionally start the async process-count statistics job.
                statisticsService.startProcessCountJob();
            }
            // Mark the job as needing resharding.
            shardingService.setReshardingFlag();
            // Start the job monitoring listener service.
            monitorService.listen();
        }
    

    5.3 作业注册表

    /**
     * Job registry.
     * 
     * <p>Thread-safe process-wide singleton (double-checked locking) caching
     * every job's {@code JobScheduleController} by job name.</p>
     * 
     * @author zhangliang
     * @author caohao
     */
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class JobRegistry {
        
        // volatile is essential for double-checked locking: instantiation is
        // (1) allocate memory, (2) run the constructor, (3) publish the reference.
        // Without volatile, (2) and (3) may be reordered, so another thread
        // reaching the outer null-check could observe a non-null but
        // not-yet-fully-constructed instance.
        private static volatile JobRegistry instance;
        
        // All jobs' schedule controllers, cached in this singleton.
        // final: the map reference itself must never be reassigned.
        private final Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
        
        /**
         * Get the job registry singleton.
         * 
         * @return the process-wide job registry instance
         */
        public static JobRegistry getInstance() {
            if (null == instance) {
                synchronized (JobRegistry.class) {
                    if (null == instance) {
                        instance = new JobRegistry();
                    }
                }
            }
            return instance;
        }
        
        /**
         * Register a job schedule controller.
         * 
         * @param jobName job name
         * @param jobScheduleController job schedule controller
         */
        public void addJobScheduleController(final String jobName, final JobScheduleController jobScheduleController) {
            schedulerMap.put(jobName, jobScheduleController);
        }
        
        /**
         * Look up a job schedule controller.
         * 
         * @param jobName job name
         * @return the job schedule controller, or {@code null} if none registered
         */
        public JobScheduleController getJobScheduleController(final String jobName) {
            return schedulerMap.get(jobName);
        }
    }
    

    5.4 JobExecutor作业启动器

    /**
     * Job executor: bootstraps a single elastic job against the registry center.
     * 
     * @author zhangliang
     */
    @Slf4j
    @Getter
    public class JobExecutor {
        
        private final String jobName;
        
        // Coordinator registry center shared by all jobs.
        private final CoordinatorRegistryCenter regCenter;
        
        // The concrete job instance that will be executed.
        private final ElasticJob elasticJob;
        
        // Facade over the internal services the scheduler needs.
        private final SchedulerFacade schedulerFacade;
        
        public JobExecutor(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
            jobName = jobConfig.getJobName();
            this.regCenter = regCenter;
            List<ElasticJobListener> listeners = Arrays.asList(elasticJobListeners);
            setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, listeners);
            elasticJob = createElasticJob(jobConfig, listeners);
            schedulerFacade = new SchedulerFacade(regCenter, jobConfig, listeners);
        }
        
        /**
         * Initialize the job: clear stale server status, register the job's
         * cache node, then publish startup information.
         */
        public void init() {
            log.debug("Elastic job: job controller init, job name is: {}.", jobName);
            schedulerFacade.clearPreviousServerStatus();
            regCenter.addCacheData("/" + jobName);
            schedulerFacade.registerStartUpInfo();
        }
        
        // Wire the shared GuaranteeService into every distribute-once listener.
        private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
            GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig);
            for (ElasticJobListener listener : elasticJobListeners) {
                if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) {
                    continue;
                }
                ((AbstractDistributeOnceElasticJobListener) listener).setGuaranteeService(guaranteeService);
            }
        }
        
        // Reflectively instantiate the configured job class and attach its facade.
        private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
            ElasticJob job;
            try {
                job = (ElasticJob) jobConfig.getJobClass().newInstance();
            } catch (final InstantiationException | IllegalAccessException ex) {
                throw new JobException(ex);
            }
            job.setJobFacade(new JobFacade(regCenter, jobConfig, elasticJobListenerList));
            return job;
        }
    }
    

    5.5 注册中心模块

     @Override
        public void init() {
            // If nested-server mode is enabled, start the embedded ZooKeeper
            // server so job nodes can register against it.
            if (zkConfig.isUseNestedZookeeper()) {
                NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(),
                                                                             zkConfig.getNestedDataDir());
            }
            log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
            // Build the Curator client: server list, exponential-backoff retry policy, namespace.
            Builder builder = CuratorFrameworkFactory.builder()
                                                     .connectString(zkConfig.getServerLists())
                                                     .retryPolicy(new ExponentialBackoffRetry(
                                                             zkConfig.getBaseSleepTimeMilliseconds(),
                                                             zkConfig.getMaxRetries(),
                                                             zkConfig.getMaxSleepTimeMilliseconds()))
                                                     .namespace(zkConfig.getNamespace());
            if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
                builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
            }
            if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
                builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
            }
            // Enable digest authentication when a digest is configured.
            if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
                builder.authorization("digest", zkConfig.getDigest().getBytes(Charset.forName("UTF-8")))
                       .aclProvider(new ACLProvider() {
    
                           @Override
                           public List<ACL> getDefaultAcl() {
                               return ZooDefs.Ids.CREATOR_ALL_ACL;
                           }
    
                           @Override
                           public List<ACL> getAclForPath(final String path) {
                               return ZooDefs.Ids.CREATOR_ALL_ACL;
                           }
                       });
            }
            client = builder.build();
            client.start();
            try {
                // Block until connected (bounded by max sleep * max retries), then verify.
                client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(),
                                           TimeUnit.MILLISECONDS);
                if (!client.getZookeeperClient().isConnected()) {
                    throw new KeeperException.OperationTimeoutException();
                }
                if (!Strings.isNullOrEmpty(zkConfig.getLocalPropertiesPath())) {
                    // Read the local properties file and pre-create its configured nodes.
                    fillData();
                }
                //CHECKSTYLE:OFF
            } catch (final Exception ex) {
                //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
    

    4.2 plugin模块中的三种作业类型

    作业类型

    elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。Simple类型作业需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。

    /**
     * Simple distributed job.
     * 
     * <p>
     * Only guarantees the job can be invoked by the distributed scheduler;
     * provides no job-processing logic of its own.
     * </p>
     * 
     * @author zhangliang
     * @author caohao
     */
    @Slf4j
    public abstract class AbstractSimpleElasticJob extends AbstractElasticJob {
        
        @Override
        protected final void executeJob(final JobExecutionMultipleShardingContext shardingContext) {
            // Delegate straight to the subclass-supplied processing logic.
            process(shardingContext);
        }
        
        /**
         * Execute the job.
         * 
         * @param shardingContext sharding rule configuration context for this execution
         */
        public abstract void process(final JobExecutionMultipleShardingContext shardingContext);
    }
    

    Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。可通过DataflowJobConfiguration配置是否流式处理。流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。实际开发中,Dataflow类型的job还是很好用的。

    
    /**
     * Batch data-flow job that preserves processing order within a single sharding item.
     * 
     * @author zhangliang
     *
     * @param <T> entity type of the data processed by the data-flow job
     */
    public abstract class AbstractBatchSequenceDataFlowElasticJob<T> extends AbstractBatchDataFlowElasticJob<T, JobExecutionSingleShardingContext> {
    }
    
    /**
     * High-throughput batch data-flow job.
     * 
     * @author zhangliang
     *
     * @param <T> entity type of the data processed by the data-flow job
     */
    public abstract class AbstractBatchThroughputDataFlowElasticJob<T> extends AbstractBatchDataFlowElasticJob<T, JobExecutionMultipleShardingContext> {
    }
    
    /**
     * One-by-one data-flow job that preserves processing order within a single sharding item.
     * 
     * @author zhangliang
     *
     * @param <T> entity type of the data processed by the data-flow job
     */
    public abstract class AbstractIndividualSequenceDataFlowElasticJob<T> extends AbstractIndividualDataFlowElasticJob<T, JobExecutionSingleShardingContext> {
    }
    
    /**
     * High-throughput one-by-one data-flow job.
     * 
     * @author zhangliang
     *
     * @param <T> entity type of the data processed by the data-flow job
     */
    public abstract class AbstractIndividualThroughputDataFlowElasticJob<T> extends AbstractIndividualDataFlowElasticJob<T, JobExecutionMultipleShardingContext> {
    }
    

    4.3 plugin中的分片策略

    三种分片策略

    AverageAllocationJobShardingStrategy:基于平均分配算法的分片策略;
    OdevitySortByNameJobShardingStrategy:根据作业名的哈希值奇偶数决定IP升降序算法的分片策略;
    RotateServerByNameJobShardingStrategy:根据作业名的哈希值对服务器列表进行轮转的分片策略;

    /**
     * Sharding strategy based on even distribution.
     * 
     * <p>
     * When shards do not divide evenly, the remainder shard items are appended
     * one-by-one to the lowest-numbered servers. E.g.:
     * 1. 3 servers, 9 shards: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
     * 2. 3 servers, 8 shards: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
     * 3. 3 servers, 10 shards: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
     * </p>
     * 
     * @author zhangliang
     */
    public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
        
        @Override
        public Map<String, List<Integer>> sharding(final List<String> serversList, final JobShardingStrategyOption option) {
            if (serversList.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<String, List<Integer>> result = shardingAliquot(serversList, option.getShardingTotalCount());
            addAliquant(serversList, option.getShardingTotalCount(), result);
            return result;
        }
        
        // Evenly distribute the first floor(total / servers) * servers shard items.
        private Map<String, List<Integer>> shardingAliquot(final List<String> serversList, final int shardingTotalCount) {
            Map<String, List<Integer>> result = new LinkedHashMap<>(serversList.size());
            int itemCountPerSharding = shardingTotalCount / serversList.size();
            int count = 0;
            for (String each : serversList) {
                List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
                for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                    shardingItems.add(i);
                }
                result.put(each, shardingItems);
                count++;
            }
            return result;
        }
        
        // Append the remainder (total % servers) shard items to the first servers.
        private void addAliquant(final List<String> serversList, final int shardingTotalCount, final Map<String, List<Integer>> shardingResult) {
            int aliquant = shardingTotalCount % serversList.size();
            // Loop invariant hoisted out of the loop: index of the first remainder shard item.
            int firstAliquantItem = shardingTotalCount / serversList.size() * serversList.size();
            int count = 0;
            for (Entry<String, List<Integer>> entry : shardingResult.entrySet()) {
                if (count >= aliquant) {
                    // All remainder items assigned; later servers get none.
                    break;
                }
                entry.getValue().add(firstAliquantItem + count);
                count++;
            }
        }
    }
    
    
    /**
     * Sharding strategy that sorts the server IP list ascending or descending
     * depending on the parity of the job name's hash code.
     * 
     * <p>
     * Odd hash: ascending IP order. Even hash: descending IP order.
     * Spreads different jobs' load across different servers, e.g.:
     * 1. 3 servers, 2 shards, odd job-name hash: 1=[0], 2=[1], 3=[].
     * 2. 3 servers, 2 shards, even job-name hash: 3=[0], 2=[1], 1=[].
     * </p>
     * 
     * @author zhangliang
     */
    public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
        
        private AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
        
        @Override
        public Map<String, List<Integer>> sharding(final List<String> serversList, final JobShardingStrategyOption option) {
            long jobNameHash = option.getJobName().hashCode();
            if (0 == jobNameHash % 2) {
                // Fix: reverse a defensive copy instead of calling
                // Collections.reverse on the caller-supplied list, which
                // leaked an in-place mutation back to the caller.
                List<String> descendingServers = new java.util.ArrayList<>(serversList);
                Collections.reverse(descendingServers);
                return averageAllocationJobShardingStrategy.sharding(descendingServers, option);
            }
            return averageAllocationJobShardingStrategy.sharding(serversList, option);
        }
    }
    
    
    /**
     * Sharding strategy that rotates the server list left by an offset derived
     * from the job name's hash code, then applies average allocation.
     * 
     * @author weishubin
     */
    public class RotateServerByNameJobShardingStrategy implements JobShardingStrategy {
        
        private AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
        
        @Override
        public Map<String, List<Integer>> sharding(final List<String> serversList, final JobShardingStrategyOption option) {
            return averageAllocationJobShardingStrategy.sharding(rotateServerList(serversList, option.getJobName()), option);
        }
        
        // Rotate the server list left by (|hash(jobName) % size|) positions.
        private List<String> rotateServerList(final List<String> serversList, final String jobName) {
            int serverSize = serversList.size();
            if (0 == serverSize) {
                // Guard against division by zero; the delegate strategy returns
                // an empty map for an empty server list anyway.
                return serversList;
            }
            // Fix: Math.abs(hashCode()) % size is broken when hashCode() is
            // Integer.MIN_VALUE (Math.abs returns a negative value, producing a
            // negative offset/index). Taking the remainder first gives the same
            // result for every other hash and a valid offset for MIN_VALUE.
            int offset = Math.abs(jobName.hashCode() % serverSize);
            if (0 == offset) {
                return serversList;
            }
            List<String> result = new ArrayList<>(serverSize);
            for (int i = 0; i < serverSize; i++) {
                int index = (i + offset) % serverSize;
                result.add(serversList.get(index));
            }
            return result;
        }
    }
    
    

    相关文章

      网友评论

          本文标题:源码分析

          本文链接:https://www.haomeiwen.com/subject/tsielftx.html