基于网上的一个tbschedule开源版本代码。
一 zk节点使用
zk节点.png1.1 调度策略
ScheduleStrategy {
//任务策略名称
private String strategyName;
//允许执行任务的ip列表
private String[] IPList;
//单机最大执行器数
private int numOfSingleServer;
//指定需要执行调度的总执行器数
private int assignNum;
//类型,Schedule,Class、Bean
private Kind kind;
//类型schedule:baseTaskType+$+ownSign
// class类型,类名。 bean类型,bean名
private String taskName;
//任务参数
private String taskParameter;
//服务状态: pause,resume
private String sts = STS_RESUME;
}
1.2 调度策略实时状态
ScheduleStrategyRunntime{
//见ScheduleStrategy属性说明
String strategyName;
String uuid;
String ip;
private Kind kind;
private String taskName;
private String taskParameter;
//需要的ScheduleServer执行器数量,
//ScheduleStrategy.assignNum中分配到当前factory的数量
int requestNum;
//当前的任务数量
int currentNum;
//消息
String message;
}
1.3 调度任务类型
ScheduleTaskType{
//任务类型
private String baseTaskType;
//向配置中心更新心跳信息的频率
private long heartBeatRate = 5 * 1000;// 1分钟
//判断一个服务器死亡的周期。为了安全,至少是心跳周期的两倍以上
private long judgeDeadInterval = 1 * 60 * 1000;// 2分钟
//当没有数据的时候,休眠的时间
private int sleepTimeNoData = 500;
//在每次数据处理完后休眠的时间
private int sleepTimeInterval = 0;
//每次获取数据的数量
private int fetchDataNumber = 500;
//在批处理的时候,每次处理的数据量
private int executeNumber = 1;
private int threadNumber = 5;
//调度器类型
private String processorType = "SLEEP";
//允许执行的开始时间
private String permitRunStartTime;
//允许执行的结束时间
private String permitRunEndTime;
//清除过期环境信息的时间间隔,以天为单位
private double expireOwnSignInterval = 1;
//处理任务的BeanName
private String dealBeanName;
//任务bean的参数,由用户自定义格式的字符串
private String taskParameter;
// 任务类型:静态static,动态dynamic
private String taskKind = TASKKIND_STATIC;
//任务项数组
private String[] taskItems;
//每个线程组能处理的最大任务项目书目
private int maxTaskItemsOfOneThreadGroup = 0;
//版本号
private long version;
//服务状态: pause,resume
private String sts = STS_RESUME;
}
1.4 调度执行器
ScheduleServer{
//全局唯一编号
private String uuid;
private long id;
//任务类型
private String taskType;
//原始任务类型
private String baseTaskType;
private String ownSign;
//机器IP地址
private String ip;
//机器名称
private String hostName;
//数据处理线程数量
private int threadNum;
//服务开始时间
private Timestamp registerTime;
//最后一次心跳通知时间
private Timestamp heartBeatTime;
//最后一次取数据时间
private Timestamp lastFetchDataTime;
/**
* 处理描述信息,例如读取的任务数量,
处理成功的任务数量,处理失败的数量,处理耗时
* FetchDataCount=4430,FetcheDataNum=438570,
DealDataSucess=438570,DealDataFail=0,
DealSpendTime=651066
*/
private String dealInfoDesc;
private String nextRunStartTime;
private String nextRunEndTime;
// 配置中心的当前时间
private Timestamp centerServerTime;
//数据版本号
private long version;
private boolean isRegister;
private String managerFactoryUUID;
}
二 初始化
image.png三 调度任务启动流程
image.png四 任务执行
- 两种任务执行线程,TBScheduleProcessorNotSleep和TBScheduleProcessorSleep
- 发送scheduleserver心跳,失败则清楚缓存任务线程,重新注册scheduleServer
-
TBScheduleProcessorNotSleep执行流程
TBScheduleProcessorNotSleep.png -
TBScheduleProcessorSleep执行流程
TBScheduleProcessorSleep.png
五 任务暂停
- 通知任务执行线程
isStopSchedule=true
停止处理数据 - 清空内存待处理任务数据
网友评论