A Look at PowerJob's HeavyTaskTracker

Author: go4it | Published: 2023-12-27 09:30

    This article takes a look at HeavyTaskTracker in PowerJob.

    HeavyTaskTracker

    tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    @Slf4j
    public abstract class HeavyTaskTracker extends TaskTracker {
    
        /**
         * ProcessorTracker status management
         */
        protected final ProcessorTrackerStatusHolder ptStatusHolder;
        /**
         * 数据库持久化服务
         */
        protected final TaskPersistenceService taskPersistenceService;
        /**
         * Scheduled thread pool
         */
        protected ScheduledExecutorService scheduledPool;
        /**
         * Task info cache
         */
        private final Cache<String, TaskBriefInfo> taskId2BriefInfo;
    
    
        /**
         * Segment lock
         */
        private final SegmentLock segmentLock;
        private static final int UPDATE_CONCURRENCY = 4;
    
        protected HeavyTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            // Initialize member variables
            super(req,workerRuntime);
            // Set the time expression type
            instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());
            // Defensive: thread concurrency is at least 1
            instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
            this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
            this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
            // Build the cache
            taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build();
    
            // Build the segment lock
            segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
    
            // Subclass-specific initialization
            initTaskTracker(req);
    
            log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId);
        }
    
        //......
    
        /**
         * Initializes the TaskTracker
         *
         * @param req the server's request to schedule and run a job instance
         */
        protected abstract void initTaskTracker(ServerScheduleJobReq req);    
    }    
    

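    HeavyTaskTracker extends TaskTracker and is itself an abstract class. Its constructor mainly creates the ProcessorTrackerStatusHolder, the taskId2BriefInfo cache and the SegmentLock, and then calls the abstract method initTaskTracker, which each subclass implements. It provides the updateAppendedWfContext, updateTaskStatus, submitTask, receiveProcessorTrackerHeartbeat and broadcast methods, and implements destroy and stopTask.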
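    The constructor calls the abstract initTaskTracker, so a subclass's initialization runs while the superclass constructor is still executing. At that point the subclass's own field initializers have not run yet. The minimal sketch below uses hypothetical classes (BaseTracker, DemoTracker), not PowerJob's, to show what that means in Java. A field assigned in the hook keeps its value only if it has no initializer of its own. This matches the way FrequentTaskTracker declares its fields without initializers and assigns them in initTaskTracker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HeavyTaskTracker: the constructor calls an abstract hook.
abstract class BaseTracker {
    protected final List<String> events = new ArrayList<>();

    protected BaseTracker(String req) {
        events.add("base fields ready");
        // Template-method hook: runs before the subclass's field initializers and constructor body
        initTaskTracker(req);
        events.add("base constructor done");
    }

    protected abstract void initTaskTracker(String req);
}

// Hypothetical stand-in for a subclass such as FrequentTaskTracker.
class DemoTracker extends BaseTracker {
    // No initializer: the value assigned by the hook survives
    private long timeParams;
    // Has an initializer: it runs after super() returns and overwrites the hook's value
    private long overwritten = -1L;

    DemoTracker(String req) {
        super(req);
        events.add("subclass constructor done");
    }

    @Override
    protected void initTaskTracker(String req) {
        timeParams = Long.parseLong(req);
        overwritten = Long.parseLong(req);
        events.add("initTaskTracker(" + req + ")");
    }

    long timeParams() { return timeParams; }

    long overwritten() { return overwritten; }
}
```

    With `new DemoTracker("1000")`, timeParams ends up as 1000 but overwritten is reset to -1.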

    updateAppendedWfContext

        public void updateAppendedWfContext(Map<String, String> newAppendedWfContext) {
    
            // check
            if (instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) {
                // Only instances inside a workflow need to store it
                return;
            }
            // Check whether the appended context already exceeds the length limit
            if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
                log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
                // ignore appended workflow context data
                return;
            }
    
            for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) {
                String originValue = appendedWfContext.put(entry.getKey(), entry.getValue());
                log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue());
            }
    
        }
    

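    updateAppendedWfContext appends context data to a workflow instance, storing it in appendedWfContext, which is defined in the parent class. It does nothing if the instance is not part of a workflow (wfInstanceId is null) or if the new map is empty. It ignores the new data if the context appended so far already exceeds maxAppendedWfContextLength from the worker config.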
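    The guard-then-merge behaviour can be sketched as follows. The class AppendedContext and the length measure (total characters of keys plus values) are assumptions for illustration; PowerJob's WorkflowContextUtils.isExceededLengthLimit may measure length differently. As in the source, the limit is checked against the context accumulated so far, before the new entries are added. A single call can therefore push the context past the limit, and only later calls are rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of updateAppendedWfContext: guard first, then merge.
final class AppendedContext {
    private final Map<String, String> appended = new HashMap<>();
    private final Long wfInstanceId; // null when the instance is not part of a workflow
    private final int maxLength;     // stands in for maxAppendedWfContextLength

    AppendedContext(Long wfInstanceId, int maxLength) {
        this.wfInstanceId = wfInstanceId;
        this.maxLength = maxLength;
    }

    // Assumed length measure: total characters of all keys and values
    private static boolean isExceededLengthLimit(Map<String, String> ctx, int maxLength) {
        long total = 0;
        for (Map.Entry<String, String> e : ctx.entrySet()) {
            total += e.getKey().length();
            total += e.getValue() == null ? 0 : e.getValue().length();
        }
        return total > maxLength;
    }

    /** Returns true if the new entries were merged. */
    boolean update(Map<String, String> newEntries) {
        if (wfInstanceId == null || newEntries == null || newEntries.isEmpty()) {
            return false; // only workflow instances keep appended context
        }
        if (isExceededLengthLimit(appended, maxLength)) {
            return false; // already over the limit: the new data is ignored
        }
        appended.putAll(newEntries); // a repeated key overwrites the earlier value
        return true;
    }

    Map<String, String> snapshot() {
        return new HashMap<>(appended);
    }
}
```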

    updateTaskStatus

        public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
    
            if (finished.get()) {
                return;
            }
            TaskStatus nTaskStatus = TaskStatus.of(newStatus);
    
            int lockId = taskId.hashCode();
            try {
    
                // Acquire the lock (blocking, interruptible)
                segmentLock.lockInterruptible(lockId);
                TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);
    
                // Not in the cache: look it up in the database
                if (taskBriefInfo == null) {
                    Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                    if (taskOpt.isPresent()) {
                        TaskDO taskDO = taskOpt.get();
                        taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                    } else {
                        // Should not happen in theory, unless the database is faulty
                        log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                        taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                    }
                    // Put it into the cache
                    taskId2BriefInfo.put(taskId, taskBriefInfo);
                }
    
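                // Drop stale reports (this implicitly requires consistent clocks across the cluster; when a retry moves to another worker, clock skew may cause problems)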
                if (taskBriefInfo.getLastReportTime() > reportTime) {
                    log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                            instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
                    return;
                }
                // Check that the state transition is legal, fix issue 404
                if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                    log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
                            instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
                    return;
                }
    
                // The report is valid at this point, so update the cached info first
                taskBriefInfo.setLastReportTime(reportTime);
                taskBriefInfo.setStatus(nTaskStatus);
    
                // Handle failures
                int configTaskRetryNum = instanceInfo.getTaskRetryNum();
                if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
    
                    // Failure is not the common case, so one extra DB query is fine (and with the cache in front, most calls never hit the DB before this point)
                    Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                    // If the DB query fails as well, give up on the retry...
                    if (taskOpt.isPresent()) {
                        int failedCnt = taskOpt.get().getFailedCnt();
                        if (failedCnt < configTaskRetryNum) {
    
                            TaskDO updateEntity = new TaskDO();
                            updateEntity.setFailedCnt(failedCnt + 1);
    
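                            /*
                            Address rules:
                            1. The stored address is the dispatch target (the ProcessorTracker address)
                            2. The root task and the last task must run on the TaskTracker's machine (do not change their address)
                            3. Broadcast tasks must run on every machine, so they must not be reassigned to another worker (do not change their address)
                             */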
                            String taskName = taskOpt.get().getTaskName();
                            ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                            if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                                updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                            }
    
                            updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                            updateEntity.setLastReportTime(reportTime);
    
                            boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                            if (retryTask) {
                                log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                                return;
                            }
                        }
                    }
                }
    
                // Update the status (if writing the retry to the DB failed, there is no retry either... just bad luck)
                result = result == null ? "" : result;
                boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
    
                if (!updateResult) {
                    log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
                }
    
            } catch (InterruptedException ignore) {
                // ignore
            } catch (Exception e) {
                log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
            } finally {
                segmentLock.unlock(lockId);
            }
        }
    

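    updateTaskStatus uses the taskId's hashCode as the segment-lock ID. While holding the lock, it reads the task's TaskBriefInfo from the in-memory cache, loading it from the database on a cache miss. It drops the report if it is older than the last accepted one or if it would move the status backwards. Otherwise it records the new reportTime and status in the cache. A retry happens only when three conditions all hold: the new status is WORKER_PROCESS_FAILED, retries are configured (taskRetryNum >= 1), and the task's failedCnt is still below that limit. In that case it increments failedCnt and resets the task to WAITING_DISPATCH in the database so it is dispatched again. It also clears the address unless this is the root task, the last task or a broadcast task. In every other case, it writes the new status and result to the database.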
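    The locking and filtering steps can be sketched in memory as follows. SegmentLock here is a hypothetical re-implementation: an array of ReentrantLock indexed by Math.floorMod(hashCode, n). PowerJob's own SegmentLock may map IDs to segments differently. The status codes are plain ints that increase along a task's lifecycle, as the `<` comparison in the source implies. The database, the retry branch and cache eviction are left out. The sketch also takes the lock before entering the try block, so the finally clause never releases a lock the thread does not hold.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical segment lock: a fixed array of locks; each lock ID maps to one segment.
final class SegmentLock {
    private final ReentrantLock[] locks;

    SegmentLock(int concurrency) {
        locks = new ReentrantLock[concurrency];
        for (int i = 0; i < concurrency; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    private ReentrantLock segment(int lockId) {
        // floorMod keeps the index non-negative even for negative hash codes
        return locks[Math.floorMod(lockId, locks.length)];
    }

    void lockInterruptible(int lockId) throws InterruptedException {
        segment(lockId).lockInterruptibly();
    }

    void unlock(int lockId) {
        segment(lockId).unlock();
    }
}

// Hypothetical in-memory core of updateTaskStatus: drops stale and backward reports.
final class StatusFilter {
    private static final class Brief {
        int status = 1;            // assumed code for WAITING_DISPATCH
        long lastReportTime = -1L;
    }

    private final SegmentLock segmentLock = new SegmentLock(4);          // UPDATE_CONCURRENCY = 4
    private final Map<String, Brief> briefs = new ConcurrentHashMap<>(); // stands in for taskId2BriefInfo

    /** Returns true if the report is accepted (the real method would then persist it). */
    boolean accept(String taskId, int newStatus, long reportTime) {
        int lockId = taskId.hashCode();
        try {
            segmentLock.lockInterruptible(lockId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            Brief brief = briefs.computeIfAbsent(taskId, id -> new Brief());
            if (brief.lastReportTime > reportTime) {
                return false; // stale: older than the last accepted report
            }
            if (newStatus < brief.status) {
                return false; // illegal: the status would move backwards
            }
            brief.lastReportTime = reportTime;
            brief.status = newStatus;
            return true;
        } finally {
            segmentLock.unlock(lockId);
        }
    }
}
```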

    submitTask

        public boolean submitTask(List<TaskDO> newTaskList) {
            if (finished.get()) {
                return true;
            }
            if (CollectionUtils.isEmpty(newTaskList)) {
                return true;
            }
            // Basic preprocessing (the extra loop is slightly wasteful, but in distributed execution its cost is negligible)
            newTaskList.forEach(task -> {
                task.setInstanceId(instanceId);
                task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                task.setFailedCnt(0);
                task.setLastModifiedTime(System.currentTimeMillis());
                task.setCreatedTime(System.currentTimeMillis());
                task.setLastReportTime(-1L);
            });
    
            log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
            return taskPersistenceService.batchSave(newTaskList);
        }
    

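    submitTask returns immediately if the tracker has finished or the list is empty. Otherwise it iterates over newTaskList and sets each task's instanceId, its initial status (WAITING_DISPATCH), a zero failure count and its timestamps. It then persists the whole list with taskPersistenceService.batchSave(newTaskList).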
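    A minimal in-memory sketch of the same preprocessing follows. TaskRow and TaskSubmitter are hypothetical stand-ins for TaskDO and the tracker, a List replaces batchSave, and the status code is assumed. There is one small deviation from the source: the timestamp is read once, so createdTime and lastModifiedTime are identical for the whole batch. Setting lastReportTime to -1 means the first real report always passes the staleness check in updateTaskStatus.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down TaskDO with only the fields submitTask touches.
final class TaskRow {
    final String taskId;
    long instanceId;
    int status;
    int failedCnt;
    long createdTime;
    long lastModifiedTime;
    long lastReportTime;

    TaskRow(String taskId) { this.taskId = taskId; }
}

final class TaskSubmitter {
    static final int WAITING_DISPATCH = 1; // assumed status code

    private final long instanceId;
    final List<TaskRow> saved = new ArrayList<>(); // stands in for the task table
    volatile boolean finished;

    TaskSubmitter(long instanceId) { this.instanceId = instanceId; }

    boolean submitTask(List<TaskRow> newTasks) {
        if (finished || newTasks == null || newTasks.isEmpty()) {
            return true; // nothing to do counts as success, as in the source
        }
        long now = System.currentTimeMillis(); // one timestamp for the whole batch
        for (TaskRow t : newTasks) {
            t.instanceId = instanceId;
            t.status = WAITING_DISPATCH;
            t.failedCnt = 0;
            t.createdTime = now;
            t.lastModifiedTime = now;
            t.lastReportTime = -1L; // "never reported": any real report is newer
        }
        return saved.addAll(newTasks); // stands in for batchSave
    }
}
```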

    receiveProcessorTrackerHeartbeat

        public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
            log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
            ptStatusHolder.updateStatus(heartbeatReq);
    
            // Reported idle: check whether results for all tasks assigned to this ProcessorTracker have been received
            if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
                String idlePtAddress = heartbeatReq.getAddress();
                // This ProcessorTracker has been destroyed; reset it to the initial state
                ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
                List<TaskDO> unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
                if (!CollectionUtils.isEmpty(unfinishedTask)) {
                    log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
                    unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
                }
            }
        }
    

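    receiveProcessorTrackerHeartbeat handles a ProcessorTrackerStatusReportReq. It first updates ptStatusHolder. For an IDLE report, it resets that ProcessorTracker's dispatched flag and looks up the unfinished tasks still assigned to its address. Each such task is marked WORKER_PROCESS_FAILED through updateTaskStatus, so the normal retry logic still applies.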
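    The idle-heartbeat rule can be modelled with a couple of maps. IdleHeartbeatHandler, its fields and the numeric status codes are hypothetical. In PowerJob the lookup is done by taskPersistenceService.getAllUnFinishedTaskByAddress, and the failure goes through updateTaskStatus.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the IDLE branch of receiveProcessorTrackerHeartbeat.
final class IdleHeartbeatHandler {
    static final int WORKER_PROCESS_FAILED = 5;  // assumed status codes
    static final int WORKER_PROCESS_SUCCESS = 6;

    static final class Task {
        final String address; // ProcessorTracker the task was dispatched to
        int status;

        Task(String address, int status) {
            this.address = address;
            this.status = status;
        }
    }

    final Map<String, Task> tasks = new LinkedHashMap<>();         // taskId -> task
    final Map<String, Boolean> dispatched = new LinkedHashMap<>(); // PT address -> dispatched flag

    /** Handles an IDLE report and returns the IDs of tasks that were force-failed. */
    List<String> onIdle(String ptAddress) {
        dispatched.put(ptAddress, false); // the PT has destroyed itself: back to the initial state
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
            Task t = e.getValue();
            boolean unfinished = t.status != WORKER_PROCESS_FAILED && t.status != WORKER_PROCESS_SUCCESS;
            if (t.address.equals(ptAddress) && unfinished) {
                // Idle, yet no result was reported: mark the task failed
                // (the real tracker routes this through updateTaskStatus, so retries may apply)
                t.status = WORKER_PROCESS_FAILED;
                failed.add(e.getKey());
            }
        }
        return failed;
    }
}
```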

    broadcast

        public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
    
            if (finished.get()) {
                return;
            }
    
            log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", instanceId, subInstanceId, preExecuteSuccess, preTaskId, result);
    
            // Generate the cluster subtasks
            if (preExecuteSuccess) {
                List<String> allWorkerAddress = ptStatusHolder.getAllProcessorTrackers();
                List<TaskDO> subTaskList = Lists.newLinkedList();
                for (int i = 0; i < allWorkerAddress.size(); i++) {
                    TaskDO subTask = new TaskDO();
                    subTask.setSubInstanceId(subInstanceId);
                    subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
                    subTask.setTaskId(preTaskId + "." + i);
                    // Broadcast tasks get their dispatch address written directly
                    subTask.setAddress(allWorkerAddress.get(i));
                    subTaskList.add(subTask);
                }
                submitTask(subTaskList);
            } else {
                log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result);
            }
        }
    

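    When preExecuteSuccess is true, broadcast creates one TaskDO for each address returned by ptStatusHolder.getAllProcessorTrackers(). Each task gets the ID preTaskId + "." + i and has its dispatch address filled in directly. The tasks are then submitted via submitTask. If preprocessing failed, broadcast only logs a warning.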
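    The ID and address assignment in broadcast can be isolated as below. BroadcastSplitter and SubTask are hypothetical. The real code fills TaskDO objects, also setting subInstanceId and the task name TaskConstant.BROADCAST_TASK_NAME, and passes them to submitTask.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring how broadcast() derives one subtask per ProcessorTracker.
final class BroadcastSplitter {

    static final class SubTask {
        final String taskId;
        final String address;

        SubTask(String taskId, String address) {
            this.taskId = taskId;
            this.address = address;
        }
    }

    static List<SubTask> split(String preTaskId, List<String> allWorkerAddress) {
        List<SubTask> subTasks = new ArrayList<>(allWorkerAddress.size());
        for (int i = 0; i < allWorkerAddress.size(); i++) {
            // Child IDs extend the parent ID ("0" -> "0.0", "0.1", ...);
            // the address is fixed here instead of being chosen by the Dispatcher
            subTasks.add(new SubTask(preTaskId + "." + i, allWorkerAddress.get(i)));
        }
        return subTasks;
    }
}
```

    With three ProcessorTrackers and preTaskId "0", the subtasks are "0.0", "0.1" and "0.2", each pinned to one address.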

    destroy

        public void destroy() {
    
            finished.set(true);
    
            Stopwatch sw = Stopwatch.createStarted();
            // 0. Start shutting down the pool. shutdownNow() cannot be used here, because destroy itself runs on a scheduledPool thread and a forced shutdown would interrupt it.
            scheduledPool.shutdown();
    
            // 1. Tell the ProcessorTrackers to release their resources
            TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
            stopRequest.setInstanceId(instanceId);
            ptStatusHolder.getAllProcessorTrackers().forEach(ptAddress -> {
                // Unreliable notification; a ProcessorTracker can also shut itself down through its own scheduled checks or queries
                TransportUtils.ttStopPtInstance(stopRequest, ptAddress, workerRuntime.getTransporter());
            });
    
            // 2. Delete all data from the database
            boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
            if (!dbSuccess) {
                log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId);
            } else {
                log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
            }
    
            // 3. Remove the top-level reference so the tracker can be garbage-collected
            HeavyTaskTrackerManager.removeTaskTracker(instanceId);
    
            log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", instanceId, sw.stop());
    
            // 4. Force the pool to shut down
            if (!scheduledPool.isTerminated()) {
                CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow());
            }
    
        }
    

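    destroy proceeds in these steps:
    1. It sets finished to true and calls scheduledPool.shutdown(). It does not call shutdownNow(), because destroy itself runs on a scheduledPool thread.
    2. It sends a TaskTrackerStopInstanceReq to every ProcessorTracker.
    3. It deletes all tasks of this instanceId from the database.
    4. It removes the tracker from HeavyTaskTrackerManager.
    5. Finally, it calls shutdownNow() if scheduledPool has not terminated yet.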
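    The choice of shutdown() over shutdownNow() in step 0 comes down to which threads get interrupted. For a ThreadPoolExecutor-based pool, shutdownNow() interrupts every worker, including the caller when the caller is one of them. shutdown() only interrupts idle workers. The hypothetical DestroyDemo below runs a destroy-like task on a ScheduledExecutorService and calls one or the other from inside it. A short Thread.sleep stands in for the blocking cleanup (notifying ProcessorTrackers, deleting rows).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a destroy-like task shuts down the pool it is running on.
final class DestroyDemo {

    /** Returns the steps the task completed; useShutdownNow selects the shutdown style. */
    static List<String> destroyInsideOwnPool(boolean useShutdownNow) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        List<String> steps = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        pool.schedule(() -> {
            try {
                if (useShutdownNow) {
                    pool.shutdownNow(); // interrupts all workers, including this one
                } else {
                    pool.shutdown();    // lets the running task finish
                }
                steps.add("shutdown requested");
                Thread.sleep(50);       // stands in for blocking cleanup work
                steps.add("cleanup finished");
            } catch (InterruptedException e) {
                steps.add("interrupted");
            } finally {
                done.countDown();
            }
        }, 0, TimeUnit.MILLISECONDS);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow(); // final forced shutdown, like step 4 of destroy()
        return steps;
    }
}
```

    With shutdown() the cleanup completes, while with shutdownNow() the sleep is cut short by the interrupt. This is the situation the source comment guards against.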

    stopTask

        public void stopTask() {
            destroy();
        }
    

    stopTask simply delegates to destroy.

    CommonTaskTracker

    tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java

    @Slf4j
    @ToString
    public class CommonTaskTracker extends HeavyTaskTracker {
    
        /**
         * Root task ID
         */
        public static final String ROOT_TASK_ID = "0";
        /**
         * Last task ID
         * Any number except {@link #ROOT_TASK_ID} will do
         */
        public static final String LAST_TASK_ID = "9999";
    
        protected CommonTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            super(req, workerRuntime);
        }
    
        //......
    }    
    

    CommonTaskTracker extends HeavyTaskTracker. It uses "0" as the root task ID and "9999" as the last task ID.

    initTaskTracker

        protected void initTaskTracker(ServerScheduleJobReq req) {
    
            // Short for CommonTaskTrackerTimingPool
            String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
            this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
    
            // Persist the root task
            persistenceRootTask();
    
            // Start periodic status checks
            int delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "13"));
            scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3, delay, TimeUnit.SECONDS);
    
            // For MR jobs, also start the dynamic worker detector
            ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType());
            if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
                scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
            }
    
            // Start the dispatcher last; otherwise ProcessorTrackers could start reporting status before the TaskTracker is fully created
            scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 10, 5000, TimeUnit.MILLISECONDS);
        }
    

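    initTaskTracker sets up the tracker in this order:
    1. It creates scheduledPool with two threads and persists the root task via persistenceRootTask.
    2. It schedules StatusCheckRunnable. The period comes from the PowerJobDKey.WORKER_STATUS_CHECK_PERIOD system property, 13 seconds by default.
    3. For MAP and MAP_REDUCE jobs, it schedules WorkerDetector once a minute.
    4. Finally, it schedules the Dispatcher with a 5-second delay between runs.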
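    The pool name is built in two steps. String.format fills in the instance ID, and the appended "-%d" is left for the thread factory's per-thread counter. The sketch below is a JDK-only stand-in for Guava's ThreadFactoryBuilder().setNameFormat(...). The class NamedThreadFactory and its counter starting at 0 are assumptions of this sketch. It also includes the property-with-default lookup used for the status-check period. The property key is passed in as a parameter because the actual value of PowerJobDKey.WORKER_STATUS_CHECK_PERIOD is not shown here.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only equivalent of a name-format thread factory.
final class NamedThreadFactory implements ThreadFactory {
    private final String nameFormat; // e.g. "ctttp-42-%d": the instance ID is already filled in
    private final AtomicLong count = new AtomicLong(0);

    NamedThreadFactory(String nameFormat) {
        this.nameFormat = nameFormat;
    }

    static NamedThreadFactory forInstance(String prefix, long instanceId) {
        // Two-step format, as in initTaskTracker: the first %d becomes the instance ID,
        // and the appended "-%d" is left for the per-thread counter
        return new NamedThreadFactory(String.format(prefix + "-%d", instanceId) + "-%d");
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r); // simplified: no backing ThreadFactory
        t.setName(String.format(nameFormat, count.getAndIncrement()));
        return t;
    }

    /** Period of the status check in seconds: the system property if set, otherwise 13. */
    static int statusCheckPeriodSeconds(String propertyKey) {
        return Integer.parseInt(System.getProperty(propertyKey, "13"));
    }
}
```

    For instance 42 the pool threads would be named "ctttp-42-0", "ctttp-42-1", and so on.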

    persistenceRootTask

        private void persistenceRootTask() {
    
            TaskDO rootTask = new TaskDO();
            rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            rootTask.setInstanceId(instanceInfo.getInstanceId());
            rootTask.setTaskId(ROOT_TASK_ID);
            rootTask.setFailedCnt(0);
            rootTask.setAddress(workerRuntime.getWorkerAddress());
            rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
            rootTask.setCreatedTime(System.currentTimeMillis());
            rootTask.setLastModifiedTime(System.currentTimeMillis());
            rootTask.setLastReportTime(-1L);
            rootTask.setSubInstanceId(instanceId);
    
            if (taskPersistenceService.save(rootTask)) {
                log.info("[TaskTracker-{}] create root task successfully.", instanceId);
            } else {
                log.error("[TaskTracker-{}] create root task failed.", instanceId);
                throw new PowerJobException("create root task failed for instance: " + instanceId);
            }
        }
    

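    persistenceRootTask builds the root task and saves it with taskPersistenceService.save. The task has ID ROOT_TASK_ID and status WAITING_DISPATCH, and its address is set to the local worker so that it runs on the TaskTracker's own machine. If the save fails, it throws a PowerJobException.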

    StatusCheckRunnable

        private class StatusCheckRunnable implements Runnable {
    
            private static final long DISPATCH_TIME_OUT_MS = 15000;
    
            @SuppressWarnings("squid:S3776")
            private void innerRun() {
    
                InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
    
                long finishedNum = holder.succeedNum + holder.failedNum;
                long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
    
                log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
    
                TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
                req.setAppId(workerRuntime.getAppId());
                req.setJobId(instanceInfo.getJobId());
                req.setInstanceId(instanceId);
                req.setWfInstanceId(instanceInfo.getWfInstanceId());
                req.setTotalTaskNum(finishedNum + unfinishedNum);
                req.setSucceedTaskNum(holder.succeedNum);
                req.setFailedTaskNum(holder.failedNum);
                req.setReportTime(System.currentTimeMillis());
                req.setStartTime(createTime);
                req.setSourceAddress(workerRuntime.getWorkerAddress());
    
                boolean success = false;
                String result = null;
    
                // 2. If no task is unfinished, decide whether the instance has really finished and get its final result
                if (unfinishedNum == 0) {
    
                    // No tasks at all in the database means the root task could not be created, so the instance fails
                    if (finishedNum == 0) {
                        finished.set(true);
                        result = SystemInstanceResult.TASK_INIT_FAILED;
                    } else {
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
    
                        switch (executeType) {
    
                            // STANDALONE has only one task; the instance ends when it completes
                            case STANDALONE:
                                finished.set(true);
                                List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                                if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                    result = SystemInstanceResult.UNKNOWN_BUG;
                                    log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                                } else {
                                    result = allTask.get(0).getResult();
                                    success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                                }
                                break;
                            // MAP does not care about the result; the simplest case
                            case MAP:
                                finished.set(true);
                                success = holder.failedNum == 0;
                                result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                                break;
                            // Whether a MapReduce or Broadcast instance has finished depends on the **LastTask**
                            default:
    
                                Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                                if (lastTaskOptional.isPresent()) {
    
                                    // If it exists, the status of this (reduce) task decides the result
                                    TaskDO resultTask = lastTaskOptional.get();
                                    TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
    
                                    if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                        finished.set(true);
                                        success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                        result = resultTask.getResult();
                                    }
    
                                } else {
    
                                    // It does not exist, so the preceding tasks have just finished: create the lastTask, which must run on this machine!
                                    TaskDO newLastTask = new TaskDO();
                                    newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                                    newLastTask.setTaskId(LAST_TASK_ID);
                                    newLastTask.setSubInstanceId(instanceId);
                                    newLastTask.setAddress(workerRuntime.getWorkerAddress());
                                    submitTask(Lists.newArrayList(newLastTask));
                                }
                        }
                    }
                }
    
                // 3. Check whether the instance as a whole has timed out
                if (isTimeout()) {
                    finished.set(true);
                    success = false;
                    result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
                }
    
                // 4. Finished: report to the server
                if (finished.get()) {
                    req.setResult(result);
                    // Report the appended workflow context
                    req.setAppendedWfContext(appendedWfContext);
                    req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
                    reportFinalStatusThenDestroy(workerRuntime, req);
                    return;
                }
    
                // 5. Not finished: report the status
                req.setInstanceStatus(InstanceStatus.RUNNING.getV());
                TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
    
                // 6.1 Periodic check -> redispatch tasks that were dispatched but never acknowledged
                long currentMS = System.currentTimeMillis();
                if (holder.workerUnreceivedNum != 0) {
                    taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
    
                        long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                        if (elapsedTime > DISPATCH_TIME_OUT_MS) {
    
                            TaskDO updateEntity = new TaskDO();
                            updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                            // Special tasks can only run on this machine
                            if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                                updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                            }
                            // Failure count + 1
                            updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);
    
                            taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
    
                            log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
                                    instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
                        }
    
                    });
                }
    
                // 6.2 Periodic check -> rerun tasks that were dispatched to ProcessorTrackers that went down
                List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
                if (!disconnectedPTs.isEmpty()) {
                    log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
                    if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
                        ptStatusHolder.remove(disconnectedPTs);
                        log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
                    }
                }
            }
    
            /**
             * Whether the instance has timed out
             */
            public boolean isTimeout() {
                if (instanceInfo.getInstanceTimeoutMS() > 0) {
                    return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
                }
                return false;
            }
    
            @Override
            public void run() {
                try {
                    innerRun();
                } catch (Exception e) {
                    log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
                }
            }
        }
    

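    StatusCheckRunnable implements Runnable. Its run method calls innerRun and logs any exception. innerRun works as follows:
    1. It builds a TaskTrackerReportInstanceStatusReq from the task statistics.
    2. If no task is unfinished, it decides from executeType whether the instance has really finished. STANDALONE and MAP finish immediately. MAP_REDUCE and BROADCAST finish only once the last task has succeeded or failed; if no last task exists yet, it creates one on the local machine.
    3. It checks whether the instance has timed out.
    4. A finished instance is reported via reportFinalStatusThenDestroy; an unfinished one is reported as RUNNING via ttReportInstanceStatus.
    5. If workerUnreceivedNum is non-zero, tasks that have stayed in DISPATCH_SUCCESS_WORKER_UNCHECK for more than 15 seconds are reset to WAITING_DISPATCH for redispatch, with their failure count incremented.
    6. Tasks on disconnected ProcessorTrackers are handled via taskPersistenceService.updateLostTasks.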
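    Stripped of I/O, the completion rules form a small decision table. The sketch below is a hypothetical, side-effect-free model: CompletionCheck, Outcome and the parameters are invented for illustration. The STANDALONE branch is approximated from counts, and reporting, redispatching and lost-task handling are omitted.

```java
// Hypothetical, simplified decision table of StatusCheckRunnable.innerRun (no I/O, no side effects).
final class CompletionCheck {
    enum ExecuteType { STANDALONE, MAP, MAP_REDUCE, BROADCAST }

    enum Outcome { RUNNING, CREATE_LAST_TASK, SUCCEED, FAILED }

    /**
     * @param unfinished        waiting + unreceived + received + running task counts
     * @param lastTaskSucceeded null if the last task does not exist yet
     */
    static Outcome decide(ExecuteType type, long unfinished, long succeed, long failed,
                          Boolean lastTaskSucceeded, boolean timedOut) {
        Outcome outcome = Outcome.RUNNING;
        if (unfinished == 0) {
            if (succeed + failed == 0) {
                outcome = Outcome.FAILED; // no tasks at all: the root task was never created
            } else {
                switch (type) {
                    case STANDALONE:
                        // Simplified: the source reads the single task's status instead
                        outcome = (succeed == 1 && failed == 0) ? Outcome.SUCCEED : Outcome.FAILED;
                        break;
                    case MAP:
                        outcome = failed == 0 ? Outcome.SUCCEED : Outcome.FAILED;
                        break;
                    default: // MAP_REDUCE and BROADCAST depend on the last task
                        if (lastTaskSucceeded == null) {
                            outcome = Outcome.CREATE_LAST_TASK;
                        } else {
                            outcome = lastTaskSucceeded ? Outcome.SUCCEED : Outcome.FAILED;
                        }
                }
            }
        }
        // A timeout fails the instance regardless of the counts
        return timedOut ? Outcome.FAILED : outcome;
    }
}
```

    For example, a MAP_REDUCE instance whose map tasks have all finished first yields CREATE_LAST_TASK. Only on a later check, once the last task is done, does it yield SUCCEED or FAILED.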

    FrequentTaskTracker

    tech/powerjob/worker/core/tracker/task/heavy/FrequentTaskTracker.java

    @Slf4j
    public class FrequentTaskTracker extends HeavyTaskTracker {
    
        /**
         * Time expression type
         */
        private TimeExpressionType timeExpressionType;
    
        private long timeParams;
        /**
         * Maximum number of concurrently running instances
         */
        private int maxInstanceNum;
    
        /**
         * Total trigger count (lock contention is not expected normally, so plain Atomic classes are used; LongAdder is recommended under heavy contention)
         */
        private AtomicLong triggerTimes;
    
        private AtomicLong succeedTimes;
    
        private AtomicLong failedTimes;
        /**
         * Task launcher
         */
        private Launcher launcher;
        /**
         * Info about the 10 most recent sub-instances, for user queries (passed user -> server -> worker)
         */
        private LRUCache<Long, SubInstanceInfo> recentSubInstanceInfo;
        /**
         * Running sub-instances
         */
        private Map<Long, SubInstanceTimeHolder> subInstanceId2TimeHolder;
    
        private AlertManager alertManager;
    
        private static final int HISTORY_SIZE = 10;
        private static final String LAST_TASK_ID_PREFIX = "L";
        private static final int MIN_INTERVAL = 50;
    
        protected FrequentTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            super(req, workerRuntime);
        }
    
        //......
    }    
    

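    FrequentTaskTracker extends HeavyTaskTracker and is mainly used for second-level (high-frequency) jobs, whose time expression is an interval in milliseconds.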
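    The recentSubInstanceInfo field keeps only the last HISTORY_SIZE (10) sub-instances. A common way to get that behaviour in plain Java is a LinkedHashMap in access order that overrides removeEldestEntry. The class below is such a sketch and is not PowerJob's LRUCache, whose implementation may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU cache in the spirit of recentSubInstanceInfo (not PowerJob's LRUCache).
final class RecentSubInstances<V> extends LinkedHashMap<Long, V> {
    private final int capacity;

    RecentSubInstances(int capacity) {
        super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the end
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, V> eldest) {
        return size() > capacity; // evict the least recently used entry once over capacity
    }
}
```

    After twelve sub-instances are recorded with a capacity of 10, the first two have been evicted.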

    initTaskTracker

        protected void initTaskTracker(ServerScheduleJobReq req) {
    
            // 0. Initialize instance fields
            timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
            timeParams = Long.parseLong(req.getTimeExpression());
            maxInstanceNum = req.getMaxInstanceNum();
    
            triggerTimes = new AtomicLong(0);
            succeedTimes = new AtomicLong(0);
            failedTimes = new AtomicLong(0);
    
            recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE);
            subInstanceId2TimeHolder = Maps.newConcurrentMap();
    
            // 1. Initialize the scheduled thread pool
            String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
            this.scheduledPool = Executors.newScheduledThreadPool(4, factory);
            this.alertManager = constructAlertManager(req);
            // 2. Start the task launcher
            launcher = new Launcher();
            if (timeExpressionType == TimeExpressionType.FIXED_RATE) {
                // FIXED_RATE needs a minimum interval (note: the check uses MIN_INTERVAL = 50 ms, although the message says 1000)
                if (timeParams < MIN_INTERVAL) {
                    throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000");
                }
                scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
            } else {
                scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
            }
    
            // 3. Start the task dispatcher (in practice, second-level jobs should all be standalone and hardly need failure retries, so the Dispatcher somewhat wastes resources...)
            scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
            // 4. Start the status checker
            scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
            // 5. Start the dynamic worker detector
            scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
        }
    

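    initTaskTracker first initializes the counters, the LRUCache and the subInstanceId2TimeHolder map, and creates scheduledPool (four threads) and alertManager. It then schedules four components:
    - the Launcher: at a fixed rate for FIXED_RATE, otherwise once immediately;
    - the Dispatcher: every 2 seconds;
    - the Checker: with its period clamped to between 5 and 15 seconds;
    - the WorkerDetector: once a minute.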
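    Two pieces of arithmetic in this method are easy to misread. The Checker period is timeParams clamped to the range 5 to 15 seconds, and a FIXED_RATE interval must be at least MIN_INTERVAL (50 ms). The helper class below is a hypothetical restatement, with IllegalArgumentException standing in for PowerJobException.

```java
// Hypothetical helpers reproducing the interval arithmetic in FrequentTaskTracker.initTaskTracker.
final class FrequentIntervals {
    static final long MIN_INTERVAL = 50;

    /** Checker period in ms: timeParams clamped to the range [5000, 15000]. */
    static long checkerPeriodMs(long timeParams) {
        return Math.min(Math.max(timeParams, 5000), 15000);
    }

    /** FIXED_RATE intervals below MIN_INTERVAL are rejected, as in the source. */
    static void validateFixedRate(long timeParams) {
        if (timeParams < MIN_INTERVAL) {
            throw new IllegalArgumentException("time interval too small: " + timeParams + " ms");
        }
    }
}
```

    For example, a job firing every second is checked every 5 seconds, one firing every 8 seconds is checked every 8 seconds, and one firing every minute is checked every 15 seconds.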

    Summary

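    HeavyTaskTracker extends TaskTracker and is itself abstract. Its constructor mainly creates the ProcessorTrackerStatusHolder, the taskId2BriefInfo cache and the SegmentLock, and it declares the abstract method initTaskTracker. It provides updateAppendedWfContext, updateTaskStatus, submitTask, receiveProcessorTrackerHeartbeat and broadcast, and it implements destroy and stopTask. It has two subclasses. CommonTaskTracker handles task dispatching and status tracking for ordinary jobs, and FrequentTaskTracker handles second-level jobs.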


          Article link: https://www.haomeiwen.com/subject/clsrndtx.html