美文网首页
tbschedule源码

tbschedule源码

作者: modou1618 | 来源:发表于2019-03-10 17:01 被阅读0次

    基于网上的一个tbschedule开源版本代码。

    一 zk节点使用

    zk节点.png

    1.1 调度策略

    ScheduleStrategy {
    //任务策略名称
    private String strategyName;
    //允许执行任务的ip列表
    private String[] IPList;
    //单机最大执行器数
    private int numOfSingleServer;
    //指定需要执行调度的总执行器数
    private int assignNum;
    //类型,Schedule,Class、Bean
    private Kind kind;
    //类型schedule:baseTaskType+$+ownSign
    // class类型,类名。  bean类型,bean名
    private String taskName;
    //任务参数
    private String taskParameter;
    //服务状态: pause,resume
    private String sts = STS_RESUME;
    }
    

    1.2 调度策略实时状态

    ScheduleStrategyRunntime{
    //见ScheduleStrategy属性说明
    String strategyName;
    String uuid;
    String ip;
    private Kind kind;
    private String taskName;
    private String taskParameter;
    //需要的ScheduleServer执行器数量,
    //ScheduleStrategy.assignNum中分配到当前factory的数量
    int requestNum;
    //当前的任务数量
    int currentNum;
    //消息
    String message;
    }
    

    1.3 调度任务类型

    ScheduleTaskType{
    //任务类型
    private String baseTaskType;
    //向配置中心更新心跳信息的频率
    private long heartBeatRate = 5 * 1000;// 1分钟
    //判断一个服务器死亡的周期。为了安全,至少是心跳周期的两倍以上
    private long judgeDeadInterval = 1 * 60 * 1000;// 2分钟
    //当没有数据的时候,休眠的时间
    private int sleepTimeNoData = 500;
    //在每次数据处理完后休眠的时间
    private int sleepTimeInterval = 0;
    //每次获取数据的数量
    private int fetchDataNumber = 500;
    //在批处理的时候,每次处理的数据量
    private int executeNumber = 1;
    private int threadNumber = 5;
    //调度器类型
    private String processorType = "SLEEP";
    //允许执行的开始时间
    private String permitRunStartTime;
    //允许执行的结束时间
    private String permitRunEndTime;
    //清除过期环境信息的时间间隔,以天为单位
    private double expireOwnSignInterval = 1;
    //处理任务的BeanName
    private String dealBeanName;
    //任务bean的参数,由用户自定义格式的字符串
    private String taskParameter;
    // 任务类型:静态static,动态dynamic
    private String taskKind = TASKKIND_STATIC;
    //任务项数组
    private String[] taskItems;
    //每个线程组能处理的最大任务项目书目
    private int maxTaskItemsOfOneThreadGroup = 0;
    //版本号
    private long version;
    //服务状态: pause,resume
    private String sts = STS_RESUME;
    }
    

    1.4 调度执行器

    ScheduleServer{
    //全局唯一编号
    private String uuid;
    private long id;
    //任务类型
    private String taskType;
    //原始任务类型
    private String baseTaskType;
    private String ownSign;
    //机器IP地址
    private String ip;
    //机器名称
    private String hostName;
    //数据处理线程数量
    private int threadNum;
    //服务开始时间
    private Timestamp registerTime;
    //最后一次心跳通知时间
    private Timestamp heartBeatTime;
    //最后一次取数据时间
    private Timestamp lastFetchDataTime;
    /**
     * 处理描述信息,例如读取的任务数量,
    处理成功的任务数量,处理失败的数量,处理耗时
     * FetchDataCount=4430,FetcheDataNum=438570,
    DealDataSucess=438570,DealDataFail=0,
    DealSpendTime=651066
     */
    private String dealInfoDesc;
    private String nextRunStartTime;
    private String nextRunEndTime;
    // 配置中心的当前时间
    private Timestamp centerServerTime;
    //数据版本号
    private long version;
    private boolean isRegister;
    private String managerFactoryUUID;
    }
    

    二 初始化

    image.png

    三 调度任务启动流程

    image.png

    四 任务执行

    • 两种任务执行线程,TBScheduleProcessorNotSleep和TBScheduleProcessorSleep
    • 发送scheduleserver心跳,失败则清楚缓存任务线程,重新注册scheduleServer
    • TBScheduleProcessorNotSleep执行流程


      TBScheduleProcessorNotSleep.png
    • TBScheduleProcessorSleep执行流程


      TBScheduleProcessorSleep.png

    五 任务暂停

    • 通知任务执行线程isStopSchedule=true停止处理数据
    • 清空内存待处理任务数据

    相关文章

      网友评论

          本文标题:tbschedule源码

          本文链接:https://www.haomeiwen.com/subject/xlnypqtx.html