介绍
Argo Workflows 是一个开源容器原生工作流引擎,用于在 Kubernetes 上编排并行作业。Argo Workflows 实现为 Kubernetes CRD。
-
定义工作流中的每个步骤都是一个容器的工作流。
-
将多步骤工作流建模为任务序列或使用有向无环图 (DAG) 捕获任务之间的依赖关系。
-
使用 Kubernetes 上的 Argo Workflows 在很短的时间内轻松运行机器学习或数据处理的计算密集型作业。
-
在 Kubernetes 上本地运行 CI/CD pipeline,无需配置复杂的软件开发产品。
安装Argo Workflows
CLI
Mac
# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-darwin-amd64.gz
# Unzip
gunzip argo-darwin-amd64.gz
# Make binary executable
chmod +x argo-darwin-amd64
# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo
# Test installation
argo version
Linux
# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-linux-amd64.gz
# Unzip
gunzip argo-linux-amd64.gz
# Make binary executable
chmod +x argo-linux-amd64
# Move binary to path
mv ./argo-linux-amd64 /usr/local/bin/argo
# Test installation
argo version
Controller 和 Server
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/install.yaml
核心概念
Workflow
Workflow是 Argo 中最重要的资源,具有两个重要功能:
-
它定义了要执行的工作流。
-
它存储工作流的状态。
Workflow Spec
要执行的工作流在 Workflow.spec
字段中定义。工作流规范的核心结构是模板列表和入口点。Workflow spec的核心结构是一组 templates
和一个 entrypoint
。
templates
可以松散地认为是“功能”:它们定义要执行的指令。entrypoint
字段定义了“main”函数将是什么——即首先执行的模板。
下面是一个只有一个 template 的 Workflow 的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world- # Name of this Workflow
spec:
entrypoint: whalesay # Defines "whalesay" as the "main" template
templates:
- name: whalesay # Defining the "whalesay" template
container:
image: docker/whalesay
command: [cowsay]
args: ["hello world"] # This template runs "cowsay" in the "whalesay" image with arguments "hello world"
模板类型
有6种 模板类型,可以分为两类。
模板定义
这些模板定义了要完成的工作,通常在一个容器中执行。
Container
可能是最通用的模板类型,将会调度一个 Container。模板的 spec 定义和 Kubernetes container spec定义相同。
示例:
- name: whalesay
container:
image: docker/whalesay
command: [cowsay]
args: ["hello world"]
Script
是对 container 的一种简便的封装。spec 定义和 container 相同,但增加了 source
字段:允许您就地定义脚本。该脚本将保存到文件中并为您执行。脚本的执行结果会自动导入到 Argo 变量 {{tasks.<NAME>.outputs.result}}
或 {{steps.<NAME>.outputs.result}}
中(取决于调用方式)。
- name: gen-random-int
script:
image: python:alpine3.6
command: [python]
source: |
import random
i = random.randint(1, 100)
print(i)
Resource
直接操作集群资源。可用于 get、create、apply、delete、replace 或 patch 集群中的资源。
这个示例会在集群是创建一个 ConfigMap 资源:
- name: k8s-owner-reference
resource:
action: create
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
generateName: owned-eg-
data:
some: value
Suspend
Suspend 模板将暂停执行一段时间或直到手动恢复。Suspend 模板可以通过 CLI(使用 argo resume)、API 接口或 UI 来恢复。
示例:
- name: delay
suspend:
duration: "20s"
模板调用器
这些模板用于调用其他模板并提供执行控制。
Steps
Steps 模板允许您在一系列步骤中定义您的任务。该模板的结构是“一组列表”。外部列表顺序执行,内部列表并行执行。如果你想顺序执行内部列表,请使用 Synchronization特性。您可以设置多种选项来控制执行,例如 when
:有条件地执行步骤的子句。
在这个示例中,首先执行 step1,待其执行完毕,再并行执行 step2a 和 step2b。
- name: hello-hello-hello
steps:
- - name: step1
template: prepare-data
- - name: step2a
template: run-data-first-half
- name: step2b
template: run-data-second-half
DAG
dag 模板允许您将任务定义为依赖关系图。在 DAG 中,您列出所有任务并设置在特定任务开始之前必须完成哪些其他任务。没有任何依赖关系的任务将立即运行。
在这个示例中,A 首先执行,待其执行完毕后,B和C并行执行,待B和C都执行完毕后,执行D:
- name: diamond
dag:
tasks:
- name: A
template: echo
- name: B
dependencies: [A]
template: echo
- name: C
dependencies: [A]
template: echo
- name: D
dependencies: [B, C]
template: echo
自定义资源类型
WorkflowTemplates
v2.4 and after
介绍
WorkflowTemplates 是集群中 Workflows 的定义。这允许您创建一个常用模板库,并通过直接提交它们(v2.7 及更高版本)或从您的 Workflows 中引用它们来重用它们。
WorkflowTemplate vs template
由于历史原因,WorkflowTemplate 和 template 存在一定的命名冲突,下面我们来介绍这两个术语的区别:
- template(小写)是 Workflow 中的一个任务,或者是 templates 字段下的一个 WorkflowTemplate。当你定义一个 Workflow 时,你必须至少定义一个(但通常是多个)template。template 可以是 container、script、dag、steps、resources 或 suspend 类型,也可以引用一个 entrypoint 或其它 dag 和 step 的模板。
这是一个有两个 templates 的 Workflow 的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello # We reference our first "template" here
templates:
- name: hello # The first "template" in this Workflow, it is referenced by "entrypoint"
steps: # The type of this "template" is "steps"
- - name: hello
template: whalesay # We reference our second "template" here
arguments:
parameters: [{name: message, value: "hello1"}]
- name: whalesay # The second "template" in this Workflow, it is referenced by "hello"
inputs:
parameters:
- name: message
container: # The type of this "template" is "container"
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
- WorkflowTemplate 用于定义集群中的 Workflow。由于它是 Workflow 的定义,它也包含 templates。这些 templates 可以引用当前 WorkflowTemplate,也可以引用集群内的其它 Workflows 和 WorkflowTemplates。更多信息请参考 Referencing Other WorkflowTemplates。
WorkflowTemplate Spec
v2.7 and after
在 v2.7 及更高版本中,WorkflowTemplates 支持 WorkflowSpec 中的所有字段(除 priority
外,它必须在 WorkflowSpec 中配置)。你可以将任何现有 Workflow 转换为 WorkflowTemplate,方法是将 kind: Workflow 替换为 kind: WorkflowTemplate。
v2.4 – 2.6
v2.4 - v2.6 中的 WorkflowTemplates 只是部分和 Workflow 定义相同,仅支持 templates
和 arguments
字段。
这将不是 v2.4 - v2.6 中的有效 WorkflowTemplate(注意 entrypoint 字段):
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-submittable
spec:
entrypoint: whalesay-template # Fields other than "arguments" and "templates" not supported in v2.4 - v2.6
arguments:
parameters:
- name: message
value: hello world
templates:
- name: whalesay-template
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
但是,这将是一个有效的 WorkflowTemplate:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-submittable
spec:
arguments:
parameters:
- name: message
value: hello world
templates:
- name: whalesay-template
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
使用 workflowMetadata 向工作流添加 labels/annotations
2.10.2 and after
要自动向从 WorkflowTemplates 创建的工作流添加 labels/annotations,请使用 workflowMetadata。
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-submittable
spec:
workflowMetadata:
labels:
example-label: example-value
使用参数
在 WorkflowTemplate 中使用参数时,请注意以下几点:
- 使用全局参数时,您可以在 Workflow 中实例化全局变量,然后在 WorkflowTemplate 中直接引用它们。如下所示:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: hello-world-template-global-arg
spec:
serviceAccountName: argo
templates:
- name: hello-world
container:
image: docker/whalesay
command: [cowsay]
args: ["{{workflow.parameters.global-parameter}}"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-wf-global-arg-
spec:
serviceAccountName: argo
entrypoint: whalesay
arguments:
parameters:
- name: global-parameter
value: hello
templates:
- name: whalesay
steps:
- - name: hello-world
templateRef:
name: hello-world-template-global-arg
template: hello-world
- 使用本地参数时,必须在 WorkflowTemplate 内的模板定义中提供本地参数的值。如下所示:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: hello-world-template-local-arg
spec:
templates:
- name: hello-world
inputs:
parameters:
- name: msg
value: "hello world"
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.msg}}"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-local-arg-
spec:
entrypoint: whalesay
templates:
- name: whalesay
steps:
- - name: hello-world
templateRef:
name: hello-world-template-local-arg
template: hello-world
引用其它 WorkflowTemplates
您可以使用 templateRef 字段从另一个 WorkflowTemplates 引用 template。正如您在同一个 Workflow 引用其他 templates 的方式一样,您应该在 steps 或 dag 模板中这样做。
以下是来自 steps 模板的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
steps: # You should only reference external "templates" in a "steps" or "dag" "template".
- - name: call-whalesay-template
templateRef: # You can reference a "template" from another "WorkflowTemplate" using this field
name: workflow-template-1 # This is the name of the "WorkflowTemplate" CRD that contains the "template" you want
template: whalesay-template # This is the name of the "template" you want to reference
arguments: # You can pass in arguments as normal
parameters:
- name: message
value: "hello world"
您也可以使用 dag 模板进行类似操作:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
dag:
tasks:
- name: call-whalesay-template
templateRef:
name: workflow-template-1
template: whalesay-template
arguments:
parameters:
- name: message
value: "hello world"
您永远不应该直接在 template 对象上引用另一个 template(在 steps 或 dag 模板之外)。这包括使用 template 和 templateRef。此行为已弃用,不再受支持,并将在未来版本中删除。
以下是不应使用的已弃用参考的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
template: # You should NEVER use "template" here. Use it under a "steps" or "dag" template (see above).
templateRef: # You should NEVER use "templateRef" here. Use it under a "steps" or "dag" template (see above).
name: workflow-template-1
template: whalesay-template
arguments: # Arguments here are ignored. Use them under a "steps" or "dag" template (see above).
parameters:
- name: message
value: "hello world"
从 WorkflowTemplate spec 创建 Workflow
您可以使用 workflowTemplateRef
从 WorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与工作流模板参数合并。下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中将 entrypoint 和 arguments 传递给 WorkflowTemplate:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
entrypoint: whalesay-template
arguments:
parameters:
- name: message
value: "from workflow"
workflowTemplateRef:
name: workflow-template-submittable
下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中使用 WorkflowTemplate 的 entrypoint 和 arguments 的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
workflowTemplateRef:
name: workflow-template-submittable
管理 WorkflowTemplates
CLI
您可以创建一些示例模板,如下所示:
argo template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/templates.yaml
然后使用这些模板之一提交工作流:
argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/hello-world.yaml
2.7 and after
然后提交一个 WorkflowTemplate 作为 Workflow:
argo submit --from workflowtemplate/workflow-template-submittable
如果您需要将 WorkflowTemplate 作为带参数的 Workflow 提交:
argo submit --from workflowtemplate/workflow-template-submittable -p param1=value1
kubectl
使用 kubectl apply -f 和 kubectl get wftmpl
通过 Argo CD 实现 GitOps
可以使用 Argo CD 中 GitOps 中管理 WorkflowTemplate 资源
UI
WorkflowTemplate 资源也可以由 UI 管理。
当从 UI 提交 WorkflowTemplates 时,用户可以在 enum 下指定选项以启用下拉列表选择。
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-with-enum-values
spec:
entrypoint: argosay
arguments:
parameters:
- name: message
value: one
enum:
- one
- two
- three
templates:
- name: argosay
inputs:
parameters:
- name: message
value: '{{workflow.parameters.message}}'
container:
name: main
image: 'argoproj/argosay:v2'
command:
- /argosay
args:
- echo
- '{{inputs.parameters.message}}'
ClusterWorkflowTemplates
v2.8 and after
介绍
ClusterWorkflowTemplates 是集群范围
的 WorkflowTemplates。 ClusterWorkflowTemplate 可以像 ClusterRole 一样在集群范围内使用,并且可以跨集群中的所有命名空间访问。
定义 ClusterWorkflowTemplate
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
name: cluster-workflow-template-whalesay-template
spec:
templates:
- name: whalesay-template
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
引用其它 ClusterWorkflowTemplates
您可以使用带有 clusterScope: true
的 templateRef 字段从其他 ClusterWorkflowTemplates 中引用模板。正如您在同一 Workflow 中引用其他模板的方式一样,您应该在 steps 或 dag 模板中这样做。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
steps: # You should only reference external "templates" in a "steps" or "dag" "template".
- - name: call-whalesay-template
templateRef: # You can reference a "template" from another "WorkflowTemplate or ClusterWorkflowTemplate" using this field
name: cluster-workflow-template-whalesay-template # This is the name of the "WorkflowTemplate or ClusterWorkflowTemplate" CRD that contains the "template" you want
template: whalesay-template # This is the name of the "template" you want to reference
clusterScope: true # This field indicates this templateRef is pointing ClusterWorkflowTemplate
arguments: # You can pass in arguments as normal
parameters:
- name: message
value: "hello world"
2.9 and after
通过 ClusterWorkflowTemplate Spec 创建 Workflow
您可以使用带有 clusterScope: true
的 workflowTemplateRef 从 ClusterWorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与集群 Workflow 模板参数合并。
这是带有 entrypoint
和 arguments
的 ClusterWorkflowTemplate 示例:
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
name: cluster-workflow-template-submittable
spec:
entrypoint: whalesay-template
arguments:
parameters:
- name: message
value: hello world
templates:
- name: whalesay-template
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
这是一个通过 ClusterWorkflowTemplate 创建为 Workflow 的示例,并将 entrypoint
和 arguments
传递给 ClusterWorkflowTemplate:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: cluster-workflow-template-hello-world-
spec:
entrypoint: whalesay-template
arguments:
parameters:
- name: message
value: "from workflow"
workflowTemplateRef:
name: cluster-workflow-template-submittable
clusterScope: true
这是一个通过 WorkflowTemplate 创建为 Workflow 并使用 WorkflowTemplates 的 entrypoint
和 arguments
的示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: cluster-workflow-template-hello-world-
spec:
workflowTemplateRef:
name: cluster-workflow-template-submittable
clusterScope: true
管理 ClusterWorkflowTemplates
CLI
您可以创建一些示例模板,如下所示:
argo cluster-template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/clustertemplates.yaml
使用这些模板之一提交工作流:
argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/cluster-wftmpl-dag.yaml
2.7 and after
将 ClusterWorkflowTemplate 作为 Workflow 提交:
argo submit --from clusterworkflowtemplate/workflow-template-submittable
kubectl
使用 kubectl apply -f 和 kubectl get cwft
UI
ClusterWorkflowTemplate 资源也可以由 UI 管理
CronWorkflows
v2.5 and after
介绍
CronWorkflow 是按预设时间表运行的工作流。它们旨在轻松地从 Workflow 转换并模仿与 Kubernetes CronJob 相同的选项。本质上,CronWorkflow = Workflow + 一些特定的 cron 选项。
CronWorkflow Spec
CronWorkflow spec 示例如下:
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-wf
spec:
schedule: "* * * * *"
concurrencyPolicy: "Replace"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: alpine:3.6
command: [sh, -c]
args: ["date; sleep 90"]
workflowSpec 和 workflowMetadata
CronWorkflow.spec.workflowSpec 和 Workflow.spec 类型相同,用于从中创建的 Workflow 对象的模板。spec 下的所有内容都将转换为 Workflow。
生成的 Workflow 名称将是基于 CronWorkflow 名称生成的名称。在此示例中,它可能类似于 test-cron-wf-tj6fe
CronWorkflow.spec.workflowMetadata
可用于添加 labels
和 annotations
。
CronWorkflow 选项
选项名称 | 默认值 | 描述 |
---|---|---|
schedule | None, must be provided | 工作流调度计划。例如,5 4 * * * |
timezone | Machine timezone | 将根据 IANA 时区标准运行工作流的时区,例如,America/Los_Angeles |
suspend | false | 如果为true,工作流调度将不会发生。可以从 CLI、GitOps 或直接设置 |
concurrencyPolicy | Allow | 多个工作流同时调度时的处理策略。可选项: * Allow:全部允许 * Replcace:在安排新的之前删除所有旧的 * Forbid:有旧的存在时不许有新的 |
startingDeadlineSeconds | 0 | Number of seconds after the last successful run during which a missed Workflow will be run |
successfulJobsHistoryLimit | 3 | 将保留的成功工作流的数量 |
failedJobsHistoryLimit | 1 | 将保留的失败工作流的数量 |
Cron Schedule 语法
cron 调度程序使用标准的 cron 语法,参看documented on Wikipedia。
此处记录了所使用的特定库的更详细文档。
崩溃恢复
如果 workflow-controller
崩溃(CronWorkflow controller 也会崩溃),您可以设置一些选项,以确保在 controller 关闭时计划的 CronWorkflow 仍然可以运行。主要是可以设置 startingDeadlineSeconds
来指定 CronWorkflow 最后一次成功运行后的最大秒数,在此期间错过的运行仍将被执行。
例如,如果每分钟运行的 CronWorkflow 最后一次运行是在 12:05:00,并且控制器在 12:05:55 和 12:06:05 之间崩溃,那么 12:06:00 的预期执行时间将会错过。然而,如果将 startingDeadlineSeconds 设置为大于65的值(上次计划运行时间 12:05:00 和当前控制器重新启动时间 12:06:05 之间经过的时间量),那么 CronWorkflow 的实例将在 12:06:05 准确执行。
由于设置了startingDeadlineSeconds,当前只会执行一个实例。
此设置也可以与 concurrencyPolicy 一起配置,以实现更精细的控制。
管理 CronWorkflow
CLI
可以使用基本命令从 CLI 创建 CronWorkflow:
$ argo cron create cron.yaml
Name: test-cron-wf
Namespace: argo
Created: Mon Nov 18 10:17:06 -0800 (now)
Schedule: * * * * *
Suspended: false
StartingDeadlineSeconds: 0
ConcurrencyPolicy: Forbid
$ argo cron list
NAME AGE LAST RUN SCHEDULE SUSPENDED
test-cron-wf 49s N/A * * * * * false
# some time passes
$ argo cron list
NAME AGE LAST RUN SCHEDULE SUSPENDED
test-cron-wf 56s 2s * * * * * false
$ argo cron get test-cron-wf
Name: test-cron-wf
Namespace: argo
Created: Wed Oct 28 07:19:02 -0600 (23 hours ago)
Schedule: * * * * *
Suspended: false
StartingDeadlineSeconds: 0
ConcurrencyPolicy: Replace
LastScheduledTime: Thu Oct 29 06:51:00 -0600 (11 minutes ago)
NextScheduledTime: Thu Oct 29 13:03:00 +0000 (32 seconds from now)
Active Workflows: test-cron-wf-rt4nf
注意:NextScheduledRun 假设工作流控制器使用 UTC 作为其时区
kubectl
使用 kubectl apply -f 和 kubectl get cwf
回填天数
See cron backfill.
GitOps via Argo CD
CronWorkflow resources can be managed with GitOps by using Argo CD
UI
CronWorkflow 资源也可以由 UI 管理
模板类型
HTTP 模板
v3.2 and after
HTTP 模板可用于执行 HTTP 请求。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: http-template-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: get-google-homepage
template: http
arguments:
parameters: [{name: url, value: "https://www.google.com"}]
- name: http
inputs:
parameters:
- name: url
http:
timeoutSeconds: 20 # Default 30
url: "{{inputs.parameters.url}}"
method: "GET" # Default GET
headers:
- name: "x-header-name"
value: "test-value"
# Template will succeed if evaluated to true, otherwise will fail
# Available variables:
# request.body: string, the request body
# request.headers: map[string][]string, the request headers
# response.url: string, the request url
# response.method: string, the request method
# response.statusCode: int, the response status code
# response.body: string, the response body
# response.headers: map[string][]string, the response headers
successCondition: "response.body contains \"google\"" # available since v3.3
body: "test body" # Change request body
Argo 代理
HTTP 模板使用 Argo 代理,它独立于控制器执行请求。Agent 和 Workflow Controller 通过 WorkflowTaskSet CRD 进行通信,该 CRD 是为每个需要使用 Agent 的正在运行的 Workflow 创建的。
为了使用 Argo 代理,您需要确保已添加适当的 Workflow RBAC 以将代理角色添加到 Argo 工作流。代理角色示例如下所示:
# https://argoproj.github.io/argo-workflows/workflow-rbac/
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: agent
annotations:
workflows.argoproj.io/description: |
This is the minimum recommended permissions needed if you want to use the agent, e.g. for HTTP or plugin templates.
If <= v3.2 you must replace `workflowtasksets/status` with `patch workflowtasksets`.
rules:
- apiGroups:
- argoproj.io
resources:
- workflowtasksets
verbs:
- list
- watch
- apiGroups:
- argoproj.io
resources:
- workflowtasksets/status
verbs:
- patch
Container Set 模板
v3.1 and after
Container set 模板和普通的 container 或 script 模板类似,但允许你在一个 pod 中指定运行多个 containers。
由于这多个 container 包含在一个 pod 中,它们将被调度到同一台宿主机上。你可以使用便宜且便捷的 empty-dir 卷替代 PVC 来实现多个步骤间共享数据。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: container-set-template-
spec:
entrypoint: main
templates:
- name: main
volumes:
- name: workspace
emptyDir: { }
containerSet:
volumeMounts:
- mountPath: /workspace
name: workspace
containers:
- name: a
image: argoproj/argosay:v2
- name: b
image: argoproj/argosay:v2
- name: main
image: argoproj/argosay:v2
dependencies:
- a
- b
outputs:
parameters:
- name: message
valueFrom:
path: /workspace/message
有几点需要注意:
-
您必须使用 Emissary Executor。
-
或者所有容器必须并行运行——即它是一个没有依赖关系的图。
-
您不能使用增强的依赖逻辑。
-
它将使用所有资源请求的总和,可能比相同的 DAG 模板花费更多。如果您的请求已经花费了很多,这将是一个问题。见下文。
通过指定依赖关系,可以将容器排列为图形。这适用于运行 10 个而不是 100 个容器。
输入和输出
与 container 和 script 模板一样,输入和输出只能从名为 main 的容器中加载和保存。
所有有制品的 container set 模板必须/应该有一个名为 main 的容器。
如果要使用基础层制品,main 必须最后完成,因此它必须是图中的根节点。
那可能不切实际。
相反,拥有一个工作区卷并确保所有工件路径都在该卷上。
⚠️资源请求
一个 container set 实际上启动了所有容器,而 Emissary 仅在它所依赖的容器完成后才启动主容器进程。这意味着即使容器没有做任何有用的工作,它仍然在消耗资源,并且您仍然需要为它们付费。
如果您的请求很小,这将不是问题。
如果您的请求很大,请设置资源请求,以便资源总和是您一次性所需的最大值。
示例 A:一个简单的序列,例如a -> b -> c
-
a 需要 1Gi 内存
-
b 需要 2Gi 内存
-
c 需要 1Gi 内存
然后你知道你最多只需要 2Gi。你可以设置如下:
-
a 请求 512Mi 内存
-
b 请求 1Gi 内存
-
c 请求 512Mi 内存
总共是2Gi,对于b来说已经足够了。所有任务都能正常运行。
示例 B:菱形 DAG,例如菱形 a -> b -> d 和 a -> c -> d,即 b 和 c 同时运行。
-
a 需要 1000 cpu
-
b 需要 2000 cpu
-
c 需要 1000 cpu
-
d 需要 1000 cpu
我知道 b 和 c 将同时运行。所以我需要确保总数是3000。
-
a 请求 500 cpu
-
b 请求 1000 cpu
-
c 请求 1000 cpu
-
d 请求 500 cpu
一共3000,足够b+c了。所有任务都能正常运行。
示例 C:不平衡的请求,例如a -> b 其中 a 便宜而 b 昂贵
-
a 需要 100 cpu, 1Mi 内存, 运行 10h
-
b 需要 8Ki GPU, 100 Gi 内存, 200 Ki GPU, 运行 5m
你能看出这里的问题吗? a 只有小请求,但容器集将使用所有请求的总数。因此,就好像您将所有 GPU 都使用了 10 小时。这将是昂贵的。
解决方案:当您有不平衡的请求时,不要使用容器集
数据获取和转换
v3.1 and after
我们有意使此功能仅具有基本功能。我们希望我们能够根据社区的反馈构建此功能。如果您对此功能有想法和用例,请在 GitHub 上打开增强提案。
介绍
用户经常将获取和转换数据作为其工作流程的一部分。data
模板为这些常见操作提供一流的支持。
可以通过查看 bash
中的常见数据源和转换操作来理解 data
模板:
find -r . | grep ".pdf" | sed "s/foo/foo.ready/"
此类操作包括两个主要部分:
-
一个数据源:find -r .
-
一系列转换操作,串行转换源的输出:| grep ".pdf" | sed "s/foo/foo.ready/"
例如,此操作在寻找待处理文件的潜在列表以及根据需要过滤和操作列表时可能很有用。
在 Argo 中,这个操作可以写成:
- name: generate-artifacts
data:
source: # Define a source for the data, only a single "source" is permitted
artifactPaths: # A predefined source: Generate a list of all artifact paths in a given repository
s3: # Source from an S3 bucket
bucket: test
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
transformation: # The source is then passed to be transformed by transformations defined here
- expression: "filter(data, {# endsWith \".pdf\"})"
- expression: "map(data, {# + \".ready\"})"
Spec
data 模板必须包含一个 source 字段。当前可用的数据源有:
-
artifactPaths
:从指定的制品库生成制品路径列表
data 模板可能包含多个 transformations(也可能是0个)。转换将按顺序连续应用。当前可用的转换:
我们知道 expression 转换是有限的。我们打算根据社区的反馈大大扩展此模板的功能。请参阅本文档顶部的链接以提交有关此功能的想法或用例。
内联模板
v3.2 and after
您可以在 DAG 和步骤中内联其他模板。
示例:
- DAG
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-inline-
labels:
workflows.argoproj.io/test: "true"
annotations:
workflows.argoproj.io/description: |
This example demonstrates running a DAG with inline templates.
workflows.argoproj.io/version: ">= 3.2.0"
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: a
inline:
container:
image: argoproj/argosay:v2
- Steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-inline-
labels:
workflows.argoproj.io/test: "true"
annotations:
workflows.argoproj.io/description: |
This workflow demonstrates running a steps with inline templates.
workflows.argoproj.io/version: ">= 3.2.0"
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: a
inline:
container:
image: argoproj/argosay:v2
警告:您只能内联一次。在 DAG 中内联 DAG 将不起作用。
访问控制
服务账户
配置服务帐户以运行工作流
Roles, Role-Bindings 和 Service Accounts
为了让 Argo 支持制品、输出、访问 secrets 等功能,它需要使用 Kubernetes API 与 Kubernetes 资源进行通信。为了与 Kubernetes API 通信,Argo 使用一个 ServiceAccount
授权自己访问 Kubernetes API。您可以通过使用 RoleBinding 将角色绑定到 ServiceAccount 来指定 Argo 使用的 ServiceAccount 的角色(即哪些权限)。
然后,在提交工作流时,您可以指定 Argo 使用哪个 ServiceAccount :
argo submit --serviceaccount <name>
如果没提供 ServiceAccount,Argo 将会使用其所运行的命名空间的 default
ServiceAccount,默认情况下几乎总是没有足够的权限。
有关为您的用例授予 Argo 必要权限的更多信息,请参阅 Workflow RBAC。
授予 admin 权限
出于本演示的目的,我们将授予default
ServiceAccount 管理员权限(即,我们将 admin Role 绑定到当前命名空间的default ServiceAccount):
kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default -n argo
请注意,这将向运行命令的命名空间中的默认 ServiceAccount 授予管理员权限,因此您将只能在创建 RoleBinding 的命名空间中运行工作流。
Workflow RBAC
工作流中的所有 pod 都使用 workflow.spec.serviceAccountName
中指定的服务账户运行,如果省略,则使用工作流命名空间的 default
服务账户。工作流需要的访问权限取决于工作流需要做什么。例如,如果您的工作流需要部署资源,则工作流的服务帐户将需要对该资源的“create”权限。
警告:我们不建议在生产环境使用 default 服务账户。它是一个共享帐户,因此可能会添加您不想要的权限。相反,仅为您的工作流程创建一个服务帐户。
执行器工作所需的最小权限:
对于 >= v3.4:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: executor
rules:
- apiGroups:
- argoproj.io
resources:
- workflowtaskresults
verbs:
- create
- patch
对于 <= v3.3 使用。
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: executor
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- patch
警告:对于许多组织来说,给工作流 Pod 补丁权限可能是不可接受的,参考#3961
如果您不使用 emissary,则需要额外的权限。请参阅 executor 以获得合适的权限。
功能
Workflow 变量
工作流规范中的某些字段允许由 Argo 自动替换的变量引用。
如何使用变量
变量用花括号括起来:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-parameters-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: message
value: hello world
templates:
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [ cowsay ]
args: [ "{{inputs.parameters.message}}" ]
以下变量可用于引用工作流的各种元数据:
模板标签类型
有两种模板标签:
-
简单的: 默认,示例 {{workflow.name}}
-
表达式: {{ 后紧跟着 =,例如 {{=workflow.name}}
简单的
标签被替换为与标签同名的变量。
简单标签可能在括号和变量之间有空格,如下所示。但是,存在一个已知问题,即变量可能无法使用空白进行插值,因此建议在解决此问题之前避免使用空白。请使用可重现的示例报告意外行为。
args: [ "{{ inputs.parameters.message }}" ]
表达式
Since v3.1
该标签被作为表达式评估该标签的结果替换。
请注意,任何带连字符的参数名称或步骤名称都会导致解析错误。您可以通过索引参数或步骤图来引用它们,例如 inputs.parameters['my-param']
或 steps['my-step'].outputs.result
。
示例
普通列表:
[1, 2]
列表过滤:
filter([1, 2], { # > 1})
映射一个列表:
map([1, 2], { # * 2 })
我们提供了一些核心函数:
转成 int:
asInt(inputs.parameters['my-int-param'])
转成 float:
asFloat(inputs.parameters['my-float-param'])
转成字符串:
string(1)
转成JSON串(需要 withParam):
toJson([1, 2])
从 JSON 中提取数据:
jsonpath(inputs.parameters.json, '$.some.path')
您还可以使用 Sprig 函数:
修剪字符串:
sprig.trim(inputs.parameters['my-string-param'])
!!!警告 在 Sprig 函数中,通常不会引发错误。例如。如果 int 用于无效值,则返回 0。请查看 Sprig 文档以了解哪些函数可以使用,哪些不可以。
引用
所有模板
变量 | 描述 |
---|---|
inputs.parameters.<NAME> | 给模板传入参数 |
inputs.parameters | 模板的所有输入参数作为 JSON 字符串 |
inputs.artifacts.<NAME> | 将制品输入到模板 |
Steps 模板
变量 | 描述 |
---|---|
steps.name | 步骤名称 |
steps.<STEPNAME>.id | 容器步骤的唯一 ID |
steps.<STEPNAME>.ip | 任何先前守护程序 container 步骤的 IP 地址 |
steps.<STEPNAME>.status | 任何先前步骤的状态短语 |
steps.<STEPNAME>.exitCode | 任何先前 script/container 步骤的退出码 |
steps.<STEPNAME>.startedAt | 当前步骤开始的时间戳 |
steps.<STEPNAME>.finishedAt | 当前步骤结束的时间戳 |
steps.<STEPNAME>.outputs.result | 任何先前 script/container 步骤的输出结果 |
steps.<STEPNAME>.outputs.parameters | When the previous step uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation |
steps.<STEPNAME>.outputs.parameters.<NAME> | Output parameter of any previous step. When the previous step uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation |
steps.<STEPNAME>.outputs.artifacts.<NAME> | 任何先前步骤的输出工件 |
DAG 模板
变量 | 描述 |
---|---|
tasks.name | 任务名称 |
tasks.<TASKNAME>.id | container 任务的唯一ID |
tasks.<TASKNAME>.ip | 任何先前守护程序 container 任务的 IP 地址 |
tasks.<TASKNAME>.status | 任何先前任务的状态短语 |
tasks.<TASKNAME>.exitCode | 任何先前 script/container 任务的退出码 |
tasks.<TASKNAME>.startedAt | 当前任务开始的时间戳 |
tasks.<TASKNAME>.finishedAt | 当前任务结束的时间戳 |
tasks.<TASKNAME>.outputs.result | 任何先前 script/container 任务的输出结果 |
tasks.<TASKNAME>.outputs.parameters | When the previous task uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation |
tasks.<TASKNAME>.outputs.parameters.<NAME> | Output parameter of any previous task. When the previous task uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation |
tasks.<TASKNAME>.outputs.artifacts.<NAME> | 任何先前任务的输出工件 |
HTTP 模板
Since v3.3
Only available for successCondition
变量 | 描述 |
---|---|
request.method | 请求方法(字符串) |
request.url | 请求URL(字符串) |
request.body | 请求体(字符串) |
request.headers | 请求头 (map[string][]string) |
response.statusCode | 响应状态码 (int) |
response.body | 响应体(string) |
response.headers | 响应头 (map[string][]string) |
重试策略
When using the expression field within retryStrategy, special variables are available.
变量 | 描述 |
---|---|
lastRetry.exitCode | Exit code of the last retry |
lastRetry.Status | Status of the last retry |
lastRetry.Duration | Duration in seconds of the last retry |
Note: These variables evaluate to a string type. If using advanced expressions, either cast them to int values (expression: "{{=asInt(lastRetry.exitCode) >= 2}}") or compare them to string values (expression: "{{=lastRetry.exitCode != '2'}}").
Container/Script 模板
变量 | 描述 |
---|---|
pod.name | container/script 的 Pod 名称 |
retries | 如果指定了重试策略,container/script的重试次数 |
inputs.artifacts.<NAME>.path | 输入工件的本地路径 |
outputs.artifacts.<NAME>.path | 输出工件的本地路径 |
outputs.parameters.<NAME>.path | 输出参数的本地路径 |
循环 (withItems / withParam)
变量 | 描述 |
---|---|
item | 元素列表 |
item.<FIELDNAME> | 列表中元素字段 |
指标
When emitting custom metrics in a template, special variables are available that allow self-reference to the current step.
变量 | 描述 |
---|---|
status | Phase status of the metric-emitting template |
duration | Duration of the metric-emitting template in seconds (only applicable in Template-level metrics, for Workflow-level use workflow.duration) |
exitCode | Exit code of the metric-emitting template |
inputs.parameters.<NAME> | Input parameter of the metric-emitting template |
outputs.parameters.<NAME> | Output parameter of the metric-emitting template |
outputs.result | Output result of the metric-emitting template |
resourcesDuration.{cpu,memory} | Resources duration in seconds. Must be one of resourcesDuration.cpu or resourcesDuration.memory, if available. For more info, see the Resource Duration doc. |
实时指标
Some variables can be emitted in real-time (as opposed to just when the step/task completes). To emit these variables in real time, set realtime: true under gauge (note: only Gauge metrics allow for real time variable emission). Metrics currently available for real time emission:
For Workflow-level metrics:
- workflow.duration
For Template-level metrics:
- duration
全局变量
变量 | 描述 |
---|---|
workflow.name | Workflow 名称 |
workflow.namespace | Workflow 命名空间 |
workflow.serviceAccountName | Workflow 服务账户名称 |
workflow.uid | Workflow UID。用于设置资源或唯一工件位置的所有权引用 |
workflow.parameters.<NAME> | Workflow的输入参数 |
workflow.parameters | Workflow的所有输入参数的 JSON 字符串 |
workflow.outputs.parameters.<NAME> | Workflow中的全局参数 |
workflow.outputs.artifacts.<NAME> | 工作流中的全局工件 |
workflow.annotations.<NAME> | Workflow的注解 |
workflow.labels.<NAME> | Workflow的标签 |
workflow.creationTimestamp | Workflow 的创建时间,格式为 RFC 3339 (e.g. 2018-08-23T05:42:49Z) |
workflow.creationTimestamp.<STRFTIMECHAR> | Workflow 的创建时间,格式为strftime. |
workflow.creationTimestamp.RFC3339 | Workflow 的创建时间,格式为 RFC 3339 |
workflow.priority | Workflow的优先级 |
workflow.duration | Workflow持续时间估计值可能与实际持续时间相差几秒钟 |
workflow.scheduledTime | 被调度时间,格式为 RFC 3339 (仅用于 CronWorkflow) |
退出处理器
变量 | 描述 |
---|---|
workflow.status | Workflow的状态。可选值为:Succeeded, Failed, Error |
workflow.failures | A list of JSON objects containing information about nodes that failed or errored during execution. Available fields: displayName, message, templateName, phase, podName, and finishedAt. |
重试
Argo工作流提供了一系列重试失败步骤的选项。
在 WorkflowSpec 中配置 retryStrategy
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-container-
spec:
entrypoint: retry-container
templates:
- name: retry-container
retryStrategy:
limit: "10"
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
重试策略
使用retryPolicy选择要重试的失败:
-
Always
:重试所有失败的步骤。 -
OnFailure
: 重试其主容器在Kubernetes中标记为失败的步骤(这是默认值)。 -
OnError
: 重试遇到Argo控制器错误或其init或wait容器失败的步骤 -
OnTransientError
: Retry steps that encounter errors defined as transient, or errors matching the TRANSIENT_ERROR_PATTERN environment variable. Available in version 3.0 and later.
示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-on-error-
spec:
entrypoint: error-container
templates:
- name: error-container
retryStrategy:
limit: "2"
retryPolicy: "Always"
container:
image: python
command: ["python", "-c"]
# fail with a 80% probability
args: ["import random; import sys; exit_code = random.choice(range(0, 5)); sys.exit(exit_code)"]
有条件地重试
v3.2 and after
可以使用 expression
来控制重试。expression
字段接受expr表达式,并且可以访问如下变量:
-
lastRetry.exitCode
: 上次重试的退出码,如果不可用则为“-1” -
lastRetry.status
: 上次重试的结果短语: Error, Failed -
lastRetry.duration
: 上次重试耗时,秒
如果 expression 评估为 false,该步骤将不会重试。
示例:
# Only retry if the retryStrategy.expression condition is satisfied. In this example, retries will be made until a pod has
# exit code 2 or the limit of 10 is reached, whichever happens first.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-script-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: safe-to-retry
template: safe-to-retry
- - name: retry
template: retry-script
arguments:
parameters:
- name: safe-to-retry
value: "{{steps.safe-to-retry.outputs.result}}"
- name: safe-to-retry
script:
image: python:alpine3.6
command: ["python"]
source: |
print("true")
- name: retry-script
inputs:
parameters:
- name: safe-to-retry
retryStrategy:
limit: "3"
# Only continue retrying if the last exit code is greater than 1 and the input parameter is true
expression: "asInt(lastRetry.exitCode) > 1 && {{inputs.parameters.safe-to-retry}} == true"
script:
image: python:alpine3.6
command: ["python"]
# Exit 1 with 50% probability and 2 with 50%
source: |
import random;
import sys;
exit_code = random.choice([1, 2]);
sys.exit(exit_code)
Back-Off
您可以使用 backoff
配置重试之间的延迟。
# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-backoff-
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: "10"
backoff:
duration: "1" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
factor: "2"
maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
生命周期钩子
v3.3 and after
介绍
LifecycleHook 触发一个基于条件表达式的动作。它在工作流级别或模板级别进行配置,例如分别作为 workflow.status
或 steps.status
的功能。LifecycleHook 在执行期间执行并执行一次。
换句话说,LifecycleHook 的功能类似于带有条件表达式的退出处理程序。
Workflow 级别 LifecycleHook: 满足配置的表达式时执行工作流。
示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-
spec:
entrypoint: main
hooks:
exit:
template: http
running:
expression: workflow.status == "Running"
template: http
templates:
- name: main
steps:
- - name: step1
template: heads
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: http
http:
# url: http://dummy.restapiexample.com/api/v1/employees
url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"
Template-level Lifecycle-Hook: 满足配置的表达式时执行模板。
示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-tmpl-level-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: step-1
hooks:
exit:
# Expr will not support `-` on variable name. Variable should wrap with `[]`
expression: steps["step-1"].status == "Running"
template: http
success:
expression: steps["step-1"].status == "Succeeded"
template: http
template: echo
- - name: step2
hooks:
exit:
expression: steps.step2.status == "Running"
template: http
success:
expression: steps.step2.status == "Succeeded"
template: http
template: echo
- name: echo
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: http
http:
# url: http://dummy.restapiexample.com/api/v1/employees
url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"
支持的条件
-
Exit handler variables: workflow.status 和 workflow.failures
不支持的条件
- outputs 不可用,因为 LifecycleHook 在执行期间执行,并且在该步骤完成之前不会产生输出。
通知用例
LifecycleHook 可用于根据工作流状态更改或模板状态更改配置通知,如下例所示:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-
spec:
entrypoint: main
hooks:
exit:
template: http
running:
expression: workflow.status == "Running"
template: http
templates:
- name: main
steps:
- - name: step1
template: heads
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: http
http:
url: http://dummy.restapiexample.com/api/v1/employees
步骤级记忆
v2.10 and after
介绍
工作流通常具有计算成本高的输出。此功能通过记忆先前运行的步骤来降低成本和工作流执行时间:它将模板的输出存储到具有可变 key 的指定缓存中。
缓存方法
目前,缓存只能使用 config-maps。这使您可以轻松地通过 kubectl 和 Kubernetes API 手动操作缓存条目,而无需通过 Argo。
使用记忆
记忆是在模板级别设置的。您必须指定一个键,它可以是静态字符串,但更多时候取决于输入。您还必须指定配置映射缓存的名称。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: memoized-workflow-
spec:
entrypoint: whalesay
templates:
- name: whalesay
memoize:
key: "{{inputs.parameters.message}}"
cache:
configMap:
name: whalesay-cache
注意:为了使用记忆,有必要为适当的(集群)角色添加configmaps 资源的 create 和 update 权限。在集群安装的情况下,应该更新 argo-cluster-role 集群角色,而对于命名空间安装,应该更新 argo-role 角色。
FAQ
-
如果你看到错误“error creating cache entry: ConfigMap "reuse-task" is invalid: []: Too long: must have at most 1048576 characters”,这是由于 ConfigMap 有 1MB 大小的限制。下面是一些解决方案:
-
删除现有的 ConfigMap 缓存或切换到使用不同的缓存。
-
减少正在记忆的节点的输出参数的大小。
-
将您的缓存拆分为不同的记忆键和缓存名称,以便每个缓存条目都很小。
-
模板默认值
v3.1 and after
介绍
TemplateDefaults 功能使用户能够在工作流 spec 级别
配置默认模板值,该值将应用于工作流中的所有模板。如果模板的值在 templateDefault
中也有默认值,则模板的值将优先。这些值将在运行时应用。将使用 Kubernetes 战略合并补丁合并模板值和默认值。要检查列表值是否合并以及如何合并,请检查工作流定义中的 patchStrategy 和 patchMergeKey 标签。
在 WorkflowSpec 中配置 templateDefaults
示例:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: template-defaults-example
spec:
entrypoint: main
templateDefaults:
timeout: 30s # timeout value will be applied to all templates
retryStrategy: # retryStrategy value will be applied to all templates
limit: 2
templates:
- name: main
container:
image: docker/whalesay:latest
在控制器级别配置templateDefaults
操作员可以在 workflow defaults中配置 templateDefaults。此 templateDefault 将应用于在控制器上运行的所有工作流。
以下将在 Config Map 中指定:
apiVersion: v1
kind: ConfigMap
metadata:
name: workflow-controller-configmap
data:
# Default values that will apply to all Workflows from this controller, unless overridden on the Workflow-level
workflowDefaults: |
metadata:
annotations:
argo: workflows
labels:
foo: bar
spec:
ttlStrategy:
secondsAfterSuccess: 5
templateDefaults:
timeout: 30s
增强的依赖逻辑
v2.9 and after
介绍
在 2.8 版之前,在 DAG 模板中指定依赖项的唯一方法是使用 dependencies
字段并指定当前任务所依赖的其他任务的列表。此语法具有限制性,因为它不允许用户指定要依赖的任务结果。例如,一个任务只有在其依赖的任务执行成功(或失败等)时才会运行。
依赖
为了解决这个问题,存在一个名为depends
的新字段,它允许用户指定相关任务、它们的状态以及任何复杂的布尔逻辑。该字段是一个字符串字段,其语法类似于表达式,格式为 <task-name>.<task-result>。示例包括 task-1.Suceeded、task-2.Failed、task-3.Daemoned。可用任务结果的完整列表如下:
任务结果 | 描述 | 含义 |
---|---|---|
.Succeeded | 任务执行成功 | 任务执行完成,且没有错误 |
.Failed | 任务执行失败 | 任务退出码非0 |
.Errored | 任务报错 | 任务有一个非 0 退出代码以外的错误 |
.Skipped | 任务被跳过 | 任务被跳过 |
.Omitted | 任务被省略 | 任务被省略 |
.Daemoned | 任务是守护进程并且不是 Pending 状态 |
为方便起见,如果省略的任务结果等价于 (task.Succeeded || task.Skipped || task.Daemoned)。
例如:
depends: "task || task-2.Failed"
等价于:
depends: (task.Succeeded || task.Skipped || task.Daemoned) || task-2.Failed
也可以使用完整的布尔逻辑。操作符包括:
-
&&
-
||
-
!
例如:
depends: "(task-2.Succeeded || task-2.Skipped) && !task-3.Failed"
如果您依赖使用了 withItems
的任务,您可以使用 .AnySucceeded 和 .AllFailed 依赖于任何项目任务是成功还是全部失败,例如:
depends: "task-1.AnySucceeded || task-2.AllFailed"
与 dependencies 和 dag.task.continueOn 的兼容性
此功能完全兼容 dependencies
,转换很容易。
要转换,只需在 dependencies
中加入 &&
:
dependencies: ["A", "B", "C"]
等价于:
depends: "A && B && C"
由于 depends 中加入了控制逻辑,使用 depends 不能使用 dag.task.continueOn
。此外,不能在同一个任务组中同时使用 dependencies
和 depends
。
Node字段选择器
v2.8 and after
介绍
resume、stop 和 retry 等 Argo CLI 和 API 命令支持 --node-field-selector
参数,允许用户选择命令要作用的一组节点。
与 CLI 一起使用时的格式为:
--node-field-selector=FIELD=VALUE
可选项
该字段可以是以下任何一项:
字段 | 描述 |
---|---|
displayName | Display name of the node. This is the name of the node as it is displayed on the CLI or UI, without considering its ancestors (see example below). This is a useful shortcut if there is only one node with the same displayName |
name | Full name of the node. This is the full name of the node, including its ancestors (see example below). Using name is necessary when two or more nodes share the same displayName and disambiguation is required. |
templateName | Template name of the node |
phase | Phase status of the node - e.g. Running |
templateRef.name | The name of the workflow template the node is referring to |
templateRef.template | The template within the workflow template the node is referring to |
inputs.parameters.<NAME>.value | The value of input parameter NAME |
The operator can be '=' or '!='. Multiple selectors can be combined with a comma, in which case they are anded together.
示例
过滤输入参数 'foo' 等于 'bar' 的节点:
--node-field-selector=inputs.parameters.foo.value=bar
过滤输入参数 'foo' 等于 'bar' ,且状态短语不是 Running 的节点:
--node-field-selector=foo1=bar1,phase!=Running
考虑以下工作流程:
● appr-promotion-ffsv4 code-release
├─✔ start sample-template/email appr-promotion-ffsv4-3704914002 2s
├─● app1 wftempl1/approval-and-promotion
│ ├─✔ notification-email sample-template/email appr-promotion-ffsv4-524476380 2s
│ └─ǁ wait-approval sample-template/waiting-for-approval
├─✔ app2 wftempl2/promotion
│ ├─✔ notification-email sample-template/email appr-promotion-ffsv4-2580536603 2s
│ ├─✔ pr-approval sample-template/approval appr-promotion-ffsv4-3445567645 2s
│ └─✔ deployment sample-template/promote appr-promotion-ffsv4-970728982 1s
└─● app3 wftempl1/approval-and-promotion
├─✔ notification-email sample-template/email appr-promotion-ffsv4-388318034 2s
└─ǁ wait-approval sample-template/waiting-for-approval
模式
Empty Dir
虽然默认情况下,Docker 和 PNS 工作流执行器可以从基础层(例如 /tmp)获取输出工件/参数,但 Kubelet 和 K8SAPI 执行器都不能。如果您使用安全上下文运行工作流 pod,则不太可能从基础层获得输出工件/参数。
您可以通过将卷挂载到您的 pod 来解决此限制。最简单的方法是使用 emptyDir 卷。
只能用于工件/参数输出,如果需要的话,工件/参数输入会自动挂载一个 empty-dir。
此示例显示如何挂载输出卷:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: empty-dir-
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
command: [sh, -c]
args: ["cowsay hello world | tee /mnt/out/hello_world.txt"]
volumeMounts:
- name: out
mountPath: /mnt/out
volumes:
- name: out
emptyDir: { }
outputs:
parameters:
- name: message
valueFrom:
path: /mnt/out/hello_world.txt
Cron 回填
使用场景
- 您正在使用 cron 工作流来运行日常作业,您可能需要重新运行某个日期,或者运行一些历史日期的作业。
解决方案
-
为您的日常工作创建工作流模板。
-
创建您的 cron 工作流程以每天运行并调用该模板。
-
创建一个使用 withSequence的回填工作流运行每日工作流。
这个完整示例包括:
-
一个名为 job 的工作流模板
-
一个名为 daily-job 的 cron 工作流
-
一个名为 backfill-v1 的工作流,它使用资源模板为每个回填日期创建一个工作流。
-
一个名为 backfill-v2 的替代工作流,它使用步骤模板为每个回填日期运行一个任务。
状态
资源预估
v2.7 and after
Argo Workflows 指示您的工作流使用了多少资源并保存此信息。这是一个指示性但不准确的值。
预估耗时
v2.12 and after
当您运行工作流时,控制器将尝试预估其耗时。
这是基于从同一工作流模板、集群工作流模板或 cron 工作流提交的最近成功的工作流。
为了获取这些数据,控制器首先查询 Kubernetes API(因为这样更快),然后是工作流存档(如果启用)。
如果您使用过 Jenkins 之类的工具,您就会知道估计可能不准确:
-
一个 pod 花费了很长时间等待调度。
-
工作流程是不确定的,例如它使用何时执行不同的路径。
-
工作流程可能因规模而异,例如有时它使用 withItems,因此有时运行 100 个节点,有时运行 1000 个。
-
如果 pod 运行时间不可预测。
-
工作流是参数化的,不同的参数会影响其持续时间。
工作流程进度
v2.12 and after
当您运行工作流时,控制器将报告其进度。
我们将进度定义为两个数字,N/M 使得 0 <= N <= M 和 0 <= M。
-
N 是已完成的任务量。
-
M 是任务总量。
如 0/0、0/1 或 50/100。
和预计持续时间不同,进度是确定的。无论有任何问题,每个工作流程都是相同的。
每个节点的进度计算如下:
-
对于 pod 节点,如果完成则为 1/1,否则为 0/1。
-
对于非叶节点,其子节点的总和。
对于整个工作流,进度是其所有叶节点的总和。
警告:每次将节点添加到图中,M 都会在工作流运行期间增加。
自我报告进度
工作流中的 Pod 可以在运行时报告自己的进度。这个自我报告的进度会覆盖自动生成的进度。
报告进度实现方式如下:
-
创建进度并将进度写入环境变量 ARGO_PROGRESS_FILE 指示的文件
-
进度格式必须为 N/M
执行器将每 3 秒读取一次此文件,如果有更新,会更新 pod 的 workflows.argoproj.io/progress: N/M 注解。控制器读取该数据,并将进度写入适当的状态属性。
最初,工作流 pod 的进度始终为 0/1。如果您想影响这一点,请确保在 pod 上设置初始进度注解:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: progress-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: progress
template: progress
- name: progress
metadata:
annotations:
workflows.argoproj.io/progress: 0/100
container:
image: alpine:3.14
command: [ "/bin/sh", "-c" ]
args:
- |
for i in `seq 1 10`; do sleep 10; echo "$(($i*10))"'/100' > $ARGO_PROGRESS_FILE; done
工作流创建者
v2.9 and after
如果您通过 CLI 或 UI 创建工作流,则会自动添加一个label,标记创建它的用户
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: my-wf
labels:
workflows.argoproj.io/creator: admin
# labels must be DNS formatted, so the "@" is replaces by '.at.'
workflows.argoproj.io/creator-email: admin.at.your.org
workflows.argoproj.io/creator-preferred-username: admin-preferred-username
网友评论