A look at PowerJob's failedTaskNum

Author: go4it | Published: 2024-03-14 10:01

    This post looks at where PowerJob's failedTaskNum comes from.

    InstanceStatisticsHolder

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

        @Data
        protected static class InstanceStatisticsHolder {
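            // Waiting to be dispatched (exists only in the TaskTracker's database)
            protected long waitingDispatchNum;
            // Dispatched but not yet acknowledged by the ProcessorTracker: the request may have been lost to a network error, or the ProcessorTracker's thread pool may be full and rejecting work
            protected long workerUnreceivedNum;
            // Acknowledged by the ProcessorTracker and waiting in its thread-pool queue
            protected long receivedNum;
            // Being executed by the ProcessorTracker
            protected long runningNum;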
            protected long failedNum;
            protected long succeedNum;
    
            public long getTotalTaskNum() {
                return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
            }
        }
    

    InstanceStatisticsHolder stores the number of tasks in each status.

    fetchRunningStatus

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java

        public InstanceDetail fetchRunningStatus() {
    
            InstanceDetail detail = new InstanceDetail();
            // Fill in the basic information
            detail.setActualTriggerTime(createTime);
            detail.setStatus(InstanceStatus.RUNNING.getV());
            detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress());
    
            // Fill in the detailed information
            InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
            InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
            taskDetail.setSucceedTaskNum(holder.succeedNum);
            taskDetail.setFailedTaskNum(holder.failedNum);
            taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
            detail.setTaskDetail(taskDetail);
    
            return detail;
        }
    

    CommonTaskTracker's fetchRunningStatus calls getInstanceStatisticsHolder to obtain an InstanceStatisticsHolder, then fills taskDetail's failedTaskNum from holder.failedNum.

    getInstanceStatisticsHolder

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

        protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {
    
            Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
            InstanceStatisticsHolder holder = new InstanceStatisticsHolder();
    
            holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
            holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
            holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
            holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
            holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
            holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
            return holder;
        }
    

    getInstanceStatisticsHolder obtains status2Num from taskPersistenceService.getTaskStatusStatistics and uses the count for TaskStatus.WORKER_PROCESS_FAILED as failedNum.
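
    The mapping from per-status counts to the holder can be sketched in isolation. This is a minimal sketch, not PowerJob's code: the Status enum and Holder class below are trimmed-down, hypothetical stand-ins for TaskStatus and InstanceStatisticsHolder, and only four statuses are modelled.

```java
import java.util.EnumMap;
import java.util.Map;

class StatisticsHolderSketch {
    // Hypothetical, trimmed-down stand-in for PowerJob's TaskStatus.
    enum Status { WAITING_DISPATCH, WORKER_PROCESSING, WORKER_PROCESS_FAILED, WORKER_PROCESS_SUCCESS }

    // Hypothetical stand-in for InstanceStatisticsHolder with fewer fields.
    static final class Holder {
        long waitingDispatchNum;
        long runningNum;
        long failedNum;
        long succeedNum;

        long getTotalTaskNum() {
            return waitingDispatchNum + runningNum + failedNum + succeedNum;
        }
    }

    // A status with no rows is absent from the map, so getOrDefault turns it into 0.
    static Holder fromCounts(Map<Status, Long> status2Num) {
        Holder h = new Holder();
        h.waitingDispatchNum = status2Num.getOrDefault(Status.WAITING_DISPATCH, 0L);
        h.runningNum = status2Num.getOrDefault(Status.WORKER_PROCESSING, 0L);
        h.failedNum = status2Num.getOrDefault(Status.WORKER_PROCESS_FAILED, 0L);
        h.succeedNum = status2Num.getOrDefault(Status.WORKER_PROCESS_SUCCESS, 0L);
        return h;
    }

    public static void main(String[] args) {
        Map<Status, Long> counts = new EnumMap<>(Status.class);
        counts.put(Status.WORKER_PROCESS_SUCCESS, 8L);
        counts.put(Status.WORKER_PROCESS_FAILED, 2L);
        Holder h = fromCounts(counts);
        // prints: failed=2, total=10
        System.out.println("failed=" + h.failedNum + ", total=" + h.getTotalTaskNum());
    }
}
```

    The point of the sketch is that failedNum is a snapshot of how many rows currently sit in the failed status, not a running counter of failure events.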

    getTaskStatusStatistics

    powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java

        public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
            try {
    
                SimpleTaskQuery query = new SimpleTaskQuery();
                query.setInstanceId(instanceId);
                query.setSubInstanceId(subInstanceId);
                query.setQueryContent("status, count(*) as num");
                query.setOtherCondition("GROUP BY status");
    
                return execute(() -> {
                    List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
                    Map<TaskStatus, Long> result = Maps.newHashMap();
                    dbRES.forEach(row -> {
                        // H2 returns everything in upper case...
                        int status = Integer.parseInt(String.valueOf(row.get("status")));
                        long num = Long.parseLong(String.valueOf(row.get("num")));
                        result.put(TaskStatus.of(status), num);
                    });
                    return result;
                });
            } catch (Exception e) {
                log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
            }
            return Maps.newHashMap();
        }
    

    TaskPersistenceService's getTaskStatusStatistics runs select status, count(*) as num from task_info where instance_id = ? and sub_instance_id = ? GROUP BY status.
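
    What the GROUP BY produces can be reproduced in memory. This is a sketch under assumptions: the class and method names are hypothetical, the integer status codes are illustrative rather than PowerJob's real values, and a plain list stands in for the task_info rows of one instance.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupByStatusSketch {
    // In-memory equivalent of: SELECT status, count(*) AS num ... GROUP BY status.
    // Each list element is the status code of one task row.
    static Map<Integer, Long> countByStatus(List<Integer> taskStatuses) {
        return taskStatuses.stream()
                .collect(Collectors.groupingBy(status -> status, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Illustrative codes only: three rows with code 6, two with code 5, one with code 1.
        List<Integer> rows = Arrays.asList(5, 6, 6, 6, 5, 1);
        Map<Integer, Long> status2Num = countByStatus(rows);
        System.out.println(status2Num.get(5)); // prints 2
    }
}
```

    As with the SQL query, a status that no task currently has produces no entry at all, which is why the caller falls back to 0 with getOrDefault.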

    TaskTracker will have a retry

    LightTaskTracker

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java

        private ProcessResult processTask() {
            executeThread.set(Thread.currentThread());
            // Record when the task starts executing
            taskStartTime = System.currentTimeMillis();
            status = TaskStatus.WORKER_PROCESSING;
            // When execution starts, a task is submitted to check for a timeout
            ProcessResult res = null;
            do {
                Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
                if (res != null && !res.isSuccess()) {
                    // Retry
                    taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
                    log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
                }
                try {
                    res = processorBean.getProcessor().process(taskContext);
                } catch (InterruptedException e) {
                    log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
                    Thread.currentThread().interrupt();
                    if (timeoutFlag.get()) {
                        res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
                    } else if (stopFlag.get()) {
                        res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
                    } else {
                        res = new ProcessResult(false, e.toString());
                    }
                } catch (Exception e) {
                    log.warn("[TaskTracker-{}] process failed !", instanceId, e);
                    res = new ProcessResult(false, e.toString());
                }
                if (res == null) {
                    log.warn("[TaskTracker-{}] processor return null !", instanceId);
                    res = new ProcessResult(false, "Processor return null");
                }
            } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
            executeThread.set(null);
            taskEndTime = System.currentTimeMillis();
            finished.set(true);
            result = res;
            status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
            // Cancel the timeout-check task
            if (timeoutCheckScheduledFuture != null) {
                timeoutCheckScheduledFuture.cancel(true);
            }
            log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result);
            // Report once immediately after execution completes
            checkAndReportStatus();
            return result;
        }
    

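    In LightTaskTracker's processTask, when the ProcessResult is not successful and the loop condition still allows another attempt, the retry count is incremented at the start of the next iteration and the message [TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {} is logged.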
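
    The shape of that do-while loop is easier to see without the surrounding bookkeeping. The following is a simplified sketch, not the PowerJob implementation: LightRetrySketch, Outcome and process are hypothetical names, the processor is reduced to a BooleanSupplier, and the timeout and stop flags are left out.

```java
import java.util.function.BooleanSupplier;

class LightRetrySketch {
    // Result of the loop: whether the last attempt succeeded and how many retries were used.
    static final class Outcome {
        final boolean success;
        final int retryTimes;

        Outcome(boolean success, int retryTimes) {
            this.success = success;
            this.retryTimes = retryTimes;
        }
    }

    static Outcome process(BooleanSupplier processor, int maxRetryTimes) {
        int currentRetryTimes = 0;
        Boolean res = null;
        do {
            if (res != null && !res) {
                // A previous attempt failed, so this pass counts as a retry.
                currentRetryTimes++;
            }
            try {
                res = processor.getAsBoolean();
            } catch (RuntimeException e) {
                // An exception is treated as a failed attempt.
                res = false;
            }
        } while (!res && currentRetryTimes < maxRetryTimes);
        return new Outcome(res, currentRetryTimes);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        Outcome o = process(() -> ++calls[0] >= 3, 2);
        // prints: success=true, retries=2, attempts=3
        System.out.println("success=" + o.success + ", retries=" + o.retryTimes + ", attempts=" + calls[0]);
    }
}
```

    With a retry limit of n the processor runs at most n + 1 times, and only the result of the final attempt decides between WORKER_PROCESS_SUCCESS and WORKER_PROCESS_FAILED.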

    updateTaskStatus

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

        public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
    
            if (finished.get()) {
                return;
            }
            TaskStatus nTaskStatus = TaskStatus.of(newStatus);
    
            int lockId = taskId.hashCode();
            try {
    
                // Block until the lock is acquired
                segmentLock.lockInterruptible(lockId);
                TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);
    
                // Not in the cache, so look it up in the database
                if (taskBriefInfo == null) {
                    Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                    if (taskOpt.isPresent()) {
                        TaskDO taskDO = taskOpt.get();
                        taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                    } else {
                        // Should not happen in theory, unless the database is broken
                        log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                        taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                    }
                    // Write to the cache
                    taskId2BriefInfo.put(taskId, taskBriefInfo);
                }
    
                // Filter out expired requests (there is a latent need for cluster clock consistency: when a retry crosses Workers, clock skew can cause problems)
                if (taskBriefInfo.getLastReportTime() > reportTime) {
                    log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                            instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
                    return;
                }
                // Check that the status transition is legal, fix issue 404
                if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                    log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
                            instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
                    return;
                }
    
                // The request is valid at this point, so update the related info first
                taskBriefInfo.setLastReportTime(reportTime);
                taskBriefInfo.setStatus(nTaskStatus);
    
                // Handle the failure case
                int configTaskRetryNum = instanceInfo.getTaskRetryNum();
                if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
    
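                    // Failure is not the common case, so one extra DB query is not a big deal (besides, the cache in front absorbs most lookups, so the DB is rarely queried before this point)
                    Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                    // If the DB query fails as well, no retry is attempted...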
                    if (taskOpt.isPresent()) {
                        int failedCnt = taskOpt.get().getFailedCnt();
                        if (failedCnt < configTaskRetryNum) {
    
                            TaskDO updateEntity = new TaskDO();
                            updateEntity.setFailedCnt(failedCnt + 1);
    
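                            /*
                            Address rules:
                            1. The stored address is the destination the task was dispatched to (the ProcessorTracker address)
                            2. The root task and the last task must run on the machine hosting the TaskTracker (so their address should not be changed)
                            3. A broadcast task has to run on every machine, so it should not be reassigned to another worker (its address should not be changed)
                             */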
                            String taskName = taskOpt.get().getTaskName();
                            ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                            if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                                updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                            }
    
                            updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                            updateEntity.setLastReportTime(reportTime);
    
                            boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                            if (retryTask) {
                                log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                                return;
                            }
                        }
                    }
                }
    
                // Update the status (if writing the retry to the DB failed, the task is simply not retried... bad luck...)
                result = result == null ? "" : result;
                boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
    
                if (!updateResult) {
                    log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
                }
    
            } catch (InterruptedException ignore) {
                // ignore
            } catch (Exception e) {
                log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
            } finally {
                segmentLock.unlock(lockId);
            }
        }
    

    In updateTaskStatus, HeavyTaskTracker logs [TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry. for a task that is sent back for a retry.
    The precondition is failedCnt < configTaskRetryNum, where configTaskRetryNum is instanceInfo.getTaskRetryNum().
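
    The retry decision can be reduced to a few lines. This is an illustrative sketch only: HeavyRetrySketch, Task and onFailedReport are hypothetical names, and the database lookups, address reset, report-time filtering and locking of the real method are omitted.

```java
class HeavyRetrySketch {
    // Hypothetical two-value stand-in for PowerJob's TaskStatus.
    enum Status { WAITING_DISPATCH, WORKER_PROCESS_FAILED }

    // Hypothetical stand-in for the persisted task row.
    static final class Task {
        Status status = Status.WAITING_DISPATCH;
        int failedCnt = 0;
    }

    // Applies a "process failed" report. Returns true if the task was sent back for a retry.
    static boolean onFailedReport(Task task, int configTaskRetryNum) {
        if (configTaskRetryNum >= 1 && task.failedCnt < configTaskRetryNum) {
            // Retries remain: bump the failure count and queue the task for dispatch again.
            task.failedCnt++;
            task.status = Status.WAITING_DISPATCH;
            return true;
        }
        // Retries are used up (or disabled): the failure is persisted as the final status.
        task.status = Status.WORKER_PROCESS_FAILED;
        return false;
    }

    public static void main(String[] args) {
        Task task = new Task();
        System.out.println(onFailedReport(task, 1) + " " + task.status); // prints: true WAITING_DISPATCH
        System.out.println(onFailedReport(task, 1) + " " + task.status); // prints: false WORKER_PROCESS_FAILED
    }
}
```

    Under these assumptions a task only lands in the WORKER_PROCESS_FAILED bucket, and therefore in failedNum, once its retries are used up.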

    taskRetryNum

    powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java

    @Data
    public class InstanceInfo implements Serializable {
    
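        /**
         * Basic information
         */
        private Long jobId;
        private Long instanceId;
        private Long wfInstanceId;
    
        /**
         * Processor information for task execution
         */
        // Execution type: standalone, broadcast, MR
        private String executeType;
        // Processor type (JavaBean, Jar, script, etc.)
        private String processorType;
        // Processor info
        private String processorInfo;
        // Time expression type
        private int timeExpressionType;
    
        /**
         * Timeout
         */
        // Overall timeout for the whole task
        private long instanceTimeoutMS;
    
        /**
         * Task run parameters
         */
        // Job-level parameters, comparable to a class's static variables
        private String jobParams;
        // Instance-level parameters, comparable to a class's ordinary variables
        private String instanceParams;
    
    
        // Upper limit on processing threads per machine
        private int threadConcurrency;
        // Retry count for subtasks (retries of the task itself are controlled by the server)
        private int taskRetryNum;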
    
        private String logConfig;
    }
    

    InstanceInfo defines taskRetryNum, which specifies how many times a subtask is retried; the default is 1.

    StatusCheckRunnable

    powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java

        private class StatusCheckRunnable implements Runnable {
    
            private static final long DISPATCH_TIME_OUT_MS = 15000;
    
            @SuppressWarnings("squid:S3776")
            private void innerRun() {
    
                InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
    
                long finishedNum = holder.succeedNum + holder.failedNum;
                long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
    
                log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
    
                TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
                req.setAppId(workerRuntime.getAppId());
                req.setJobId(instanceInfo.getJobId());
                req.setInstanceId(instanceId);
                req.setWfInstanceId(instanceInfo.getWfInstanceId());
                req.setTotalTaskNum(finishedNum + unfinishedNum);
                req.setSucceedTaskNum(holder.succeedNum);
                req.setFailedTaskNum(holder.failedNum);
                req.setReportTime(System.currentTimeMillis());
                req.setStartTime(createTime);
                req.setSourceAddress(workerRuntime.getWorkerAddress());
    
                boolean success = false;
                String result = null;
    
                // 2. If no tasks are unfinished, decide whether the instance has really ended and fetch its execution result
                if (unfinishedNum == 0) {
    
                    // No task at all in the database means the root task failed to be created, so this task instance fails
                    if (finishedNum == 0) {
                        finished.set(true);
                        result = SystemInstanceResult.TASK_INIT_FAILED;
                    } else {
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
    
                        switch (executeType) {
    
                            // STANDALONE has a single task, so it ends as soon as that task completes
                            case STANDALONE:
                                finished.set(true);
                                List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                                if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                    result = SystemInstanceResult.UNKNOWN_BUG;
                                    log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                                } else {
                                    result = allTask.get(0).getResult();
                                    success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                                }
                                break;
                            // MAP does not care about the result; the simplest case
                            case MAP:
                                finished.set(true);
                                success = holder.failedNum == 0;
                                result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                                break;
                            // For MapReduce and Broadcast, whether the instance is complete is decided by how the **LastTask** ran
                            default:
    
                                Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                                if (lastTaskOptional.isPresent()) {
    
                                    // It exists, so the status is decided by the reduce task
                                    TaskDO resultTask = lastTaskOptional.get();
                                    TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
    
                                    if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                        finished.set(true);
                                        success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                        result = resultTask.getResult();
                                    }
    
                                } else {
    
                                    // It does not exist, which means the preceding tasks have only just finished; create the lastTask, which must run on this machine!
                                    TaskDO newLastTask = new TaskDO();
                                    newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                                    newLastTask.setTaskId(LAST_TASK_ID);
                                    newLastTask.setSubInstanceId(instanceId);
                                    newLastTask.setAddress(workerRuntime.getWorkerAddress());
                                    submitTask(Lists.newArrayList(newLastTask));
                                }
                        }
                    }
                }
    
                // 3. Check whether the task instance as a whole has timed out
                if (isTimeout()) {
                    finished.set(true);
                    success = false;
                    result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
                }
    
                // 4. Finished, report to the server
                if (finished.get()) {
                    req.setResult(result);
                    // Report the appended workflow context
                    req.setAppendedWfContext(appendedWfContext);
                    req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
                    reportFinalStatusThenDestroy(workerRuntime, req);
                    return;
                }
    
                // 5. Not finished, report the status
                req.setInstanceStatus(InstanceStatus.RUNNING.getV());
                TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
    
                // 6.1 Periodic check -> re-dispatch tasks that were dispatched but never acknowledged
                long currentMS = System.currentTimeMillis();
                if (holder.workerUnreceivedNum != 0) {
                    taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
    
                        long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                        if (elapsedTime > DISPATCH_TIME_OUT_MS) {
    
                            TaskDO updateEntity = new TaskDO();
                            updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                            // Special tasks can only run on this machine
                            if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                                updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                            }
                            // Failure count + 1
                            updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);
    
                            taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
    
                            log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
                                    instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
                        }
    
                    });
                }
    
                // 6.2 Periodic check -> re-run tasks dispatched to ProcessorTrackers that have gone down
                List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
                if (!disconnectedPTs.isEmpty()) {
                    log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
                    if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
                        ptStatusHolder.remove(disconnectedPTs);
                        log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
                    }
                }
            }
    
            /**
             * Whether the task has timed out
             */
            public boolean isTimeout() {
                if (instanceInfo.getInstanceTimeoutMS() > 0) {
                    return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
                }
                return false;
            }
    
            @Override
            public void run() {
                try {
                    innerRun();
                } catch (Exception e) {
                    log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
                }
            }
        }
    

    StatusCheckRunnable reports a TaskTrackerReportInstanceStatusReq every 13s by default. For MAP tasks it uses holder.failedNum == 0 to decide whether the task instance succeeded: if true, the instance status becomes InstanceStatus.SUCCEED, otherwise InstanceStatus.FAILED.
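
    The MAP branch boils down to two expressions, which the sketch below isolates. MapOutcomeSketch and its methods are hypothetical names used for illustration; only the failedNum == 0 check and the format string are taken from the code above.

```java
class MapOutcomeSketch {
    // A MAP instance counts as successful only if no subtask ended up failed.
    static boolean isSuccess(long failedNum) {
        return failedNum == 0;
    }

    // Same format string as the MAP branch of StatusCheckRunnable.
    static String resultOf(long total, long succeed, long failed) {
        return String.format("total:%d,succeed:%d,failed:%d", total, succeed, failed);
    }

    public static void main(String[] args) {
        // One failed subtask out of ten is enough to fail the whole instance.
        System.out.println(isSuccess(1));        // prints false
        System.out.println(resultOf(10, 9, 1));  // prints total:10,succeed:9,failed:1
    }
}
```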

    Summary

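    The failed count shown in the execution result of a PowerJob map reduce task instance is failedTaskNum. It is the count for TaskStatus.WORKER_PROCESS_FAILED returned by TaskPersistenceService's getTaskStatusStatistics, which runs select status, count(*) as num from task_info where instance_id = ? and sub_instance_id = ? GROUP BY status. By default each subtask gets one retry. If any subtask still ends up failed, the task instance as a whole ends as failed. PowerJob currently offers no entry point for retrying only those failed subtasks; the only option is to rerun the whole map reduce task.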
