A Look at PowerJob's LightTaskTracker

Author: go4it | Published: 2023-12-26 09:23

    This article takes a look at PowerJob's LightTaskTracker.

    TaskTracker

    tech/powerjob/worker/core/tracker/task/TaskTracker.java

    @Slf4j
    public abstract class TaskTracker {
    
        /**
         * Creation time of the TaskTracker
         */
        protected final long createTime;
        /**
         * Task instance ID; used so frequently that it is extracted from InstanceInfo and kept separately
         */
        protected final long instanceId;
        /**
         * Task instance information
         */
        protected final InstanceInfo instanceInfo;
        /**
         * Appended workflow context data
         *
         * @since 2021/02/05
         */
        protected final Map<String, String> appendedWfContext;
        /**
         * Worker runtime metadata
         */
        protected final WorkerRuntime workerRuntime;
        /**
         * Whether the task has finished
         */
        protected final AtomicBoolean finished;
        /**
         * After several consecutive report failures, give up reporting and treat the result as unreachable (TaskTracker down)
         */
        protected int reportFailedCnt = 0;
    
        protected static final int MAX_REPORT_FAILED_THRESHOLD = 5;
    
        protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            this.createTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceId = req.getInstanceId();
    
            this.instanceInfo = new InstanceInfo();
    
            // PowerJob rarely needs value copying, and a third-party copy library could introduce class conflicts, so hand-written copying has the best ROI overall
            instanceInfo.setJobId(req.getJobId());
            instanceInfo.setInstanceId(req.getInstanceId());
            instanceInfo.setWfInstanceId(req.getWfInstanceId());
            instanceInfo.setExecuteType(req.getExecuteType());
            instanceInfo.setProcessorType(req.getProcessorType());
            instanceInfo.setProcessorInfo(req.getProcessorInfo());
            instanceInfo.setJobParams(req.getJobParams());
            instanceInfo.setInstanceParams(req.getInstanceParams());
            instanceInfo.setThreadConcurrency(req.getThreadConcurrency());
            instanceInfo.setTaskRetryNum(req.getTaskRetryNum());
            instanceInfo.setLogConfig(req.getLogConfig());
    
            // Special handling for the timeout
            if (instanceInfo.getInstanceTimeoutMS() <= 0) {
                instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);
            }
            // Only tasks inside a workflow may append context data to the workflow
            this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
            this.finished = new AtomicBoolean(false);
        }
    
        /**
         * Destroy
         */
        public abstract void destroy();
    
        /**
         * Stop the task
         */
        public abstract void stopTask();
    
    
        /**
         * Query the detailed running status of the task instance
         *
         * @return detailed running status of the task instance
         */
        public abstract InstanceDetail fetchRunningStatus();
    
        //......
    }    
    

    TaskTracker is an abstract class. Its constructor takes a ServerScheduleJobReq and a WorkerRuntime and builds an InstanceInfo from the request. It declares the abstract methods destroy, stopTask, and fetchRunningStatus.
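
    Two small normalization steps in the constructor above are easy to miss: a non-positive timeout becomes Integer.MAX_VALUE, and only workflow instances get a mutable context map. The sketch below isolates those two rules. The class and method names (TrackerDefaultsSketch, normalizeTimeoutMs, newAppendedContext) are hypothetical and not part of PowerJob, and ConcurrentHashMap stands in for the Guava Maps.newConcurrentMap() call used in the original.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for two steps of the TaskTracker constructor. */
final class TrackerDefaultsSketch {

    /** A non-positive timeout is stored as Integer.MAX_VALUE, which later code reads as "no timeout". */
    static long normalizeTimeoutMs(long requestedTimeoutMs) {
        return requestedTimeoutMs <= 0 ? Integer.MAX_VALUE : requestedTimeoutMs;
    }

    /** Only workflow instances get a mutable map; all others share an immutable empty map. */
    static Map<String, String> newAppendedContext(Long wfInstanceId) {
        return wfInstanceId == null ? Collections.emptyMap() : new ConcurrentHashMap<>();
    }

    public static void main(String[] args) {
        System.out.println(normalizeTimeoutMs(0));
        System.out.println(normalizeTimeoutMs(3000));
        System.out.println(newAppendedContext(null).isEmpty());
    }
}
```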

    LightTaskTracker

    tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java

    @Slf4j
    public class LightTaskTracker extends TaskTracker {
        /**
         * statusReportScheduledFuture
         */
        private final ScheduledFuture<?> statusReportScheduledFuture;
        /**
         * timeoutCheckScheduledFuture
         */
        private final ScheduledFuture<?> timeoutCheckScheduledFuture;
        /**
         * processFuture
         */
        private final Future<ProcessResult> processFuture;
        /**
         * Execution thread
         */
        private final AtomicReference<Thread> executeThread;
        /**
         * Processor information
         */
        private final ProcessorBean processorBean;
        /**
         * Context
         */
        private final TaskContext taskContext;
        /**
         * Task status
         */
        private TaskStatus status;
        /**
         * Time when the task started executing
         */
        private Long taskStartTime;
        /**
         * Time when the task finished executing or was killed
         */
        private Long taskEndTime;
        /**
         * Task processing result
         */
        private ProcessResult result;
    
        private final AtomicBoolean timeoutFlag = new AtomicBoolean(false);
    
        protected final AtomicBoolean stopFlag = new AtomicBoolean(false);
    
        protected final AtomicBoolean destroyFlag = new AtomicBoolean(false);
    
    
        public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            super(req, workerRuntime);
            try {
                taskContext = constructTaskContext(req, workerRuntime);
                // Waiting to be processed
                status = TaskStatus.WORKER_RECEIVED;
                // Load the Processor
                processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
                executeThread = new AtomicReference<>();
                long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
                // Add a random value to the initial delay so that, under high concurrency, requests do not all land in the same time window
                long initDelay = RandomUtils.nextInt(5000, 10000);
                // Report the task status
                statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
                // Timeout control
                if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
                    if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
                        timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
                    } else {
                        // For tasks that run longer than 1 s, the timeout check has a minimum granularity of 1 s
                        timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
                    }
                } else {
                    timeoutCheckScheduledFuture = null;
                }
                // Submit the task to the thread pool
                processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask);
            } catch (Exception e) {
                log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req);
                destroy();
                throw e;
            }
    
        }
    
        //......
    }    
    

    LightTaskTracker extends TaskTracker. Its constructor builds a ProcessorDefinition from the ServerScheduleJobReq and loads it with workerRuntime.getProcessorLoader().load. It then schedules checkAndReportStatus through workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay, schedules timeoutCheck if an instanceTimeout is set, and finally runs processTask by submitting it through workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit.
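
    The timeout-check scheduling in the constructor can be reduced to one rule: timeouts under 1 s are checked every timeout/10 ms, everything else every 1 s. The following is a minimal sketch of that rule on a plain JDK ScheduledExecutorService. TimeoutCheckScheduleSketch, checkPeriodMs, and scheduleTimeoutCheck are hypothetical names, and the single-thread scheduler is only a stand-in for PowerJob's own executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the timeout-check scheduling rule; not PowerJob code. */
final class TimeoutCheckScheduleSketch {

    /** Timeouts under 1 s are checked every timeout/10 ms, all others every 1000 ms. */
    static long checkPeriodMs(long timeoutMs) {
        return timeoutMs < 1000L ? timeoutMs / 10 : 1000L;
    }

    /** Returns null when no timeout is configured, like timeoutCheckScheduledFuture in the listing. */
    static ScheduledFuture<?> scheduleTimeoutCheck(ScheduledExecutorService pool, long timeoutMs, Runnable check) {
        if (timeoutMs == Integer.MAX_VALUE) {
            return null;
        }
        // The first check fires once the timeout has elapsed, then repeats at the computed period.
        return pool.scheduleAtFixedRate(check, timeoutMs, checkPeriodMs(timeoutMs), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = scheduleTimeoutCheck(pool, 500, () -> System.out.println("timeout check"));
            System.out.println("period for a 500 ms timeout: " + checkPeriodMs(500));
            future.cancel(false);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

    By this arithmetic a timeout below 10 ms yields a period of 0, which scheduleAtFixedRate rejects with an IllegalArgumentException. Whether such small values can actually reach the worker is not covered by the code shown here.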

    checkAndReportStatus

        private synchronized void checkAndReportStatus() {
            if (destroyFlag.get()) {
                // Already destroyed, no need to report status
                log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status);
                return;
            }
            TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq();
            reportInstanceStatusReq.setAppId(workerRuntime.getAppId());
            reportInstanceStatusReq.setJobId(instanceInfo.getJobId());
            reportInstanceStatusReq.setInstanceId(instanceId);
            reportInstanceStatusReq.setWfInstanceId(instanceInfo.getWfInstanceId());
            reportInstanceStatusReq.setTotalTaskNum(1);
            reportInstanceStatusReq.setReportTime(System.currentTimeMillis());
            reportInstanceStatusReq.setStartTime(createTime);
            reportInstanceStatusReq.setSourceAddress(workerRuntime.getWorkerAddress());
            reportInstanceStatusReq.setSucceedTaskNum(0);
            reportInstanceStatusReq.setFailedTaskNum(0);
    
            if (stopFlag.get()) {
                if (finished.get()) {
                    // Interrupted successfully
                    destroy();
                    return;
                }
                final Thread workerThread = executeThread.get();
                if (!finished.get() && workerThread != null) {
                    // Failed to interrupt the task, force stop
                    try {
                        if (tryForceStopThread(workerThread)) {
                            finished.set(true);
                            taskEndTime = System.currentTimeMillis();
                            result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP);
                            log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
                            // A terminated task does not need to report status
                            destroy();
                            return;
                        }
                    } catch (Exception e) {
                        log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e);
                    }
                }
            }
            if (finished.get()) {
                if (result.isSuccess()) {
                    reportInstanceStatusReq.setSucceedTaskNum(1);
                    reportInstanceStatusReq.setInstanceStatus(InstanceStatus.SUCCEED.getV());
                } else {
                    reportInstanceStatusReq.setFailedTaskNum(1);
                    reportInstanceStatusReq.setInstanceStatus(InstanceStatus.FAILED.getV());
                }
                // Handle the workflow context
                if (taskContext.getWorkflowContext().getWfInstanceId() != null) {
                    reportInstanceStatusReq.setAppendedWfContext(taskContext.getWorkflowContext().getAppendedContextData());
                }
                reportInstanceStatusReq.setResult(suit(result.getMsg()));
                reportInstanceStatusReq.setEndTime(taskEndTime);
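                // Small tweak: when reporting the final status, reset the report time and add a small offset so that, when a running status and the final status are reported concurrently, the final status carries the later report time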
                reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1);
                reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq);
                return;
            }
            // Unfinished task: only report the status
            reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV());
            log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status);
            TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
        }
    

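    checkAndReportStatus builds a TaskTrackerReportInstanceStatusReq and then branches on stopFlag and finished. A stopped task that has finished, or that is force-stopped successfully, is destroyed without a report. A finished task has its final status reported through reportFinalStatusThenDestroy. An unfinished task is reported as RUNNING through TransportUtils.ttReportInstanceStatus.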
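
    The branching in checkAndReportStatus is easier to follow as a pure decision function. The sketch below is a simplified model and not PowerJob code: ReportDecisionSketch, Action, and decide are hypothetical names, the outcome of tryForceStopThread is passed in as a boolean, and the null check on the worker thread is left out.

```java
/** Hypothetical model of the branches in checkAndReportStatus; not PowerJob code. */
final class ReportDecisionSketch {

    enum Action { SKIP, DESTROY_ONLY, REPORT_FINAL_THEN_DESTROY, REPORT_RUNNING }

    static Action decide(boolean destroyed, boolean stopRequested, boolean finished, boolean forceStopSucceeds) {
        if (destroyed) {
            return Action.SKIP; // already destroyed, nothing to report
        }
        if (stopRequested) {
            if (finished) {
                return Action.DESTROY_ONLY; // the interrupt worked, stopped tasks are not reported
            }
            if (forceStopSucceeds) {
                return Action.DESTROY_ONLY; // the force stop worked, also not reported
            }
            // Otherwise fall through: the task is still running and is reported as such.
        }
        if (finished) {
            return Action.REPORT_FINAL_THEN_DESTROY;
        }
        return Action.REPORT_RUNNING;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, false, false));
        System.out.println(decide(false, true, true, false));
        System.out.println(decide(false, false, true, false));
    }
}
```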

    timeoutCheck

        private void timeoutCheck() {
            if (taskStartTime == null || System.currentTimeMillis() - taskStartTime < instanceInfo.getInstanceTimeoutMS()) {
                return;
            }
            if (finished.get() && result != null) {
                timeoutCheckScheduledFuture.cancel(true);
                return;
            }
            // First time the timeout is detected
            if (timeoutFlag.compareAndSet(false, true)) {
                // Timed out: only try to interrupt the task
                log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS());
                processFuture.cancel(true);
                return;
            }
            if (finished.get()) {
                // Interrupted successfully
                log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime);
                return;
            }
            Thread workerThread = executeThread.get();
            if (workerThread == null) {
                return;
            }
            // Failed to interrupt the task, force termination
            try {
                if (tryForceStopThread(workerThread)) {
                    finished.set(true);
                    taskEndTime = System.currentTimeMillis();
                    result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP);
                    log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName());
                }
            } catch (Exception e) {
                log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e);
            }
        }
    
        private boolean tryForceStopThread(Thread thread) {
    
            String threadName = thread.getName();
    
            String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD);
            if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) {
                log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId);
                return false;
            }
    
            log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName);
            try {
                thread.stop();
                return true;
            } catch (Throwable t) {
                log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage());
            }
            return false;
        }    
    

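    timeoutCheck first checks whether the task has timed out. If the task has already finished with a result, it cancels the timeout-check schedule itself (timeoutCheckScheduledFuture). Otherwise, on the first detection it sets timeoutFlag and tries to interrupt the task with processFuture.cancel(true). If a later check finds the task still running, it falls back to tryForceStopThread, which calls the deprecated Thread.stop method, and only when the system property PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD is set to true.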
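
    The first escalation step, processFuture.cancel(true), only works when the task responds to interruption. The sketch below shows that step in isolation with plain JDK executors: a sleeping task is cancelled and records that it saw the interrupt. InterruptOnTimeoutSketch and cancelAndObserveInterrupt are hypothetical names, and the sleeping lambda is a stand-in for a cooperative processor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of interrupting a running task via Future.cancel(true); not PowerJob code. */
final class InterruptOnTimeoutSketch {

    /** Cancels a sleeping task with interruption and returns whether the task observed the interrupt. */
    static boolean cancelAndObserveInterrupt() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> processFuture = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // cooperative: sleep throws when the thread is interrupted
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                } finally {
                    done.countDown();
                }
            });
            started.await();            // make sure the task is running before cancelling
            processFuture.cancel(true); // true = interrupt the thread running the task
            done.await(5, TimeUnit.SECONDS);
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task saw interrupt: " + cancelAndObserveInterrupt());
    }
}
```

    A processor that swallows interrupts or spins in a CPU-bound loop never reaches the catch block, which is the case the Thread.stop fallback targets. As far as I know, Thread.stop throws UnsupportedOperationException on JDK 20 and later, so on those runtimes tryForceStopThread would take its catch (Throwable t) branch and return false.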

    processTask

        private ProcessResult processTask() {
            executeThread.set(Thread.currentThread());
            // Record the time the task starts executing
            taskStartTime = System.currentTimeMillis();
            status = TaskStatus.WORKER_PROCESSING;
            // When execution starts, submit the check that decides whether the task has timed out
            ProcessResult res = null;
            do {
                Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
                if (res != null && !res.isSuccess()) {
                    // Retry
                    taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
                    log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
                }
                try {
                    res = processorBean.getProcessor().process(taskContext);
                } catch (InterruptedException e) {
                    log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
                    Thread.currentThread().interrupt();
                    if (timeoutFlag.get()) {
                        res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
                    } else if (stopFlag.get()) {
                        res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
                    } else {
                        res = new ProcessResult(false, e.toString());
                    }
                } catch (Exception e) {
                    log.warn("[TaskTracker-{}] process failed !", instanceId, e);
                    res = new ProcessResult(false, e.toString());
                }
                if (res == null) {
                    log.warn("[TaskTracker-{}] processor return null !", instanceId);
                    res = new ProcessResult(false, "Processor return null");
                }
            } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
            executeThread.set(null);
            taskEndTime = System.currentTimeMillis();
            finished.set(true);
            result = res;
            status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
            // Cancel the timeout-check task
            if (timeoutCheckScheduledFuture != null) {
                timeoutCheckScheduledFuture.cancel(true);
            }
            log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result);
            // Report once immediately after execution completes
            checkAndReportStatus();
            return result;
        }
    

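    processTask runs a do-while loop, so the processor is invoked at least once. The loop continues while the result is unsuccessful, the retry count is below the maximum, the task has not timed out, and stopFlag is false. Inside the loop it calls processorBean.getProcessor().process(taskContext) and catches InterruptedException and Exception. After the loop it records the task end time, cancels timeoutCheckScheduledFuture, and finally calls checkAndReportStatus to report.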
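
    The retry accounting in this loop is worth a closer look: with a maximum of N retries, a task that always fails runs N + 1 times in total. The sketch below keeps only that counting logic. RetryLoopSketch and executionsUntilDone are hypothetical names, the BooleanSupplier stands in for the processor call, and the timeoutFlag and stopFlag conditions are left out.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the do-while retry shape in processTask; not PowerJob code. */
final class RetryLoopSketch {

    /** Runs the attempt at least once and returns how many times it was executed. */
    static int executionsUntilDone(BooleanSupplier attempt, int maxRetryTimes) {
        int currentRetryTimes = 0;
        int executions = 0;
        boolean first = true;
        boolean success;
        do {
            if (!first) {
                currentRetryTimes++; // the previous run failed, so this run counts as a retry
            }
            first = false;
            success = attempt.getAsBoolean();
            executions++;
        } while (!success && currentRetryTimes < maxRetryTimes);
        return executions;
    }

    public static void main(String[] args) {
        System.out.println(executionsUntilDone(() -> false, 3));
        System.out.println(executionsUntilDone(() -> true, 3));
    }
}
```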

    LightTaskTracker.create

        /**
         * Static factory method that creates a TaskTracker
         *
         * @param req job scheduling request from the server
         * @return LightTaskTracker
         */
        public static LightTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
            try {
                return new LightTaskTracker(req, workerRuntime);
            } catch (Exception e) {
                reportCreateErrorToServer(req, workerRuntime, e);
            }
            return null;
        }
    

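    LightTaskTracker provides the static factory method create for building a LightTaskTracker. If the constructor throws, create reports the error to the server through reportCreateErrorToServer and returns null.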
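
    Returning null from create matters for the caller. The summary of this article notes that a ConcurrentHashMap maintains the mapping from instanceId to LightTaskTracker. Assuming the registration goes through computeIfAbsent (an assumption, since the body of LightTaskTrackerManager.atomicCreateTaskTracker is not shown here), a creator that returns null simply leaves no entry behind. The sketch below demonstrates that behaviour with a hypothetical registry that stores strings instead of trackers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry sketch; strings stand in for LightTaskTracker instances. */
final class AtomicCreateSketch {

    private final Map<Long, String> trackers = new ConcurrentHashMap<>();

    /** Creates the tracker at most once per instanceId; a null creator result stores nothing. */
    String atomicCreate(long instanceId, Function<Long, String> creator) {
        return trackers.computeIfAbsent(instanceId, creator);
    }

    int size() {
        return trackers.size();
    }

    public static void main(String[] args) {
        AtomicCreateSketch registry = new AtomicCreateSketch();
        System.out.println(registry.atomicCreate(1L, id -> "tracker-" + id));
        System.out.println(registry.atomicCreate(1L, id -> "duplicate-" + id));
        System.out.println(registry.atomicCreate(2L, id -> null));
        System.out.println(registry.size());
    }
}
```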

    TaskTrackerActor

    tech/powerjob/worker/actors/TaskTrackerActor.java

    @Slf4j
    @Actor(path = WTT_PATH)
    public class TaskTrackerActor {
    
        private final WorkerRuntime workerRuntime;
    
        public TaskTrackerActor(WorkerRuntime workerRuntime) {
            this.workerRuntime = workerRuntime;
        }
    
        /**
         * Handler for job scheduling requests from the server
         */
        @Handler(path = WTT_HANDLER_RUN_JOB)
        public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
            log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
            Long instanceId = req.getInstanceId();
            // Distinguish the lightweight task model from the heavyweight one
            if (isLightweightTask(req)) {
                final LightTaskTracker taskTracker = LightTaskTrackerManager.getTaskTracker(instanceId);
                if (taskTracker != null) {
                    log.warn("[TaskTrackerActor] LightTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
                    return;
                }
                // Check whether the worker is already overloaded
                if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() * LightTaskTrackerManager.OVERLOAD_FACTOR) {
                    // ignore this request
                    log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={}),current size = {}!",instanceId,LightTaskTrackerManager.currentTaskTrackerSize());
                    return;
                }
                if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum()) {
                    log.warn("[TaskTrackerActor] this worker will be overload soon,current size = {}!",LightTaskTrackerManager.currentTaskTrackerSize());
                }
                // Create the lightweight task
                LightTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> LightTaskTracker.create(req, workerRuntime));
            } else {
                HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(instanceId);
                if (taskTracker != null) {
                    log.warn("[TaskTrackerActor] HeavyTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
                    return;
                }
                // Check whether the worker is already overloaded
                if (HeavyTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum()) {
                    // ignore this request
                    log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={})! current size = {},", instanceId, HeavyTaskTrackerManager.currentTaskTrackerSize());
                    return;
                }
                // Create atomically to prevent multiple instances
                HeavyTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> HeavyTaskTracker.create(req, workerRuntime));
            }
        }
    
        //......
    }        
    

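    TaskTrackerActor is registered at the path taskTracker and handles jobInstance requests from the server as well as task requests from workers. Its onReceiveServerScheduleJobReq handler has the path runJob and takes a ServerScheduleJobReq. It uses isLightweightTask to pick the task model. For a lightweight task it first looks up an existing tracker with LightTaskTrackerManager.getTaskTracker(instanceId) and returns if one exists. It then checks how many LightTaskTrackers this worker already holds: at or above maxLightweightTaskNum * OVERLOAD_FACTOR the request is ignored, and at or above maxLightweightTaskNum only a warning is logged. Finally, LightTaskTrackerManager.atomicCreateTaskTracker maintains the mapping from instanceId to LightTaskTracker, creating the tracker through LightTaskTracker.create(req, workerRuntime) if none exists.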
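
    The two thresholds for lightweight tasks form a soft limit and a hard limit. The sketch below models them as a pure function. LightTaskAdmissionSketch, Admission, and admit are hypothetical names, and the factor of 1.5 used in main is only an illustrative value, since the actual value of LightTaskTrackerManager.OVERLOAD_FACTOR is not shown in this article.

```java
/** Hypothetical model of the lightweight-task overload check; not PowerJob code. */
final class LightTaskAdmissionSketch {

    enum Admission { ACCEPT, ACCEPT_WITH_WARNING, REJECT }

    static Admission admit(int currentSize, int maxLightweightTaskNum, double overloadFactor) {
        if (currentSize >= maxLightweightTaskNum * overloadFactor) {
            return Admission.REJECT; // hard limit: the request is ignored
        }
        if (currentSize >= maxLightweightTaskNum) {
            return Admission.ACCEPT_WITH_WARNING; // soft limit: log a warning but still create the tracker
        }
        return Admission.ACCEPT;
    }

    public static void main(String[] args) {
        double assumedFactor = 1.5; // illustrative value, not taken from PowerJob
        System.out.println(admit(99, 100, assumedFactor));
        System.out.println(admit(100, 100, assumedFactor));
        System.out.println(admit(150, 100, assumedFactor));
    }
}
```

    The heavyweight branch in the listing has no such soft band: it rejects as soon as the count reaches getMaxHeavyweightTaskNum().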

    isLightweightTask

        private boolean isLightweightTask(ServerScheduleJobReq serverScheduleJobReq) {
            final ExecuteType executeType = ExecuteType.valueOf(serverScheduleJobReq.getExecuteType());
            // Anything not executed standalone is definitely not lightweight
            if (executeType != ExecuteType.STANDALONE){
                return false;
            }
            TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(serverScheduleJobReq.getTimeExpressionType());
            // Fixed-rate and fixed-delay tasks are definitely not lightweight either
            return timeExpressionType != TimeExpressionType.FIXED_DELAY && timeExpressionType != TimeExpressionType.FIXED_RATE;
        }
    

    isLightweightTask returns false if executeType is not STANDALONE. Otherwise it checks the request's timeExpressionType, and only tasks that are neither FIXED_DELAY nor FIXED_RATE count as lightweight.
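
    The rule can be tried out with local copies of the two enums. In the sketch below, the enums are simplified stand-ins declared inside the example: only STANDALONE, FIXED_DELAY, and FIXED_RATE are taken from the code above, and the remaining members are assumptions added so that there is something to compare against.

```java
/** Hypothetical, self-contained version of the isLightweightTask rule; not PowerJob code. */
final class LightweightCheckSketch {

    // Local stand-ins; members other than STANDALONE, FIXED_RATE, FIXED_DELAY are assumed.
    enum ExecuteType { STANDALONE, BROADCAST, MAP_REDUCE }
    enum TimeExpressionType { API, CRON, FIXED_RATE, FIXED_DELAY }

    static boolean isLightweight(String executeType, String timeExpressionType) {
        // Anything that is not standalone is never lightweight.
        if (ExecuteType.valueOf(executeType) != ExecuteType.STANDALONE) {
            return false;
        }
        TimeExpressionType type = TimeExpressionType.valueOf(timeExpressionType);
        // Fixed-rate and fixed-delay jobs are never lightweight either.
        return type != TimeExpressionType.FIXED_DELAY && type != TimeExpressionType.FIXED_RATE;
    }

    public static void main(String[] args) {
        System.out.println(isLightweight("STANDALONE", "CRON"));
        System.out.println(isLightweight("STANDALONE", "FIXED_RATE"));
        System.out.println(isLightweight("MAP_REDUCE", "CRON"));
    }
}
```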

    Summary

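    LightTaskTracker extends TaskTracker. Its constructor builds a ProcessorDefinition from the ServerScheduleJobReq, loads it with workerRuntime.getProcessorLoader().load, schedules checkAndReportStatus and timeoutCheck, and finally runs processTask. Task processing is not deferred to a separate start method; all of this happens inside the constructor. TaskTrackerActor is registered at the path taskTracker and handles jobInstance requests from the server as well as task requests from workers. Its onReceiveServerScheduleJobReq handler has the path runJob, takes a ServerScheduleJobReq, and creates and runs a LightTaskTracker for the request's instanceId. A ConcurrentHashMap maintains the mapping from instanceId to LightTaskTracker, which prevents duplicate execution.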


    Article link: https://www.haomeiwen.com/subject/dwllndtx.html