美文网首页
分布式任务调度框架Elastic-job

分布式任务调度框架Elastic-job

作者: Young_5942 | 来源:发表于2020-07-19 20:18 被阅读0次

    简介

    Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成
    Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
    Elastic-Job-Cloud以私有云平台的方式提供集资源、调度以及分片为一体的全量级解决方案,依赖Mesos和Zookeeper。

    概述

    任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
    Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。提供个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

    架构图

    Elastic-job架构图

    Elastic-job-lite

    Elastic-job-lite 是一种轻量级的分布式任务调度的解决方案,基于Quartz实现任务的调度和基于zookeeper来协调多个作业服务器。

    Elastic-job-lite 分片的时机

    首先需要时候分片通过调用setReshardingFlag() 函数来实现,
    在zk创建持久节点 /{namespace}/{jobname}/leader/sharding/necessary

    public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }
    
    以下情况会触发分片
    1.ReconcileService#runOneIteration()

    ReconcileServicd继承com.google.common.util.concurrent.AbstractScheduledService
    来实现定时任务,每隔1分钟运行一次

    @Override
    protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
    }
    

    runOneIteration

    @Override
    protected void runOneIteration() {
         //加载job的配置
        JobConfiguration config = configService.load(true);
        //默认的协调间隔是10分钟
        int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
       // 成立条件:协调时间间隔大于0 && 系统当前时间 - 上一次协调时间 >= 10min
        if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
            lastReconcileTime = System.currentTimeMillis();
            //成立条件:自己是leader && zk下没有 /${namespace}/${jobname}/leader/sharding/necessary这个节点 && servers节点下有下线的机器
            if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
                log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
                 //创建重新分片
                shardingService.setReshardingFlag();
            }
        }
    }
    
    2.作业服务器启动,job初始化是想zk注册job信息的时候,创建需要分片的节点

    SetUpFacade#registerStartUpInfo()

    public void registerStartUpInfo(final boolean enabled) {
       // 注册监听器
        listenerManager.startAllListeners();
       //选主
        leaderService.electLeader();
       //在servers节点
        serverService.persistOnline(enabled);
       //在instance节点
        instanceService.persistOnline();
        //在需要分片节点
        shardingService.setReshardingFlag();
       //启动reconcileService定时任务
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
    
    3.监听节点,当servers节点变化时候触发重新分片

    ShardingListenerManager.ListenServersChangedJobListener

    class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //成立条件job存在 && (是instance节点产生的变化 或者是server节点产生的变化)
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }
    
    4.监听节点,当config节点变化时候触发重新分片

    ShardingTotalCountChangedJobListener

    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
           // 判断条件:变化节点是config && 当前分片总数不为0
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
               //获取sharding count 
               int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
                //如果获取的sharding count 和 本地的sharding count不同  设置重新分片
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:分布式任务调度框架Elastic-job

          本文链接:https://www.haomeiwen.com/subject/vgmrkktx.html