序
本文主要研究一下PowerJob的HeavyTaskTracker
HeavyTaskTracker
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
@Slf4j
public abstract class HeavyTaskTracker extends TaskTracker {
/**
* ProcessTracker 状态管理
*/
protected final ProcessorTrackerStatusHolder ptStatusHolder;
/**
* 数据库持久化服务
*/
protected final TaskPersistenceService taskPersistenceService;
/**
* 定时任务线程池
*/
protected ScheduledExecutorService scheduledPool;
/**
* 任务信息缓存
*/
private final Cache<String, TaskBriefInfo> taskId2BriefInfo;
/**
* 分段锁
*/
private final SegmentLock segmentLock;
private static final int UPDATE_CONCURRENCY = 4;
protected HeavyTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
// 初始化成员变量
super(req,workerRuntime);
// 赋予时间表达式类型
instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());
// 保护性操作
instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
// 构建缓存
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build();
// 构建分段锁
segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
// 子类自定义初始化操作
initTaskTracker(req);
log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId);
}
//......
/**
* 初始化 TaskTracker
*
* @param req 服务器调度任务实例运行请求
*/
protected abstract void initTaskTracker(ServerScheduleJobReq req);
}
HeavyTaskTracker继承了TaskTracker,它也是个抽象类,其构造器主要是创建了ProcessorTrackerStatusHolder、taskId2BriefInfo、SegmentLock;它定义了抽象方法initTaskTracker;它提供了updateAppendedWfContext、updateTaskStatus、submitTask、receiveProcessorTrackerHeartbeat、broadcast方法;它实现了destroy、stopTask方法
updateAppendedWfContext
public void updateAppendedWfContext(Map<String, String> newAppendedWfContext) {
// check
if (instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) {
// 只有工作流中的任务才有存储的必要
return;
}
// 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
return;
}
for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) {
String originValue = appendedWfContext.put(entry.getKey(), entry.getValue());
log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue());
}
}
updateAppendedWfContext方法用于给工作流实例添加上下文数据,添加到了父类定义的appendedWfContext中
updateTaskStatus
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
return;
}
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
int lockId = taskId.hashCode();
try {
// 阻塞获取锁
segmentLock.lockInterruptible(lockId);
TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);
// 缓存中不存在,从数据库查
if (taskBriefInfo == null) {
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
if (taskOpt.isPresent()) {
TaskDO taskDO = taskOpt.get();
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
} else {
// 理论上不存在这种情况,除非数据库异常
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
}
// 写入缓存
taskId2BriefInfo.put(taskId, taskBriefInfo);
}
// 过滤过期的请求(潜在的集群时间一致性需求,重试跨 Worker 时,时间不一致可能导致问题)
if (taskBriefInfo.getLastReportTime() > reportTime) {
log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
return;
}
// 检查状态转移是否合法,fix issue 404
if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
return;
}
// 此时本次请求已经有效,先更新相关信息
taskBriefInfo.setLastReportTime(reportTime);
taskBriefInfo.setStatus(nTaskStatus);
// 处理失败的情况
int configTaskRetryNum = instanceInfo.getTaskRetryNum();
if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
// 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB)
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
// 查询DB再失败的话,就不重试了...
if (taskOpt.isPresent()) {
int failedCnt = taskOpt.get().getFailedCnt();
if (failedCnt < configTaskRetryNum) {
TaskDO updateEntity = new TaskDO();
updateEntity.setFailedCnt(failedCnt + 1);
/*
地址规则:
1. 当前存储的地址为任务派发的目的地(ProcessorTracker地址)
2. 根任务、最终任务必须由TaskTracker所在机器执行(如果是根任务和最终任务,不应当修改地址)
3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址)
*/
String taskName = taskOpt.get().getTaskName();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
updateEntity.setLastReportTime(reportTime);
boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (retryTask) {
log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
return;
}
}
}
}
// 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)
result = result == null ? "" : result;
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
}
} catch (InterruptedException ignore) {
// ignore
} catch (Exception e) {
log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
} finally {
segmentLock.unlock(lockId);
}
}
updateTaskStatus使用任务id的hashcode作为分段锁的id,加锁然后获取taskBriefInfo(
内存
),更新reportTime和nTaskStatus,之后对于失败的情况将数据持久化到数据库
submitTask
public boolean submitTask(List<TaskDO> newTaskList) {
if (finished.get()) {
return true;
}
if (CollectionUtils.isEmpty(newTaskList)) {
return true;
}
// 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!)
newTaskList.forEach(task -> {
task.setInstanceId(instanceId);
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
task.setFailedCnt(0);
task.setLastModifiedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
task.setLastReportTime(-1L);
});
log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
return taskPersistenceService.batchSave(newTaskList);
}
submitTask遍历newTaskList,挨个更新状态和时间,然后使用taskPersistenceService.batchSave(newTaskList)保存
receiveProcessorTrackerHeartbeat
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务
if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
String idlePtAddress = heartbeatReq.getAddress();
// 该 ProcessorTracker 已销毁,重置为初始状态
ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
List<TaskDO> unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
}
}
}
receiveProcessorTrackerHeartbeat用于处理ProcessorTrackerStatusReportReq,它先更新ptStatusHolder,接着对于IDLE类型判断是否还有未完成的任务,有则更新为WORKER_PROCESS_FAILED
broadcast
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
if (finished.get()) {
return;
}
log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", instanceId, subInstanceId, preExecuteSuccess, preTaskId, result);
// 生成集群子任务
if (preExecuteSuccess) {
List<String> allWorkerAddress = ptStatusHolder.getAllProcessorTrackers();
List<TaskDO> subTaskList = Lists.newLinkedList();
for (int i = 0; i < allWorkerAddress.size(); i++) {
TaskDO subTask = new TaskDO();
subTask.setSubInstanceId(subInstanceId);
subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
subTask.setTaskId(preTaskId + "." + i);
// 广播任务直接写入派发地址
subTask.setAddress(allWorkerAddress.get(i));
subTaskList.add(subTask);
}
submitTask(subTaskList);
} else {
log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result);
}
}
broadcast方法对于preExecuteSuccess为true的会根据ptStatusHolder.getAllProcessorTrackers()来创建TaskDO,最后执行submitTask提交
destroy
public void destroy() {
finished.set(true);
Stopwatch sw = Stopwatch.createStarted();
// 0. 开始关闭线程池,不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。
scheduledPool.shutdown();
// 1. 通知 ProcessorTracker 释放资源
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptAddress -> {
// 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭
TransportUtils.ttStopPtInstance(stopRequest, ptAddress, workerRuntime.getTransporter());
});
// 2. 删除所有数据库数据
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) {
log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId);
} else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
}
// 3. 移除顶层引用,送去 GC
HeavyTaskTrackerManager.removeTaskTracker(instanceId);
log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", instanceId, sw.stop());
// 4. 强制关闭线程池
if (!scheduledPool.isTerminated()) {
CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow());
}
}
destroy方法更新finished为true,执行scheduledPool.shutdown(),然后给AllProcessorTrackers发送TaskTrackerStopInstanceReq,接着删除该instanceId的所有task,最后对于scheduledPool还未关闭的执行shutdownNow
stopTask
public void stopTask() {
destroy();
}
stopTask执行的是destroy方法
CommonTaskTracker
tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java
@Slf4j
@ToString
public class CommonTaskTracker extends HeavyTaskTracker {
/**
* 根任务 ID
*/
public static final String ROOT_TASK_ID = "0";
/**
* 最后一个任务 ID
* 除 {@link #ROOT_TASK_ID} 外任何数都可以
*/
public static final String LAST_TASK_ID = "9999";
protected CommonTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
super(req, workerRuntime);
}
//......
}
CommonTaskTracker继承了HeavyTaskTracker
initTaskTracker
protected void initTaskTracker(ServerScheduleJobReq req) {
// CommonTaskTrackerTimingPool 缩写
String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
// 持久化根任务
persistenceRootTask();
// 开启定时状态检查
int delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "13"));
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3, delay, TimeUnit.SECONDS);
// 如果是 MR 任务,则需要启动执行器动态检测装置
ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType());
if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
// 最后启动任务派发器,否则会出现 TaskTracker 还未创建完毕 ProcessorTracker 已开始汇报状态的情况
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 10, 5000, TimeUnit.MILLISECONDS);
}
initTaskTracker方法初始化scheduledPool、persistenceRootTask、调度StatusCheckRunnable、对于MR任务调度WorkerDetector,最后调度Dispatcher
persistenceRootTask
private void persistenceRootTask() {
TaskDO rootTask = new TaskDO();
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
rootTask.setInstanceId(instanceInfo.getInstanceId());
rootTask.setTaskId(ROOT_TASK_ID);
rootTask.setFailedCnt(0);
rootTask.setAddress(workerRuntime.getWorkerAddress());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setLastModifiedTime(System.currentTimeMillis());
rootTask.setLastReportTime(-1L);
rootTask.setSubInstanceId(instanceId);
if (taskPersistenceService.save(rootTask)) {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
} else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
throw new PowerJobException("create root task failed for instance: " + instanceId);
}
}
persistenceRootTask先创建rootTask,然后通过taskPersistenceService.save保存
StatusCheckRunnable
private class StatusCheckRunnable implements Runnable {
private static final long DISPATCH_TIME_OUT_MS = 15000;
@SuppressWarnings("squid:S3776")
private void innerRun() {
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
long finishedNum = holder.succeedNum + holder.failedNum;
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setAppId(workerRuntime.getAppId());
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setWfInstanceId(instanceInfo.getWfInstanceId());
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(holder.succeedNum);
req.setFailedTaskNum(holder.failedNum);
req.setReportTime(System.currentTimeMillis());
req.setStartTime(createTime);
req.setSourceAddress(workerRuntime.getWorkerAddress());
boolean success = false;
String result = null;
// 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果
if (unfinishedNum == 0) {
// 数据库中一个任务都没有,说明根任务创建失败,该任务实例失败
if (finishedNum == 0) {
finished.set(true);
result = SystemInstanceResult.TASK_INIT_FAILED;
} else {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
switch (executeType) {
// STANDALONE 只有一个任务,完成即结束
case STANDALONE:
finished.set(true);
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
result = SystemInstanceResult.UNKNOWN_BUG;
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
} else {
result = allTask.get(0).getResult();
success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
}
break;
// MAP 不关心结果,最简单
case MAP:
finished.set(true);
success = holder.failedNum == 0;
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
break;
// MapReduce 和 Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
default:
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
if (lastTaskOptional.isPresent()) {
// 存在则根据 reduce 任务来判断状态
TaskDO resultTask = lastTaskOptional.get();
TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
finished.set(true);
success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
result = resultTask.getResult();
}
} else {
// 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行!
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(LAST_TASK_ID);
newLastTask.setSubInstanceId(instanceId);
newLastTask.setAddress(workerRuntime.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask));
}
}
}
}
// 3. 检查任务实例整体是否超时
if (isTimeout()) {
finished.set(true);
success = false;
result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
}
// 4. 执行完毕,报告服务器
if (finished.get()) {
req.setResult(result);
// 上报追加的工作流上下文信息
req.setAppendedWfContext(appendedWfContext);
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
reportFinalStatusThenDestroy(workerRuntime, req);
return;
}
// 5. 未完成,上报状态
req.setInstanceStatus(InstanceStatus.RUNNING.getV());
TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
// 6.1 定期检查 -> 重试派发后未确认的任务
long currentMS = System.currentTimeMillis();
if (holder.workerUnreceivedNum != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
if (elapsedTime > DISPATCH_TIME_OUT_MS) {
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
// 特殊任务只能本机执行
if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
// 失败次数 + 1
updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
}
});
}
// 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
ptStatusHolder.remove(disconnectedPTs);
log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
}
}
}
/**
* 任务是否超时
*/
public boolean isTimeout() {
if (instanceInfo.getInstanceTimeoutMS() > 0) {
return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
}
return false;
}
@Override
public void run() {
try {
innerRun();
} catch (Exception e) {
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
}
}
}
StatusCheckRunnable实现了Runnable接口,其run方法执行innerRun;innerRun先构建TaskTrackerReportInstanceStatusReq,之后根据executeType来判断任务是否已经结束,接着判断任务是否超时,针对执行完毕的执行reportFinalStatusThenDestroy,对于未完成的通过ttReportInstanceStatus上报;对于workerUnreceivedNum不为0的会取出状态为DISPATCH_SUCCESS_WORKER_UNCHECK的任务进行更新,最后针对DisconnectedProcessorTrackers上的任务执行taskPersistenceService.updateLostTasks
FrequentTaskTracker
tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java
@Slf4j
public class FrequentTaskTracker extends HeavyTaskTracker {
/**
* 时间表达式类型
*/
private TimeExpressionType timeExpressionType;
private long timeParams;
/**
* 最大同时运行实例数
*/
private int maxInstanceNum;
/**
* 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争严重推荐 LongAdder)
*/
private AtomicLong triggerTimes;
private AtomicLong succeedTimes;
private AtomicLong failedTimes;
/**
* 任务发射器
*/
private Launcher launcher;
/**
* 保存最近10个子任务的信息,供用户查询(user -> server -> worker 传递查询)
*/
private LRUCache<Long, SubInstanceInfo> recentSubInstanceInfo;
/**
* 保存运行中的任务
*/
private Map<Long, SubInstanceTimeHolder> subInstanceId2TimeHolder;
private AlertManager alertManager;
private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L";
private static final int MIN_INTERVAL = 50;
protected FrequentTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
super(req, workerRuntime);
}
//......
}
FrequentTaskTracker继承了HeavyTaskTracker,它主要是用于处理秒级任务
initTaskTracker
protected void initTaskTracker(ServerScheduleJobReq req) {
// 0. 初始化实例变量
timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
timeParams = Long.parseLong(req.getTimeExpression());
maxInstanceNum = req.getMaxInstanceNum();
triggerTimes = new AtomicLong(0);
succeedTimes = new AtomicLong(0);
failedTimes = new AtomicLong(0);
recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE);
subInstanceId2TimeHolder = Maps.newConcurrentMap();
// 1. 初始化定时调度线程池
String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(4, factory);
this.alertManager = constructAlertManager(req);
// 2. 启动任务发射器
launcher = new Launcher();
if (timeExpressionType == TimeExpressionType.FIXED_RATE) {
// 固定频率需要设置最小间隔
if (timeParams < MIN_INTERVAL) {
throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
}
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
} else {
scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
}
// 3. 启动任务分发器(事实上,秒级任务应该都是单机任务,且感觉不需要失败重试机制,那么 Dispatcher 的存在就有点浪费系统资源了...)
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
// 4. 启动状态检查器
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
// 5. 启动执行器动态检测装置
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
initTaskTracker方法主要是初始化LRUCache、scheduledPool、alertManager、调度Launcher、Dispatcher、Checker、WorkerDetector
小结
HeavyTaskTracker继承了TaskTracker,它也是个抽象类,其构造器主要是创建了ProcessorTrackerStatusHolder、taskId2BriefInfo、SegmentLock;它定义了抽象方法initTaskTracker;它提供了updateAppendedWfContext、updateTaskStatus、submitTask、receiveProcessorTrackerHeartbeat、broadcast方法;它实现了destroy、stopTask方法;它有两个实现类,分别是CommonTaskTracker用于处理任务派发和状态更新,FrequentTaskTracker用于处理秒级任务。
网友评论