美文网首页Ovirt程序员
【Ovirt 笔记】任务机制分析与整理

【Ovirt 笔记】任务机制分析与整理

作者: 58bc06151329 | 来源:发表于2018-01-26 10:50 被阅读21次

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

分析整理的版本为 Ovirt 3.4.5 版本。

基础说明

  • 作为执行命令 Command 行为的一种监控。
  • 在引擎管理门户的任务执行列表中可以进行查看。

数据库相关

  • job 中存放了执行的任务(行为)信息。
字段名称 字段说明 字段类型 其它
job_id 任务 ID uuid 不能为空
action_type 操作类型 character varying(50) 不能为空 VdcActionType
description 显示任务内容 text 不能为空
status 任务执行状态 character varying(32) 不能为空 JobExecutionStatus
owner_id 任务执行者 uuid \
visible 任务是否显示 boolean 默认为 true
start_time 任务开始时间 timestamp with time zone 不能为空
end_time 任务结束时间 timestamp with time zone \
last_update_time 任务信息最后修改时间 timestamp with time zone \
correlation_id 相关操作(Command)的线程 ID character varying(50) 不能为空
is_external 是否外部任务 boolean 默认为 false
is_auto_cleared 任务是否定时自动清除 boolean 默认为 true
  • 操作类型枚举类 VdcActionType。通过配置 ExecutionMessages.properties 资源文件实现国际化。
  • 任务执行状态枚举类 JobExecutionStatus。通过配置 Enums.properties 资源文件实现国际化。
任务执行状态 说明 场景
STARTED 任务启动 任务开始启动时
FINISHED 任务完成 任务正常结束时
FAILED 任务失败 任务执行失败时
ABORTED 任务终止 分布式存储执行任务状态返回的任务终止状态
UNKNOWN 任务未知 初始化默认状态
  • 定时自动清除

    • 后台创建调度 JobRepositoryCleanupManager 实现。
    • 调度中执行存储过程 DeleteCompletedJobsOlderThanDate
    • 调度时间可配置
      • SucceededJobCleanupTimeInMinutes 执行成功的任务结束时间后多久被删除,结束 10 分钟后被删除。
      • FailedJobCleanupTimeInMinutes 执行失败的任务结束后多久被删除,结束 1 个小时后被删除。
      • JobCleanupRateInMinutes 执行任务的频率,默认每 10 分钟执行一次。
  • step 中存放了执行任务的所有步骤信息。

字段名称 字段说明 字段类型 其它
step_id 步骤 ID uuid 不能为空
parent_step_id 父类步骤 ID uuid \
job_id 所属任务 ID uuid 不能为空
step_type 步骤类型 character varying(32) 不能为空 StepEnum
description 步骤描述信息 text 不能为空
step_number 步骤序号 integer 不能为空,从 0 开始
status 步骤状态 character varying(32) 不能为空 JobExecutionStatus
start_time 步骤开始时间 timestamp with time zone 不能为空
end_time 步骤结束时间 timestamp with time zone \
correlation_id 相关操作(Command)的线程 ID character varying(50) 不能为空
external_system_type 外部系统类型 character varying(32) \ ExternalSystemType
external_id 外部系统 ID uuid \
is_external 是否外部步骤 boolean 默认为 false
  • 步骤类型枚举类 StepEnum。通过配置 ExecutionMessages.properties 资源文件实现国际化。
步骤类型 说明
VALIDATING 验证
EXECUTING 执行
FINALIZING 结束
  • 步骤类型枚举类 StepEnum 中可以设置了 AsyncTaskType 异步类型。
  • 设置异步类型的步骤,可以添加异步任务,表 async_tasks 中保存了异步任务信息。
字段名称 字段说明 字段类型 其它
task_id 异步任务 ID uuid 不能为空
action_type 操作类型 ID integer 不能为空
status 任务状态 integer 不能为空 AsyncTaskStatusEnum
result 异步任务返回 integer 不能为空 AsyncTaskResultEnum
action_parameters 执行操作 Command 参数序列化 text \
action_params_class 执行操作 Command 参数类名称 character varying(256) \
step_id 步骤 ID uuid \
command_id 命令 ID uuid 不能为空
started_at 任务开始时间 timestamp with time zone \
storage_pool_id 存储域 ID uuid \
task_type 任务类型 integer 不能为空,默认为 0,AsyncTaskType
task_parameters 异步任务参数序列化 text \
task_params_class 异步任务参数类 character varying(256) \
vdsm_task_id 返回的 vdsm 运行异步任务 ID uuid 通过返回 uuidReturn.mUuid 中获取
root_command_id 父类 Command 的 ID uuid \
  • 异步任务执行状态枚举类 AsyncTaskStatusEnum
异步状态 状态码 状态说明
unknown 0 任务未知状态。
init 1 任务还没有开始前的初始化状态。
running 2 任务正在运行状态。
finished 3 任务完成状态。
aborting 4 任务异常终止状态。
cleaning 5 终止请求失败的任务状态,等待清理。
  • 异步任务执行返回枚举类 AsyncTaskResultEnum
异步任务返回 返回说明
success 成功
failure 失败
cleanSuccess 清除成功
cleanFailure 清除失败
  • 异步任务实际上是通过异步任务工厂(AsyncTaskFactory),构建了 SPMAsyncTask SPM 异步任务。任务状态为 AsyncTaskState
SPM 异步任务状态 说明
Initializing 正在初始化状态
Polling 轮询状态
Ended 结束状态
AttemptingEndAction 尝试结束操作状态
ClearFailed 清除错误状态
Cleared 清除干净状态
  • 外部系统类型枚举类 ExternalSystemType
    • VDSM
    • GLUSTER

功能模块详细

  • Backend 执行 Command 命令,创建默认的 Command 命令执行上下文对象。
@Override
public VdcReturnValueBase runAction(CommandBase<?> action, ExecutionContext executionContext) {
        return runAction(action, true, ExecutionHandler.createDefaultContexForTasks(executionContext));
}

public static CommandContext createDefaultContexForTasks(ExecutionContext parentContext, EngineLock lock) {
        ExecutionContext executionContext = new ExecutionContext();

        if (parentContext != null) {
            if (parentContext.getJob() != null) {
                Step parentStep = parentContext.getParentTasksStep();
                if (parentStep != null) {
                    executionContext.setParentTasksStep(parentStep);
                }
            } else {
                executionContext.setParentTasksStep(parentContext.getParentTasksStep());
            }
        }
        return new CommandContext(executionContext, lock);
}
  • 命令上下文对象 CommandContext 包含

    • ExecutionContext 执行任务上下文对象,用于执行过程中的任务流程记录。
    • EngineLock 引擎全局锁,用于引擎资源 锁机制 的实现。
    • CompensationContext 补偿上下文对象,用于执行命令 补偿机制 的实现。
  • 创建执行任务上下文对象,设置默认步骤。

    • 默认包含 VALIDATING(验证)EXECUTING(执行) 两个步骤。
inal static List<StepEnum> DEFAULT_STEPS_LIST = Arrays.asList(StepEnum.VALIDATING, StepEnum.EXECUTING);

public ExecutionContext() {
        stepsList = DEFAULT_STEPS_LIST;
}
  • 一个 Command 命令对应一个执行任务上下文 ExecutionContext
    • 属性 job 中包含执行任务上下文的任务信息。
    • 属性 step 中包含执行任务上下文的步骤信息。
    • 属性 ExecutionMethod 标明执行的 Command 命令是一个任务还是一个步骤。
    • 属性 isMonitored 定义监管记录该任务或步骤。
    • 属性 shouldEndJob 定义该任务或步骤完成的时候同时结束所在任务。
    • 属性 parentTasksStep 定义当前处于 EXECUTING(执行) 状态的步骤。
    • 属性 isTasksMonitored 定义监管记录当前处于 EXECUTING(执行) 状态的步骤。
    • 属性 isCompleted 定义任务上下文是否完成。
    • 属性 isJobRequired 定义是否为必须的任务。

任务步骤的操作相关。

  • ExecutionHandler 类中任务操作相关实现。
创建任务
public static Job createJob(VdcActionType actionType, CommandBase<?> command) {
        Job job = new Job();

        job.setId(Guid.newGuid());
        job.setActionType(actionType);
        job.setDescription(ExecutionMessageDirector.resolveJobMessage(actionType, command.getJobMessageProperties()));
        job.setJobSubjectEntities(getSubjectEntities(command.getPermissionCheckSubjects()));
        job.setOwnerId(command.getUserId());
        job.setStatus(JobExecutionStatus.STARTED);
        job.setStartTime(new Date());
        job.setCorrelationId(command.getCorrelationId());

        return job;
}
  • 任务描述的生成。通过配置 ExecutionMessages.properties 资源文件实现国际化。
    • 关键字 job. 开头为任务描述。
    • 关键字 step. 开头为步骤描述。
ExecutionMessageDirector.resolveJobMessage(actionType, command.getJobMessageProperties())
  • 执行任务时相关的实体对象保存。
job.setJobSubjectEntities(getSubjectEntities(command.getPermissionCheckSubjects()))
  • 设置任务的开始时间和开始状态。
job.setStartTime(new Date());
job.setCorrelationId(command.getCorrelationId());
获取任务对象
private static Job getJob(CommandBase<?> command, VdcActionType actionType) {
        VdcActionParametersBase params = command.getParameters();
        Job job;
        // if Job is external, we had already created the Job by AddExternalJobCommand, so just get it from DB
        if (params.getJobId() != null) {
            job = DbFacade.getInstance().getJobDao().get((Guid)params.getJobId());
        }
        else {
            job = createJob(actionType, command);
            JobRepositoryFactory.getJobRepository().saveJob(job);
        }
        return job;
}
  • 数据中存在则直接从数据库中获取。
  • 不存在则创建新的任务对象。
获取与执行任务相关的实体类
private static Map<Guid, VdcObjectType> getSubjectEntities(List<PermissionSubject> permSubjectList) {
        Map<Guid, VdcObjectType> entities = new HashMap<Guid, VdcObjectType>();
        for (PermissionSubject permSubj : permSubjectList) {
            if (permSubj.getObjectId() != null && permSubj.getObjectType() != null) {
                entities.put(permSubj.getObjectId(), permSubj.getObjectType());
            }
        }
        return entities;
}
结束任务(在任务上下文中生效)
public static void endJob(ExecutionContext context, boolean exitStatus) {
        if (context == null) {
            return;
        }

        Job job = context.getJob();

        try {
            if (context.isMonitored()) {
                if (context.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
                    if (context.shouldEndJob() || !(job.isAsyncJob() && exitStatus)) {
                        context.setCompleted(true);
                        endJob(exitStatus, job);
                    }
                } else {
                    Step step = context.getStep();
                    if (context.getExecutionMethod() == ExecutionMethod.AsStep && step != null) {
                        if (context.shouldEndJob()) {
                            if (job == null) {
                                job = JobRepositoryFactory.getJobRepository().getJob(step.getJobId());
                            }

                            if (job != null) {
                                context.setCompleted(true);
                                endJob(exitStatus, job);
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error(e);
        }
}

private static void endJob(boolean exitStatus, Job job) {
        job.markJobEnded(exitStatus);
        try {
            JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
        } catch (Exception e) {
            log.errorFormat("Failed to end Job {0}, {1}", job.getId(), job.getActionType().name(), e);
        }
}
public void updateCompletedJobAndSteps(final Job job) {
        TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {

            @Override
            public Void runInTransaction() {
                jobDao.update(job);
                stepDao.updateJobStepsCompleted(job.getId(), job.getStatus(), job.getEndTime());
                return null;
            }
        });
}
设置任务为异步任务(在任务上下文中生效)
public static void setAsyncJob(ExecutionContext executionContext, boolean isAsync) {
        if (executionContext == null) {
            return;
        }
        Job job = executionContext.getJob();
        if (executionContext.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
            job.setIsAsyncJob(isAsync);
        }
}
根据设置的步骤状态添加步骤(在任务上下文中生效)
public static Step addTaskStep(ExecutionContext context, StepEnum stepName, String description) {
        if (context == null) {
            return null;
        }
        Step step = null;

        if (context.isTasksMonitored()) {
            Step parentTaskStep = context.getParentTasksStep();
            if (parentTaskStep != null) {
                step = addSubStep(parentTaskStep, stepName, description);
            }
        }

        return step;
}
根据设置的步骤状态结束步骤
public static void endTaskStep(Guid stepId, JobExecutionStatus exitStatus) {
        try {
            if (stepId != null) {
                Step step = JobRepositoryFactory.getJobRepository().getStep(stepId);

                if (step != null) {
                    step.markStepEnded(exitStatus);
                    JobRepositoryFactory.getJobRepository().updateStep(step);
                }
            }
        } catch (Exception e) {
            log.errorFormat("Failed to terminate step {0} with status {1}", stepId, exitStatus, e);
        }
}
根据设置的步骤状态结束步骤(在任务上下文中生效)
public static void endTaskJob(ExecutionContext context, boolean exitStatus) {
        if (context == null) {
            return;
        }

        try {
            if (context.getExecutionMethod() == ExecutionMethod.AsJob && context.getJob() != null) {
                endJob(context, exitStatus);
            } else {
                Step parentStep = context.getStep();
                if (context.getExecutionMethod() == ExecutionMethod.AsStep && parentStep != null) {
                    Step finalizingStep = parentStep.getStep(StepEnum.FINALIZING);
                    if (finalizingStep != null) {
                        finalizingStep.markStepEnded(exitStatus);
                        JobRepositoryFactory.getJobRepository().updateStep(finalizingStep);
                    }
                    parentStep.markStepEnded(exitStatus);
                    JobRepositoryFactory.getJobRepository().updateStep(parentStep);

                    List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByJobId(parentStep.getJobId());
                    boolean hasChildStepsRunning = false;
                    for (Step step : steps) {
                        if (step.getStatus() == JobExecutionStatus.STARTED && step.getParentStepId() != null) {
                            hasChildStepsRunning = true;
                            break;
                        }
                    }
                    if (!hasChildStepsRunning) {
                        endJob(exitStatus, JobRepositoryFactory.getJobRepository().getJob(parentStep.getJobId()));
                    }
                }
            }
        } catch (RuntimeException e) {
            log.error(e);
        }
}
检测任务或步骤中是否包含外部系统步骤
public static boolean checkIfJobHasTasks(ExecutionContext context) {
        if (context == null || !context.isMonitored()) {
            return false;
        }

        try {
            Guid jobId = null;
            if (context.getExecutionMethod() == ExecutionMethod.AsJob && context.getJob() != null) {
                jobId = context.getJob().getId();
            } else if (context.getExecutionMethod() == ExecutionMethod.AsStep && context.getStep() != null) {
                jobId = context.getStep().getId();
            }

            if (jobId != null) {
                return DbFacade.getInstance().getJobDao().checkIfJobHasTasks(jobId);
            }
        } catch (RuntimeException e) {
            log.error(e);
        }

        return false;
}
  • 通过存储过程 CheckIfJobHasTasks 进行了查询。
SELECT EXISTS(
        SELECT *
        FROM   step
        WHERE  job_id = v_job_id
        AND    external_id is not null
        AND    external_system_type in ('VDSM','GLUSTER'));
通过默认的行为监控准备运行的命令
public static void prepareCommandForMonitoring(CommandBase<?> command,
            VdcActionType actionType,
            boolean runAsInternal) {

        ExecutionContext context = command.getExecutionContext();
        if (context == null) {
            context = new ExecutionContext();
        }

        try {
            boolean isMonitored = shouldMonitorCommand(actionType, runAsInternal);

            // A monitored job is created for monitored external flows
            if (isMonitored || context.isJobRequired()) {
                Job job = getJob(command, actionType);
                context.setExecutionMethod(ExecutionMethod.AsJob);
                context.setJob(job);
                command.setExecutionContext(context);
                command.setJobId(job.getId());
                context.setMonitored(true);
            }
        } catch (Exception e) {
            log.errorFormat("Failed to prepare command of type {0} for monitoring due to error {1}",
                    actionType.name(),
                    ExceptionUtils.getMessage(e),
                    e);
        }
}
创建步骤(在任务上下文中生效)
public static Step addStep(ExecutionContext context, StepEnum stepName, String description, boolean isExternal) {
        if (context == null) {
            return null;
        }
        Step step = null;

        if (context.isMonitored()) {
            if (description == null) {
                description = ExecutionMessageDirector.getInstance().getStepMessage(stepName);
            }

            try {
                Job job = context.getJob();
                if (context.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
                    step = job.addStep(stepName, description);
                    try {
                        step.setExternal(isExternal);
                        JobRepositoryFactory.getJobRepository().saveStep(step);
                    } catch (Exception e) {
                        log.errorFormat("Failed to save new step {0} for job {1}, {2}.", stepName.name(),
                                job.getId(), job.getActionType().name(), e);
                        job.getSteps().remove(step);
                        step = null;
                    }
                } else {
                    Step contextStep = context.getStep();
                    if (context.getExecutionMethod() == ExecutionMethod.AsStep && contextStep != null) {
                        step = addSubStep(contextStep, stepName, description);
                        step.setExternal(isExternal);
                    }
                }
            } catch (Exception e) {
                log.error(e);
            }
        }
        return step;
}
  • Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),添加步骤到任务中。
step = job.addStep(stepName, description);
                    try {
                        step.setExternal(isExternal);
                        JobRepositoryFactory.getJobRepository().saveStep(step);
                    } catch (Exception e) {
                        log.errorFormat("Failed to save new step {0} for job {1}, {2}.", stepName.name(),
                                job.getId(), job.getActionType().name(), e);
                        job.getSteps().remove(step);
                        step = null;
}
  • Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsStep),添加步骤到父类步骤中。
if (context.getExecutionMethod() == ExecutionMethod.AsStep && contextStep != null) {
                        step = addSubStep(contextStep, stepName, description);
                        step.setExternal(isExternal);
结束步骤(在任务上下文中生效)
public static void endStep(ExecutionContext context, Step step,
            boolean exitStatus) {
        if (context == null) {
            return;
        }
        if (context.isMonitored()) {
            Job job = context.getJob();
            try {
                if (step != null) {
                    step.markStepEnded(exitStatus);
                    JobRepositoryFactory.getJobRepository().updateStep(step);
                }

                if (context.getExecutionMethod() == ExecutionMethod.AsJob
                        && job != null && !exitStatus) {
                    // step failure will cause the job to be marked as failed
                    context.setCompleted(true);
                    job.markJobEnded(false);
                    JobRepositoryFactory.getJobRepository()
                            .updateCompletedJobAndSteps(job);
                } else {
                    Step parentStep = context.getStep();
                    if (context.getExecutionMethod() == ExecutionMethod.AsStep
                            && parentStep != null) {
                        context.setCompleted(true);
                        if (!exitStatus) {
                            job.markJobEnded(false);
                            JobRepositoryFactory.getJobRepository()
                                    .updateCompletedJobAndSteps(job);
                        }
                    }
                }
            } catch (Exception e) {
                log.error(e);
            }
        }
}
  • 设置该步骤最终的结束状态为失败状态(exitStatus = false),Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),设置任务结束并且执行失败。
  • Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),设置所属父步骤结束并且所属任务执行失败。
step.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(step);
context.setCompleted(true);
job.markJobEnded(false);
JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
  • 设置该步骤最终的结束状态为成功状态(exitStatus = true),Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsStep),设置父类步骤结束并且执行成功。
context.setCompleted(true);
创建子步骤
private static Step addSubStep(Step parentStep, StepEnum stepName, String description) {
        Step step = null;

        if (parentStep != null) {
            if (description == null) {
                description = ExecutionMessageDirector.getInstance().getStepMessage(stepName);
            }
            step = parentStep.addStep(stepName, description);

            try {
                JobRepositoryFactory.getJobRepository().saveStep(step);
            } catch (Exception e) {
                log.errorFormat("Failed to save new step {0} for step {1}, {2}.", stepName.name(),
                        parentStep.getId(), parentStep.getStepType().name(), e);
                parentStep.getSteps().remove(step);
                step = null;
            }
        }
        return step;
}
  • 为一个步骤添加子步骤。
创建子步骤(在任务上下文中生效)
public static Step addSubStep(ExecutionContext context, Step parentStep, StepEnum newStepName, String description, boolean isExternal) {
        Step step = null;

        if (context == null || parentStep == null) {
            return null;
        }

        try {
            if (context.isMonitored()) {
                if (description == null) {
                    description = ExecutionMessageDirector.getInstance().getStepMessage(newStepName);
                }

                if (context.getExecutionMethod() == ExecutionMethod.AsJob) {
                    if (DbFacade.getInstance().getStepDao().exists(parentStep.getId())) {
                        if (parentStep.getJobId().equals(context.getJob().getId())) {
                            step = parentStep.addStep(newStepName, description);
                        }
                    }
                } else if (context.getExecutionMethod() == ExecutionMethod.AsStep) {
                    step = parentStep.addStep(newStepName, description);
                }
            }
            if (step != null) {
                step.setExternal(isExternal);
                JobRepositoryFactory.getJobRepository().saveStep(step);
            }
        } catch (Exception e) {
            log.error(e);
        }
        return step;
}
  • Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),父步骤所属该任务,将子步骤创建至父步骤中。
 if (parentStep.getJobId().equals(context.getJob().getId())) {
       step = parentStep.addStep(newStepName, description);
}
  • Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),将子步骤创建至该步骤中。
 step = parentStep.addStep(newStepName, description);
开始执行最终的完成步骤(在任务上下文中生效)
public static Step startFinalizingStep(ExecutionContext executionContext) {
        if (executionContext == null) {
            return null;
        }
        Step step = null;

        try {
            if (executionContext.getExecutionMethod() == ExecutionMethod.AsJob) {
                Job job = executionContext.getJob();
                if (job != null) {
                    Step executingStep = job.getStep(StepEnum.EXECUTING);
                    Step finalizingStep =
                            job.addStep(StepEnum.FINALIZING,
                                    ExecutionMessageDirector.getInstance().getStepMessage(StepEnum.FINALIZING));

                    if (executingStep != null) {
                        executingStep.markStepEnded(true);
                        JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
                                finalizingStep);
                    } else {
                        JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
                    }
                }
            } else if (executionContext.getExecutionMethod() == ExecutionMethod.AsStep) {
                Step parentStep = executionContext.getStep();
                if (parentStep != null) {
                    Step executingStep = parentStep.getStep(StepEnum.EXECUTING);
                    Step finalizingStep =
                            parentStep.addStep(StepEnum.FINALIZING, ExecutionMessageDirector.getInstance()
                                    .getStepMessage(StepEnum.FINALIZING));
                    if (executingStep != null) {
                        executingStep.markStepEnded(true);
                        JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
                                finalizingStep);
                    } else {
                        JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
                    }
                }
            }
        } catch (Exception e) {
            log.error(e);
        }
        return step;
}
  • Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),任务中创建 FINALIZING(结束)步骤。
Step executingStep = job.getStep(StepEnum.EXECUTING);
                    Step finalizingStep =
                            job.addStep(StepEnum.FINALIZING,
                                    ExecutionMessageDirector.getInstance().getStepMessage(StepEnum.FINALIZING));

                    if (executingStep != null) {
                        executingStep.markStepEnded(true);
                        JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
                                finalizingStep);
                    } else {
                        JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
                    }
  • Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),创建 FINALIZING(结束)子步骤。
Step executingStep = parentStep.getStep(StepEnum.EXECUTING);
                    Step finalizingStep =
                            parentStep.addStep(StepEnum.FINALIZING, ExecutionMessageDirector.getInstance()
                                    .getStepMessage(StepEnum.FINALIZING));
                    if (executingStep != null) {
                        executingStep.markStepEnded(true);
                        JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
                                finalizingStep);
                    } else {
                        JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
                    }
public void updateExistingStepAndSaveNewStep(final Step existingStep, final Step newStep) {
        TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {

            @Override
            public Void runInTransaction() {
                jobDao.updateJobLastUpdateTime(existingStep.getJobId(), new Date());
                stepDao.update(existingStep);
                stepDao.save(newStep);
                return null;
            }
        });
    }
更新外部步骤信息
public static void updateStepExternalId(Step step, Guid externalId, ExternalSystemType systemType) {
        if (step != null) {
            step.getExternalSystem().setId(externalId);
            step.getExternalSystem().setType(systemType);
            try {
                JobRepositoryFactory.getJobRepository().updateStep(step);
            } catch (Exception e) {
                log.errorFormat("Failed to save step {0}, {1} for system-type {2} with id {3}",
                        step.getId(),
                        step.getStepType().name(),
                        systemType.name(),
                        externalId,
                        e);

            }
        }
}
创建内部任务上下文
public static CommandContext createInternalJobContext(EngineLock lock) {
        ExecutionContext executionContext = new ExecutionContext();
        executionContext.setJobRequired(true);
        executionContext.setMonitored(true);
        return new CommandContext(executionContext, lock);
}
  • 任务为必须执行任务
executionContext.setJobRequired(true);
  • 任务为必须监控的任务
executionContext.setMonitored(true);
  • 追加资源对象锁
return new CommandContext(executionContext, lock);
创建任务上下文子项(以当前任务上下文为父项)
public static CommandContext createDefaultContexForTasks(ExecutionContext parentContext, EngineLock lock) {
        ExecutionContext executionContext = new ExecutionContext();

        if (parentContext != null) {
            if (parentContext.getJob() != null) {
                Step parentStep = parentContext.getParentTasksStep();
                if (parentStep != null) {
                    executionContext.setParentTasksStep(parentStep);
                }
            } else {
                executionContext.setParentTasksStep(parentContext.getParentTasksStep());
            }
        }
        return new CommandContext(executionContext, lock);
}
将给定实体的所有任务状态更新为一致
public static void updateSpecificActionJobCompleted(Guid entityId, VdcActionType actionType, boolean status) {
        try {
            List<Job> jobs = JobRepositoryFactory.getJobRepository().getJobsByEntityAndAction(entityId, actionType);
            for (Job job : jobs) {
                if (job.getStatus() == JobExecutionStatus.STARTED)
                    job.markJobEnded(status);
                JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
            }
        } catch (RuntimeException e) {
            log.error(e);
        }
}
public List<Job> getJobsByEntityAndAction(Guid entityId, VdcActionType actionType) {
        List<Job> jobList = new ArrayList<Job>();
        List<Guid> jobIdsList = jobSubjectEntityDao.getJobIdByEntityId(entityId);

        for (Guid jobId : jobIdsList) {
            Job job = jobDao.get(jobId);
            if (job != null && job.getActionType() == actionType) {
                jobList.add(job);
            }
        }
        return jobList;
}

Command 命令执行流程

Command 命令执行流程
  • 橘黄色方法中,包含有 Command 命令任务相关的操作。
  • MultipleActionsRunner.executeValidatedCommand 方法中创建了 Command 命令的任务记录。
ExecutionHandler.prepareCommandForMonitoring(command, command.getActionType(), command.isInternalExecution());
  • CommandBase.executeAction 方法中记录了 VALIDATING(验证) 的步骤。
validatingStep = ExecutionHandler.addStep(getExecutionContext(), StepEnum.VALIDATING, null);

actionAllowed = getReturnValue().getCanDoAction() || internalCanDoAction();
if (!isExternal) {
       ExecutionHandler.endStep(getExecutionContext(), validatingStep, actionAllowed);
}
  • CommandBase.execute 方法中记录了 EXECUTING(执行) 的步骤。
ExecutionHandler.addStep(getExecutionContext(), StepEnum.EXECUTING, null);

if (!hasTasks() && !ExecutionHandler.checkIfJobHasTasks(getExecutionContext())) {
         ExecutionHandler.endJob(getExecutionContext(), getSucceeded());
}
  • Command 命令执行类中,通过 getExecutionContext() 就能获取当前 Command 命令的任务上下文,从而获取该命令相关的任务和步骤。

异步任务执行流程

异步任务执行流程
  • 添加异步任务在执行命令 Command 任务行为中的步骤。
Step taskStep = ExecutionHandler.addTaskStep(getExecutionContext(),  StepEnum.getStepNameByTaskType(asyncTaskCreationInfo.getTaskType()), description);
  • 建立异步任务连接
SPMAsyncTask task = concreteCreateTask(taskId, asyncTaskCreationInfo, parentCommand)
  • 保存异步任务数据到 async_tasks 表中。
AsyncTaskUtils.addOrUpdateTaskInDB(task);
  • 更新异步任务外部系统类型和 vdsm 任务 ID 等。
ExecutionHandler.updateStepExternalId(taskStep, vdsmTaskId, ExternalSystemType.VDSM);
  • CommandBase.executeAction 方法执行完成后,清除异步任务表 async_tasks 中数据。
if (!getReturnValue().getSucceeded()) {
        clearAsyncTasksWithOutVdsmId();
}

private void clearAsyncTasksWithOutVdsmId() {
        if (!getReturnValue().getTaskPlaceHolderIdList().isEmpty()) {
            TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {
                @Override
                public Void runInTransaction() {
                    for (Guid asyncTaskId : getReturnValue().getTaskPlaceHolderIdList()) {
                        AsyncTasks task = getAsyncTaskDao().get(asyncTaskId);
                        if (task != null && Guid.isNullOrEmpty(task.getVdsmTaskId())) {
                            AsyncTaskManager.removeTaskFromDbByTaskId(task.getTaskId());
                        }

                    }
                    return null;
                }
            });
        }
}

相关文章

网友评论

    本文标题:【Ovirt 笔记】任务机制分析与整理

    本文链接:https://www.haomeiwen.com/subject/lbyuaxtx.html