本系列基于 Conductor
release v3.5.2
1. 概述
本文主要分享Workflows API的实现,以Redis实现为例,分析Metadata的实现方法。
Workflows API 功能为开始和管理工作流
2. Start Workflow
开始一个工作流时,先在com.netflix.conductor.core.dal.ExecutionDAOFacade#createWorkflow
方法中把工作流精简结构( WorkflowModel )保存到 redis 以及把 workflowId
放进DECIDER_QUEUE
队列中。
接着在com.netflix.conductor.core.execution.WorkflowExecutor
第394行
执行decide
触发工作流执行。
Schedule Task
com.netflix.conductor.core.execution.DeciderService
Decider evaluates the state of the workflow by inspecting the current state along with the
blueprint. The result of the evaluation is either to schedule further tasks, complete/fail the
workflow or do nothing.
com.netflix.conductor.core.execution.DeciderService#getTasksToBeScheduled
第855行
实例化 TaskModel 并给 input 设置值
com.netflix.conductor.core.execution.WorkflowExecutor#scheduleTask
第1715行
把 TaskModel 持久化
com.netflix.conductor.core.execution.WorkflowExecutor#addTaskToQueue
把任务加入队列中。队列名以任务名称为主组合 domain、isolationGroup、executionNameSpace 组成。
// taskType 为 TaskDef 的 name 字段
public static String getQueueName(
String taskType, String domain, String isolationGroup, String executionNameSpace)
3. Poll Task
com.netflix.conductor.service#poll
网友评论