conductor

Author: dozenx | Published: 2022-06-30 13:31


Official website

Official documentation

https://netflix.github.io/conductor/configuration/systask/#set-variable-task

Installation and deployment

First, install docker-compose.

Official installation guide

https://netflix.github.io/conductor/architecture/#installing-and-running

Conductor is currently deployed on 192.168.212.52 with Docker, which starts four services:

cd docker
docker-compose up

http://localhost:5000/

Workflow UI

http://192.168.212.52:5000/

Swagger

http://192.168.212.52:8080/index.html

http://192.168.212.52:8080/index.html#!/Metadata_Management/registerTaskDef

Key concepts

TaskDefinition

Workflow Definition

System Tasks

Conductor's design approach

A task is linked to its concrete definition by name.

At startup, a WorkflowTaskCoordinator

WorkflowTaskCoordinator coordinator = builder
        .withWorkers(worker1, worker2)
        .withThreadCount(threadCount)
        .withTaskClient(taskClient)
        .build();

is built to register worker1 and worker2,

Worker worker1 = new LeaderRatifyWorker("leaderRatify");
Worker worker2 = new ManagerRatifyWorker("managerRatify");

each of which binds a task name to a concrete Java class:

import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

class LeaderRatifyWorker implements Worker {

    // Name of the task definition this worker polls for.
    private final String taskDefName;

    public LeaderRatifyWorker(String taskDefName) {
        this.taskDefName = taskDefName;
    }

    @Override
    public String getTaskDefName() {
        return taskDefName;
    }

    @Override
    public TaskResult execute(Task task) {
        System.out.printf("Executing %s%n", taskDefName);
        System.out.println("staffName:" + task.getInputData().get("staffName"));
        System.out.println("staffDepartment:" + task.getInputData().get("staffDepartment"));

        TaskResult result = new TaskResult(task);
        result.setStatus(TaskResult.Status.COMPLETED);

        // Register the output of the task
        result.getOutputData().put("outputKey1", "value");
        result.getOutputData().put("oddEven", 1);
        result.getOutputData().put("mod", 4);
        result.getOutputData().put("leaderAgree", "yes");
        result.getOutputData().put("leaderDisagree", "no");
        return result;
    }
}
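
To make the name-based binding concrete, here is a minimal sketch in plain Java. It does not use the Conductor client: `WorkerRegistry` and its methods are hypothetical stand-ins, and the toy workers only echo a few fields. It illustrates just one idea from the notes above, namely that the task name selects the code that runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for illustration; not part of the Conductor client.
final class WorkerRegistry {
    // Task definition name -> function that turns task input into task output.
    private final Map<String, Function<Map<String, Object>, Map<String, Object>>> workers = new HashMap<>();

    void register(String taskDefName, Function<Map<String, Object>, Map<String, Object>> worker) {
        workers.put(taskDefName, worker);
    }

    // Finds the worker bound to the task name and runs it on the input data.
    Map<String, Object> execute(String taskDefName, Map<String, Object> inputData) {
        Function<Map<String, Object>, Map<String, Object>> worker = workers.get(taskDefName);
        if (worker == null) {
            throw new IllegalArgumentException("No worker registered for " + taskDefName);
        }
        return worker.apply(inputData);
    }

    // Registers the two task names used in this post with toy implementations.
    static WorkerRegistry sample() {
        WorkerRegistry registry = new WorkerRegistry();
        registry.register("leaderRatify", input -> {
            Map<String, Object> output = new HashMap<>();
            output.put("staffName", input.get("staffName"));
            output.put("leaderAgree", "yes");
            return output;
        });
        registry.register("managerRatify", input -> {
            Map<String, Object> output = new HashMap<>();
            output.put("managerAgree", "yes");
            return output;
        });
        return registry;
    }
}
```

Calling `WorkerRegistry.sample().execute("leaderRatify", input)` runs only the function registered under that name; an unknown name fails fast instead of silently doing nothing.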

https://blog.csdn.net/u013970991/article/details/83379320

Besides the official documentation, there are also many good unofficial tools.

https://gitee.com/netflix/conductor/blob/master/RELATED.md

https://github.com/flaviostutz/conductor-ui

Start UI Server

The UI Server is in the directory conductor/ui.

To run it, you need Node.js installed and gulp installed with npm i -g gulp.

In a terminal other than the one running the Conductor server:

cd ui
npm i
gulp watch

Or Start all the services using docker-compose

cd docker
docker-compose up

http://192.168.212.52:5000/#/workflow/metadata?_k=dhkcts

G6 site: http://antv-2018.alipay.com/zh-cn/g6/3.x/demo/tool/grid.html

decideCase JSON format

1. Create a simple workflow

A simple task

https://netflix.github.io/conductor/apispec/

Task & Workflow Metadata

| Endpoint | Description | Input |
| --- | --- | --- |
| GET /metadata/taskdefs | Get all the task definitions | n/a |
| GET /metadata/taskdefs/{taskType} | Retrieve task definition | Task Name |
| POST /metadata/taskdefs | Register new task definitions | List of Task Definitions |
| PUT /metadata/taskdefs | Update a task definition | A Task Definition |
| DELETE /metadata/taskdefs/{taskType} | Delete a task definition | Task Name |
| | | |
| GET /metadata/workflow | Get all the workflow definitions | n/a |
| POST /metadata/workflow | Register new workflow | Workflow Definition |
| PUT /metadata/workflow | Register/Update new workflows | List of Workflow Definition |
| GET /metadata/workflow/{name}?version= | Get the workflow definitions | workflow name, version (optional) |
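
As a rough illustration of how these endpoints map to HTTP calls, the sketch below builds (but does not send) three of the GET requests with the JDK 11 `java.net.http` API. The base URL, including the `/api` prefix, is an assumption based on the curl commands used elsewhere in this post, and `MetadataRequests` is a hypothetical helper name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; only builds requests, it never touches the network.
final class MetadataRequests {
    // Assumed base URL of the Conductor server used in this post.
    static final String BASE = "http://192.168.212.52:8080/api";

    // GET /metadata/taskdefs: all task definitions.
    static HttpRequest listTaskDefs() {
        return HttpRequest.newBuilder(URI.create(BASE + "/metadata/taskdefs")).GET().build();
    }

    // GET /metadata/taskdefs/{taskType}: one task definition by name.
    static HttpRequest getTaskDef(String taskType) {
        return HttpRequest.newBuilder(URI.create(BASE + "/metadata/taskdefs/" + taskType)).GET().build();
    }

    // GET /metadata/workflow/{name}?version=: one workflow definition at a given version.
    static HttpRequest getWorkflowDef(String name, int version) {
        URI uri = URI.create(BASE + "/metadata/workflow/" + name + "?version=" + version);
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

To actually send one of these, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.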

http://192.168.212.52:8080/metadata/workflow

Using the workflow API

Conductor can be used through the Swagger UI

https://www.jianshu.com/p/4eae1af8afa8

Open http://192.168.212.52:8080/index.html

Test: send a test HTTP request

Run a task

Workflow Management

Fill in the parameters

{
  "name": "string", // name
  "version": 0, // version number
  "correlationId": "string",
  "input": {},
  "taskToDomain": {},
  "workflowDef": {
    "ownerApp": "string",
    "createTime": 0,
    "updateTime": 0,
    "createdBy": "string",
    "updatedBy": "string",
    "name": "string",
    "description": "string",
    "version": 0,
    "tasks": [
      {
        "name": "string",
        "taskReferenceName": "string",
        "description": "string",
        "inputParameters": {},
        "type": "string",
        "dynamicTaskNameParam": "string",
        "caseValueParam": "string",
        "caseExpression": "string",
        "scriptExpression": "string",
        "decisionCases": {},
        "dynamicForkJoinTasksParam": "string",
        "dynamicForkTasksParam": "string",
        "dynamicForkTasksInputParamName": "string",
        "defaultCase": [{}],
        "forkTasks": [[{}]],
        "startDelay": 0,
        "subWorkflowParam": {
          "name": "string",
          "version": 0,
          "taskToDomain": {},
          "workflowDefinition": {}
        },
        "joinOn": ["string"],
        "sink": "string",
        "optional": false,
        "taskDefinition": {
          "ownerApp": "string",
          "createTime": 0,
          "updateTime": 0,
          "createdBy": "string",
          "updatedBy": "string",
          "name": "string",
          "description": "string",
          "retryCount": 0,
          "timeoutSeconds": 0,
          "inputKeys": ["string"],
          "outputKeys": ["string"],
          "timeoutPolicy": "RETRY",
          "retryLogic": "FIXED",
          "retryDelaySeconds": 0,
          "responseTimeoutSeconds": 0,
          "concurrentExecLimit": 0,
          "inputTemplate": {},
          "rateLimitPerFrequency": 0,
          "rateLimitFrequencyInSeconds": 0,
          "isolationGroupId": "string",
          "executionNameSpace": "string",
          "ownerEmail": "string",
          "pollTimeoutSeconds": 0
        },
        "rateLimited": false,
        "defaultExclusiveJoinTask": ["string"],
        "asyncComplete": false,
        "loopCondition": "string",
        "loopOver": [{}],
        "retryCount": 0
      }
    ],
    "inputParameters": ["string"],
    "outputParameters": {},
    "failureWorkflow": "string",
    "schemaVersion": 0,
    "restartable": false,
    "workflowStatusListenerEnabled": false,
    "ownerEmail": "string",
    "timeoutPolicy": "TIME_OUT_WF",
    "timeoutSeconds": 0,
    "variables": {}
  },
  "externalInputPayloadStoragePath": "string",
  "priority": 0
}

This returns an error:

{ "status": 500, "message": "java.lang.NullPointerException: TaskDef name cannot be null", "retryable": false }

Beginner tutorial

Creating Task definitions

curl -X POST \
  http://192.168.212.52:8080/api/metadata/taskdefs \
  -H 'Content-Type: application/json' \
  -d '[
    {
      "name": "verify_if_idents_are_added",
      "retryCount": 3,
      "retryLogic": "FIXED",
      "retryDelaySeconds": 10,
      "timeoutSeconds": 300,
      "timeoutPolicy": "TIME_OUT_WF",
      "responseTimeoutSeconds": 180,
      "ownerEmail": "371452875@qq.com"
    },
    {
      "name": "add_idents",
      "retryCount": 3,
      "retryLogic": "FIXED",
      "retryDelaySeconds": 10,
      "timeoutSeconds": 300,
      "timeoutPolicy": "TIME_OUT_WF",
      "responseTimeoutSeconds": 180,
      "ownerEmail": "371452875@qq.com"
    },
    {
      "name": "http_login",
      "retryCount": 3,
      "retryLogic": "FIXED",
      "retryDelaySeconds": 10,
      "timeoutSeconds": 300,
      "timeoutPolicy": "TIME_OUT_WF",
      "responseTimeoutSeconds": 180,
      "ownerEmail": "371452875@qq.com"
    }
  ]'
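
The same registration call can be sketched in Java. This is a minimal example under stated assumptions: `TaskDefRegistration` is a hypothetical helper, the JSON is assembled by hand (so names and e-mail addresses must not contain characters that need JSON escaping), and the request is only built, not sent. Joining the entries with a separator also avoids a trailing comma before the closing bracket, which is easy to introduce when editing the array by hand.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors the curl command above.
final class TaskDefRegistration {
    // Renders one task definition with the same retry and timeout settings as the curl example.
    // Assumes name and ownerEmail need no JSON escaping.
    static String taskDefJson(String name, String ownerEmail) {
        return "{"
                + "\"name\":\"" + name + "\","
                + "\"retryCount\":3,"
                + "\"retryLogic\":\"FIXED\","
                + "\"retryDelaySeconds\":10,"
                + "\"timeoutSeconds\":300,"
                + "\"timeoutPolicy\":\"TIME_OUT_WF\","
                + "\"responseTimeoutSeconds\":180,"
                + "\"ownerEmail\":\"" + ownerEmail + "\""
                + "}";
    }

    // Joins the definitions into a JSON array with no trailing comma.
    static String body(List<String> names, String ownerEmail) {
        return names.stream()
                .map(name -> taskDefJson(name, ownerEmail))
                .collect(Collectors.joining(",", "[", "]"));
    }

    // Builds POST {baseUrl}/metadata/taskdefs with a JSON body.
    static HttpRequest request(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/metadata/taskdefs"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```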

Log in to the UI


| Name/Version | Owner App | Timeout Policy | Timeout Seconds | Response Timeout Seconds | Retry Count | Concurrent Exec Limit | Rate Limit Amount | Rate Limit Frequency Seconds | Retry Logic |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| add_idents | | TIME_OUT_WF | 300 | 180 | 3 | | 0 | 1 | FIXED (10 seconds) |
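
To read the retry columns, it can help to spell out what a retry count of 3 with FIXED logic and a 10 second delay implies. The sketch below is a simplification, not Conductor code: `FixedRetry` is a hypothetical name, and it assumes every attempt fails immediately, so the offsets are the earliest times at which each retry could be scheduled.

```java
// Hypothetical helper; models only the FIXED retry logic under the stated assumption.
final class FixedRetry {
    // Earliest offset in seconds (after the first failure) of each retry,
    // assuming every attempt fails as soon as it starts.
    static int[] retryOffsets(int retryCount, int retryDelaySeconds) {
        int[] offsets = new int[retryCount];
        for (int i = 0; i < retryCount; i++) {
            offsets[i] = (i + 1) * retryDelaySeconds;
        }
        return offsets;
    }
}
```

For the add_idents definition, `retryOffsets(3, 10)` gives 10, 20 and 30 seconds; in practice each attempt's own running time is added on top.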

Creating Workflow Definition

Creating a Workflow definition is similar. We shall use the Task definitions created above. Note that the same Task definition can be used in multiple workflows, or multiple times in the same Workflow (that's where taskReferenceName is useful).
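
A small sketch of why taskReferenceName matters: the same task definition may appear several times in one workflow, but each use needs its own reference name so later tasks can tell the outputs apart. `ReferenceNames` and `WorkflowTask` below are hypothetical, simplified types for illustration, and the reference names in the usage note are made up.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for checking reference names before registering a workflow.
final class ReferenceNames {
    // One entry of a workflow's "tasks" array, reduced to the two fields that matter here.
    static final class WorkflowTask {
        final String name;              // task definition being reused
        final String taskReferenceName; // alias of this particular use inside the workflow

        WorkflowTask(String name, String taskReferenceName) {
            this.name = name;
            this.taskReferenceName = taskReferenceName;
        }
    }

    // Returns every taskReferenceName that is used more than once.
    static Set<String> duplicates(List<WorkflowTask> tasks) {
        Set<String> seen = new HashSet<>();
        Set<String> repeated = new LinkedHashSet<>();
        for (WorkflowTask task : tasks) {
            if (!seen.add(task.taskReferenceName)) {
                repeated.add(task.taskReferenceName);
            }
        }
        return repeated;
    }
}
```

Using add_idents twice as `add_idents_video` and `add_idents_trailer` yields no duplicates, while reusing one reference name for both entries is reported.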

A workflow without any tasks looks like this:

{ "name": "add_netflix_identation", "description": "Adds Netflix Identation to video files.", "version": 1, "schemaVersion": 2, "tasks": [] }

Every task in the definition has input parameters:

curl -X POST \
  http://192.168.212.52:8080/api/metadata/workflow \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "add_netflix_identation",
    "description": "Adds Netflix Identation to video files.",
    "version": 2,
    "schemaVersion": 2,
    "ownerEmail": "371452875@qq.com",
    "tasks": [
      {
        "name": "verify_if_idents_are_added",
        "taskReferenceName": "ident_verification",
        "inputParameters": {
          "contentId": "${workflow.input.contentId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "decide_task",
        "taskReferenceName": "is_idents_added",
        "inputParameters": {
          "case_value_param": "${ident_verification.output.is_idents_added}"
        },
        "type": "DECISION",
        "caseValueParam": "case_value_param",
        "decisionCases": {
          "false": [
            {
              "name": "add_idents",
              "taskReferenceName": "add_idents_by_type",
              "inputParameters": {
                "identType": "${workflow.input.identType}",
                "contentId": "${workflow.input.contentId}"
              },
              "type": "SIMPLE"
            }
          ]
        }
      }
    ]
  }'
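
Conductor's documentation writes input references in the form `${workflow.input.contentId}`, and the server substitutes the actual value when the task is scheduled. The sketch below is a deliberately simplified model of that substitution, not Conductor's implementation: `InputExpressions` is a hypothetical name, and it only handles values that consist entirely of one `${workflow.input.<key>}` reference.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified resolver for illustration only.
final class InputExpressions {
    // Matches a value that is exactly one ${workflow.input.<key>} reference.
    private static final Pattern WORKFLOW_INPUT =
            Pattern.compile("^\\$\\{workflow\\.input\\.([A-Za-z0-9_]+)\\}$");

    // Replaces each reference with the matching workflow input; other values pass through unchanged.
    static Map<String, Object> resolve(Map<String, String> inputParameters,
                                       Map<String, Object> workflowInput) {
        Map<String, Object> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : inputParameters.entrySet()) {
            Matcher matcher = WORKFLOW_INPUT.matcher(entry.getValue());
            Object value = matcher.matches()
                    ? workflowInput.get(matcher.group(1))
                    : entry.getValue();
            resolved.put(entry.getKey(), value);
        }
        return resolved;
    }
}
```

With the workflow input `{"contentId": "my_unique_content_id"}`, the parameter `"contentId": "${workflow.input.contentId}"` resolves to `my_unique_content_id`.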

Created successfully:

| Name/Version | Input Parameters | Tasks |
| --- | --- | --- |
| add_netflix_identation / 2 | [] | ["verify_if_idents_are_added","decide_task"] |
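
The decide_task entry listed here is a DECISION task: the value of its caseValueParam input is used as a key into decisionCases, and defaultCase is used when no key matches. A minimal sketch of that selection, with `DecisionSketch` as a hypothetical name and branches reduced to lists of task names:

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of how a DECISION task picks its branch.
final class DecisionSketch {
    // Returns the branch whose key equals the case value as text, else the default branch.
    static List<String> chooseBranch(Map<String, List<String>> decisionCases,
                                     List<String> defaultCase,
                                     Object caseValue) {
        List<String> branch = decisionCases.get(String.valueOf(caseValue));
        return branch != null ? branch : defaultCase;
    }
}
```

In this workflow the only case is "false", so add_idents runs when the verification task reports that idents are not yet added; any other value falls through to the empty default branch.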

Start the workflow

curl -X POST \
  http://192.168.212.52:8080/api/workflow/add_netflix_identation \
  -H 'Content-Type: application/json' \
  -d '{
    "identType": "animation",
    "contentId": "my_unique_content_id"
  }'
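
For reference, a Java counterpart of this call might look like the sketch below. `StartWorkflow` is a hypothetical helper, the base URL is passed in by the caller, and the request is only built here rather than sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that mirrors the curl command above.
final class StartWorkflow {
    // Builds POST {baseUrl}/workflow/{workflowName} with the workflow input as JSON body.
    static HttpRequest request(String baseUrl, String workflowName, String inputJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/workflow/" + workflowName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(inputJson))
                .build();
    }
}
```

Sending it with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` should return the new workflow ID as the response body, as in the curl run.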

On success, the response is a workflow ID:

dc06c0a2-af71-4f40-9630-8064704ce08f

Query the execution result. The call fails with a 500 error from the UI, so use curl or Swagger directly.

curl -X GET --header 'Accept: application/json' 'http://192.168.212.52:8080/api/workflow/dc06c0a2-af71-4f40-9630-8064704ce08f?includeTasks=true'

http://192.168.212.52:8080/api/workflow/dc06c0a2-af71-4f40-9630-8064704ce08f?includeTasks=true
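
When polling this endpoint from code, the top-level status field tells whether the workflow is still in progress. The sketch below builds the same GET request and classifies a status string; `WorkflowStatusCheck` is a hypothetical helper, and the set of terminal statuses is written from memory of Conductor's workflow states, so treat it as an assumption to verify against your server version.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Set;

// Hypothetical helper for polling a workflow execution.
final class WorkflowStatusCheck {
    // Assumed terminal states; RUNNING and PAUSED are treated as still in progress.
    private static final Set<String> TERMINAL =
            Set.of("COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED");

    // Builds GET {baseUrl}/workflow/{workflowId}?includeTasks=...
    static HttpRequest statusRequest(String baseUrl, String workflowId, boolean includeTasks) {
        URI uri = URI.create(baseUrl + "/workflow/" + workflowId + "?includeTasks=" + includeTasks);
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // True once the workflow can no longer make progress on its own.
    static boolean isTerminal(String status) {
        return TERMINAL.contains(status);
    }
}
```

A workflow that reports RUNNING with its first task still SCHEDULED is waiting for a worker to poll that task.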

Response Body

{ "ownerApp": "", "createTime": 1611546032744, "updateTime": 1611546039265, "status": "RUNNING", "endTime": 0, "workflowId": "dc06c0a2-af71-4f40-9630-8064704ce08f", "tasks": [ { "taskType": "verify_if_idents_are_added", "status": "SCHEDULED", "inputData": { "contentId": "my_unique_content_id" }, "referenceTaskName": "ident_verification", "retryCount": 0, "seq": 1, "pollCount": 0, "taskDefName": "verify_if_idents_are_added", "scheduledTime": 1611546039265, "startTime": 0, "endTime": 0, "updateTime": 0, "startDelayInSeconds": 0, "retried": false, "executed": false, "callbackFromWorker": true, "responseTimeoutSeconds": 180, "workflowInstanceId": "dc06c0a2-af71-4f40-9630-8064704ce08f", "workflowType": "add_netflix_identation", "taskId": "1f95f7ad-9b03-4e41-922d-50922cc282a6", "callbackAfterSeconds": 0, "outputData": {}, "workflowTask": { "name": "verify_if_idents_are_added", "taskReferenceName": "ident_verification", "inputParameters": { "contentId": "{workflow.input.contentId}" }, "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, "taskDefinition": { "createTime": 1611540085124, "createdBy": "", "name": "verify_if_idents_are_added", "retryCount": 3, "timeoutSeconds": 300, "inputKeys": [], "outputKeys": [], "timeoutPolicy": "TIME_OUT_WF", "retryLogic": "FIXED", "retryDelaySeconds": 10, "responseTimeoutSeconds": 180, "inputTemplate": {}, "rateLimitPerFrequency": 0, "rateLimitFrequencyInSeconds": 1, "ownerEmail": "371452875@qq.com" }, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] }, "rateLimitPerFrequency": 0, "rateLimitFrequencyInSeconds": 1, "workflowPriority": 0, "iteration": 0, "taskDefinition": { "present": true }, "loopOverTask": false, "queueWaitTime": 0, "taskStatus": "SCHEDULED" } ], "input": { "identType": "animation", "contentId": "my_unique_content_id" }, "output": {}, "workflowType": "add_netflix_identation", "version": 2, "schemaVersion": 2, "taskToDomain": 
{}, "failedReferenceTaskNames": [], "workflowDefinition": { "createTime": 1611545817508, "name": "add_netflix_identation", "description": "Adds Netflix Identation to video files.", "version": 2, "tasks": [ { "name": "verify_if_idents_are_added", "taskReferenceName": "ident_verification", "inputParameters": { "contentId": "{workflow.input.contentId}" }, "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, "taskDefinition": { "createTime": 1611540085124, "createdBy": "", "name": "verify_if_idents_are_added", "retryCount": 3, "timeoutSeconds": 300, "inputKeys": [], "outputKeys": [], "timeoutPolicy": "TIME_OUT_WF", "retryLogic": "FIXED", "retryDelaySeconds": 10, "responseTimeoutSeconds": 180, "inputTemplate": {}, "rateLimitPerFrequency": 0, "rateLimitFrequencyInSeconds": 1, "ownerEmail": "371452875@qq.com" }, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] }, { "name": "decide_task", "taskReferenceName": "is_idents_added", "inputParameters": { "case_value_param": "{ident_verification.output.is_idents_added}" }, "type": "DECISION", "caseValueParam": "case_value_param", "decisionCases": { "false": [ { "name": "add_idents", "taskReferenceName": "add_idents_by_type", "inputParameters": { "identType": "{workflow.input.identType}", "contentId": "${workflow.input.contentId}" }, "type": "SIMPLE", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, "taskDefinition": { "createTime": 1611540085137, "createdBy": "", "name": "add_idents", "retryCount": 3, "timeoutSeconds": 300, "inputKeys": [], "outputKeys": [], "timeoutPolicy": "TIME_OUT_WF", "retryLogic": "FIXED", "retryDelaySeconds": 10, "responseTimeoutSeconds": 180, "inputTemplate": {}, "rateLimitPerFrequency": 0, "rateLimitFrequencyInSeconds": 1, "ownerEmail": "371452875@qq.com" }, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] } ] }, "defaultCase": [], 
"forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] } ], "inputParameters": [], "outputParameters": {}, "schemaVersion": 2, "restartable": true, "workflowStatusListenerEnabled": false, "ownerEmail": "371452875@qq.com", "timeoutPolicy": "ALERT_ONLY", "timeoutSeconds": 0, "variables": {} }, "priority": 0, "variables": {}, "lastRetriedTime": 0, "startTime": 1611546032744, "workflowName": "add_netflix_identation", "workflowVersion": 2 }

Try sending an HTTP request

Rework the earlier definition a little:

curl -X POST \
  http://192.168.212.52:8080/api/metadata/workflow \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "add_netflix_identation",
    "description": "Adds Netflix Identation to video files.",
    "version": 2,
    "schemaVersion": 2,
    "ownerEmail": "371452875@qq.com",
    "tasks": [
      {
        "name": "verify_if_idents_are_added",
        "taskReferenceName": "ident_verification",
        "inputParameters": {
          "contentId": "${workflow.input.contentId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "decide_task",
        "taskReferenceName": "is_idents_added",
        "inputParameters": {
          "case_value_param": "${ident_verification.output.is_idents_added}"
        },
        "type": "DECISION",
        "caseValueParam": "case_value_param",
        "decisionCases": {
          "false": [
            {
              "name": "add_idents",
              "taskReferenceName": "add_idents_by_type",
              "inputParameters": {
                "identType": "${workflow.input.identType}",
                "contentId": "${workflow.input.contentId}"
              },
              "type": "SIMPLE"
            }
          ]
        }
      }
    ]
  }'

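30 pages

Business background

Revision history

Overall architecture design

How Dingdong (叮咚) integrates with the development platform

How to design CI/CD

Creating the controller and service

How to create the DAO

Product design

UI design

Functional modules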


Article link: https://www.haomeiwen.com/subject/bopfbrtx.html