美文网首页Kubernetes
Argo Workflows用户手册

Argo Workflows用户手册

作者: 王勇1024 | 来源:发表于2023-01-05 10:28 被阅读0次

介绍

Argo Workflows 是一个开源容器原生工作流引擎,用于在 Kubernetes 上编排并行作业。Argo Workflows 实现为 Kubernetes CRD。

  • 定义工作流中的每个步骤都是一个容器的工作流。

  • 将多步骤工作流建模为任务序列或使用有向无环图 (DAG) 捕获任务之间的依赖关系。

  • 使用 Kubernetes 上的 Argo Workflows 在很短的时间内轻松运行机器学习或数据处理的计算密集型作业。

  • 在 Kubernetes 上本地运行 CI/CD pipeline,无需配置复杂的软件开发产品。

安装Argo Workflows

CLI

Mac

# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-darwin-amd64.gz

# Unzip
gunzip argo-darwin-amd64.gz

# Make binary executable
chmod +x argo-darwin-amd64

# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo

# Test installation
argo version

Linux

# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-linux-amd64.gz

# Unzip
gunzip argo-linux-amd64.gz

# Make binary executable
chmod +x argo-linux-amd64

# Move binary to path
mv ./argo-linux-amd64 /usr/local/bin/argo

# Test installation
argo version

Controller 和 Server

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/install.yaml

核心概念

Workflow

Workflow是 Argo 中最重要的资源,具有两个重要功能:

  1. 它定义了要执行的工作流。

  2. 它存储工作流的状态。

Workflow Spec

要执行的工作流在 Workflow.spec 字段中定义。工作流规范的核心结构是模板列表和入口点。Workflow spec的核心结构是一组 templates 和一个 entrypoint

templates 可以松散地认为是“功能”:它们定义要执行的指令。entrypoint字段定义了“main”函数将是什么——即首先执行的模板。

下面是一个只有一个 template 的 Workflow 的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-  # Name of this Workflow
spec:
  entrypoint: whalesay        # Defines "whalesay" as the "main" template
  templates:
  - name: whalesay            # Defining the "whalesay" template
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]   # This template runs "cowsay" in the "whalesay" image with arguments "hello world"

模板类型

有6种 模板类型,可以分为两类。

模板定义

这些模板定义了要完成的工作,通常在一个容器中执行。

Container

可能是最通用的模板类型,将会调度一个 Container。模板的 spec 定义和 Kubernetes container spec定义相同。

示例:

  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]
Script

是对 container 的一种简便的封装。spec 定义和 container 相同,但增加了 source 字段:允许您就地定义脚本。该脚本将保存到文件中并为您执行。脚本的执行结果会自动导入到 Argo 变量 {{tasks.<NAME>.outputs.result}}{{steps.<NAME>.outputs.result}} 中(取决于调用方式)。

  - name: gen-random-int
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        i = random.randint(1, 100)
        print(i)
Resource

直接操作集群资源。可用于 get、create、apply、delete、replace 或 patch 集群中的资源。

这个示例会在集群是创建一个 ConfigMap 资源:

  - name: k8s-owner-reference
    resource:
      action: create
      manifest: |
        apiVersion: v1
        kind: ConfigMap
        metadata:
          generateName: owned-eg-
        data:
          some: value
Suspend

Suspend 模板将暂停执行一段时间或直到手动恢复。Suspend 模板可以通过 CLI(使用 argo resume)、API 接口或 UI 来恢复。

示例:

  - name: delay
    suspend:
      duration: "20s"

模板调用器

这些模板用于调用其他模板并提供执行控制。

Steps

Steps 模板允许您在一系列步骤中定义您的任务。该模板的结构是“一组列表”。外部列表顺序执行,内部列表并行执行。如果你想顺序执行内部列表,请使用 Synchronization特性。您可以设置多种选项来控制执行,例如 when有条件地执行步骤的子句

在这个示例中,首先执行 step1,待其执行完毕,再并行执行 step2a 和 step2b。

  - name: hello-hello-hello
    steps:
    - - name: step1
        template: prepare-data
    - - name: step2a
        template: run-data-first-half
      - name: step2b
        template: run-data-second-half
DAG

dag 模板允许您将任务定义为依赖关系图。在 DAG 中,您列出所有任务并设置在特定任务开始之前必须完成哪些其他任务。没有任何依赖关系的任务将立即运行。

在这个示例中,A 首先执行,待其执行完毕后,B和C并行执行,待B和C都执行完毕后,执行D:

  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
      - name: B
        dependencies: [A]
        template: echo
      - name: C
        dependencies: [A]
        template: echo
      - name: D
        dependencies: [B, C]
        template: echo

自定义资源类型

WorkflowTemplates

v2.4 and after

介绍

WorkflowTemplates 是集群中 Workflows 的定义。这允许您创建一个常用模板库,并通过直接提交它们(v2.7 及更高版本)或从您的 Workflows 中引用它们来重用它们。

WorkflowTemplate vs template

由于历史原因,WorkflowTemplate 和 template 存在一定的命名冲突,下面我们来介绍这两个术语的区别:

  • template(小写)是 Workflow 中的一个任务,或者是 templates 字段下的一个 WorkflowTemplate。当你定义一个 Workflow 时,你必须至少定义一个(但通常是多个)template。template 可以是 container、script、dag、steps、resources 或 suspend 类型,也可以引用一个 entrypoint 或其它 dag 和 step 的模板。

这是一个有两个 templates 的 Workflow 的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello           # We reference our first "template" here

  templates:
  - name: hello               # The first "template" in this Workflow, it is referenced by "entrypoint"
    steps:                    # The type of this "template" is "steps"
    - - name: hello
        template: whalesay    # We reference our second "template" here
        arguments:
          parameters: [{name: message, value: "hello1"}]

  - name: whalesay             # The second "template" in this Workflow, it is referenced by "hello"
    inputs:
      parameters:
      - name: message
    container:                # The type of this "template" is "container"
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
  • WorkflowTemplate 用于定义集群中的 Workflow。由于它是 Workflow 的定义,它也包含 templates。这些 templates 可以引用当前 WorkflowTemplate,也可以引用集群内的其它 Workflows 和 WorkflowTemplates。更多信息请参考 Referencing Other WorkflowTemplates

WorkflowTemplate Spec

v2.7 and after

在 v2.7 及更高版本中,WorkflowTemplates 支持 WorkflowSpec 中的所有字段(除 priority 外,它必须在 WorkflowSpec 中配置)。你可以将任何现有 Workflow 转换为 WorkflowTemplate,方法是将 kind: Workflow 替换为 kind: WorkflowTemplate。

v2.4 – 2.6

v2.4 - v2.6 中的 WorkflowTemplates 只是部分和 Workflow 定义相同,仅支持 templatesarguments 字段。

这将不是 v2.4 - v2.6 中的有效 WorkflowTemplate(注意 entrypoint 字段):

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-submittable
spec:
  entrypoint: whalesay-template     # Fields other than "arguments" and "templates" not supported in v2.4 - v2.6
  arguments:
    parameters:
      - name: message
        value: hello world
  templates:
    - name: whalesay-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]

但是,这将是一个有效的 WorkflowTemplate:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-submittable
spec:
  arguments:
    parameters:
      - name: message
        value: hello world
  templates:
    - name: whalesay-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]

使用 workflowMetadata 向工作流添加 labels/annotations

2.10.2 and after

要自动向从 WorkflowTemplates 创建的工作流添加 labels/annotations,请使用 workflowMetadata。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-submittable
spec:
  workflowMetadata:
    labels:
      example-label: example-value

使用参数

在 WorkflowTemplate 中使用参数时,请注意以下几点:

  • 使用全局参数时,您可以在 Workflow 中实例化全局变量,然后在 WorkflowTemplate 中直接引用它们。如下所示:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: hello-world-template-global-arg
spec:
  serviceAccountName: argo
  templates:
    - name: hello-world
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{workflow.parameters.global-parameter}}"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-wf-global-arg-
spec:
  serviceAccountName: argo
  entrypoint: whalesay
  arguments:
    parameters:
      - name: global-parameter
        value: hello
  templates:
    - name: whalesay
      steps:
        - - name: hello-world
            templateRef:
              name: hello-world-template-global-arg
              template: hello-world
  • 使用本地参数时,必须在 WorkflowTemplate 内的模板定义中提供本地参数的值。如下所示:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: hello-world-template-local-arg
spec:
  templates:
    - name: hello-world
      inputs:
        parameters:
          - name: msg
            value: "hello world"
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{inputs.parameters.msg}}"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-local-arg-
spec:
  entrypoint: whalesay
  templates:
    - name: whalesay
      steps:
        - - name: hello-world
            templateRef:
              name: hello-world-template-local-arg
              template: hello-world

引用其它 WorkflowTemplates

您可以使用 templateRef 字段从另一个 WorkflowTemplates 引用 template。正如您在同一个 Workflow 引用其他 templates 的方式一样,您应该在 steps 或 dag 模板中这样做。

以下是来自 steps 模板的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    steps:                              # You should only reference external "templates" in a "steps" or "dag" "template".
      - - name: call-whalesay-template
          templateRef:                  # You can reference a "template" from another "WorkflowTemplate" using this field
            name: workflow-template-1   # This is the name of the "WorkflowTemplate" CRD that contains the "template" you want
            template: whalesay-template # This is the name of the "template" you want to reference
          arguments:                    # You can pass in arguments as normal
            parameters:
            - name: message
              value: "hello world"

您也可以使用 dag 模板进行类似操作:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    dag:
      tasks:
        - name: call-whalesay-template
          templateRef:
            name: workflow-template-1
            template: whalesay-template
          arguments:
            parameters:
            - name: message
              value: "hello world"

永远不应该直接在 template 对象上引用另一个 template(在 steps 或 dag 模板之外)。这包括使用 template 和 templateRef。此行为已弃用,不再受支持,并将在未来版本中删除。

以下是不应使用的已弃用参考的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    template:                     # You should NEVER use "template" here. Use it under a "steps" or "dag" template (see above).
    templateRef:                  # You should NEVER use "templateRef" here. Use it under a "steps" or "dag" template (see above).
      name: workflow-template-1
      template: whalesay-template
    arguments:                    # Arguments here are ignored. Use them under a "steps" or "dag" template (see above).
      parameters:
      - name: message
        value: "hello world"

从 WorkflowTemplate spec 创建 Workflow

您可以使用 workflowTemplateRef 从 WorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与工作流模板参数合并。下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中将 entrypoint 和 arguments 传递给 WorkflowTemplate:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  entrypoint: whalesay-template
  arguments:
    parameters:
      - name: message
        value: "from workflow"
  workflowTemplateRef:
    name: workflow-template-submittable

下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中使用 WorkflowTemplate 的 entrypoint 和 arguments 的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  workflowTemplateRef:
    name: workflow-template-submittable

管理 WorkflowTemplates

CLI

您可以创建一些示例模板,如下所示:

argo template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/templates.yaml

然后使用这些模板之一提交工作流:

argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/hello-world.yaml

2.7 and after

然后提交一个 WorkflowTemplate 作为 Workflow:

argo submit --from workflowtemplate/workflow-template-submittable

如果您需要将 WorkflowTemplate 作为带参数的 Workflow 提交:

argo submit --from workflowtemplate/workflow-template-submittable -p param1=value1

kubectl

使用 kubectl apply -f 和 kubectl get wftmpl

通过 Argo CD 实现 GitOps

可以使用 Argo CD 中 GitOps 中管理 WorkflowTemplate 资源

UI

WorkflowTemplate 资源也可以由 UI 管理。

当从 UI 提交 WorkflowTemplates 时,用户可以在 enum 下指定选项以启用下拉列表选择。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: workflow-template-with-enum-values
spec:
  entrypoint: argosay
  arguments:
    parameters:
      - name: message
        value: one
        enum:
          -   one
          -   two
          -   three
  templates:
    - name: argosay
      inputs:
        parameters:
          - name: message
            value: '{{workflow.parameters.message}}'
      container:
        name: main
        image: 'argoproj/argosay:v2'
        command:
          - /argosay
        args:
          - echo
          - '{{inputs.parameters.message}}'

ClusterWorkflowTemplates

v2.8 and after

介绍

ClusterWorkflowTemplates 是集群范围的 WorkflowTemplates。 ClusterWorkflowTemplate 可以像 ClusterRole 一样在集群范围内使用,并且可以跨集群中的所有命名空间访问。

定义 ClusterWorkflowTemplate

apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
  name: cluster-workflow-template-whalesay-template
spec:
  templates:
  - name: whalesay-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

引用其它 ClusterWorkflowTemplates

您可以使用带有 clusterScope: true 的 templateRef 字段从其他 ClusterWorkflowTemplates 中引用模板。正如您在同一 Workflow 中引用其他模板的方式一样,您应该在 steps 或 dag 模板中这样做。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-template-hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    steps:                              # You should only reference external "templates" in a "steps" or "dag" "template".
      - - name: call-whalesay-template
          templateRef:                  # You can reference a "template" from another "WorkflowTemplate or ClusterWorkflowTemplate" using this field
            name: cluster-workflow-template-whalesay-template   # This is the name of the "WorkflowTemplate or ClusterWorkflowTemplate" CRD that contains the "template" you want
            template: whalesay-template # This is the name of the "template" you want to reference
            clusterScope: true          # This field indicates this templateRef is pointing ClusterWorkflowTemplate
          arguments:                    # You can pass in arguments as normal
            parameters:
            - name: message
              value: "hello world"

2.9 and after

通过 ClusterWorkflowTemplate Spec 创建 Workflow

您可以使用带有 clusterScope: true 的 workflowTemplateRef 从 ClusterWorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与集群 Workflow 模板参数合并。

这是带有 entrypointarguments 的 ClusterWorkflowTemplate 示例:

apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
  name: cluster-workflow-template-submittable
spec:
  entrypoint: whalesay-template
  arguments:
    parameters:
      - name: message
        value: hello world
  templates:
    - name: whalesay-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]

这是一个通过 ClusterWorkflowTemplate 创建为 Workflow 的示例,并将 entrypointarguments 传递给 ClusterWorkflowTemplate:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: cluster-workflow-template-hello-world-
spec:
  entrypoint: whalesay-template
  arguments:
    parameters:
      - name: message
        value: "from workflow"
  workflowTemplateRef:
    name: cluster-workflow-template-submittable
    clusterScope: true

这是一个通过 WorkflowTemplate 创建为 Workflow 并使用 WorkflowTemplates 的 entrypointarguments 的示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: cluster-workflow-template-hello-world-
spec:
  workflowTemplateRef:
    name: cluster-workflow-template-submittable
    clusterScope: true

管理 ClusterWorkflowTemplates

CLI

您可以创建一些示例模板,如下所示:

argo cluster-template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/clustertemplates.yaml

使用这些模板之一提交工作流:

argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/cluster-wftmpl-dag.yaml

2.7 and after

将 ClusterWorkflowTemplate 作为 Workflow 提交:

argo submit --from clusterworkflowtemplate/workflow-template-submittable

kubectl

使用 kubectl apply -f 和 kubectl get cwft

UI

ClusterWorkflowTemplate 资源也可以由 UI 管理

CronWorkflows

v2.5 and after

介绍

CronWorkflow 是按预设时间表运行的工作流。它们旨在轻松地从 Workflow 转换并模仿与 Kubernetes CronJob 相同的选项。本质上,CronWorkflow = Workflow + 一些特定的 cron 选项。

CronWorkflow Spec

CronWorkflow spec 示例如下:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 90"]

workflowSpec 和 workflowMetadata

CronWorkflow.spec.workflowSpec 和 Workflow.spec 类型相同,用于从中创建的 Workflow 对象的模板。spec 下的所有内容都将转换为 Workflow。

生成的 Workflow 名称将是基于 CronWorkflow 名称生成的名称。在此示例中,它可能类似于 test-cron-wf-tj6fe

CronWorkflow.spec.workflowMetadata 可用于添加 labelsannotations

CronWorkflow 选项

选项名称 默认值 描述
schedule None, must be provided 工作流调度计划。例如,5 4 * * *
timezone Machine timezone 将根据 IANA 时区标准运行工作流的时区,例如,America/Los_Angeles
suspend false 如果为true,工作流调度将不会发生。可以从 CLI、GitOps 或直接设置
concurrencyPolicy Allow 多个工作流同时调度时的处理策略。可选项:
* Allow:全部允许
* Replcace:在安排新的之前删除所有旧的
* Forbid:有旧的存在时不许有新的
startingDeadlineSeconds 0 Number of seconds after the last successful run during which a missed Workflow will be run
successfulJobsHistoryLimit 3 将保留的成功工作流的数量
failedJobsHistoryLimit 1 将保留的失败工作流的数量

Cron Schedule 语法

cron 调度程序使用标准的 cron 语法,参看documented on Wikipedia

此处记录了所使用的特定库的更详细文档。

崩溃恢复

如果 workflow-controller 崩溃(CronWorkflow controller 也会崩溃),您可以设置一些选项,以确保在 controller 关闭时计划的 CronWorkflow 仍然可以运行。主要是可以设置 startingDeadlineSeconds 来指定 CronWorkflow 最后一次成功运行后的最大秒数,在此期间错过的运行仍将被执行。

例如,如果每分钟运行的 CronWorkflow 最后一次运行是在 12:05:00,并且控制器在 12:05:55 和 12:06:05 之间崩溃,那么 12:06:00 的预期执行时间将会错过。然而,如果将 startingDeadlineSeconds 设置为大于65的值(上次计划运行时间 12:05:00 和当前控制器重新启动时间 12:06:05 之间经过的时间量),那么 CronWorkflow 的实例将在 12:06:05 准确执行。

由于设置了startingDeadlineSeconds,当前只会执行一个实例。

此设置也可以与 concurrencyPolicy 一起配置,以实现更精细的控制。

管理 CronWorkflow

CLI

可以使用基本命令从 CLI 创建 CronWorkflow:

$ argo cron create cron.yaml
Name:                          test-cron-wf
Namespace:                     argo
Created:                       Mon Nov 18 10:17:06 -0800 (now)
Schedule:                      * * * * *
Suspended:                     false
StartingDeadlineSeconds:       0
ConcurrencyPolicy:             Forbid

$ argo cron list
NAME           AGE   LAST RUN   SCHEDULE    SUSPENDED
test-cron-wf   49s   N/A        * * * * *   false

# some time passes

$ argo cron list
NAME           AGE   LAST RUN   SCHEDULE    SUSPENDED
test-cron-wf   56s   2s         * * * * *   false

$ argo cron get test-cron-wf
Name:                          test-cron-wf
Namespace:                     argo
Created:                       Wed Oct 28 07:19:02 -0600 (23 hours ago)
Schedule:                      * * * * *
Suspended:                     false
StartingDeadlineSeconds:       0
ConcurrencyPolicy:             Replace
LastScheduledTime:             Thu Oct 29 06:51:00 -0600 (11 minutes ago)
NextScheduledTime:             Thu Oct 29 13:03:00 +0000 (32 seconds from now)
Active Workflows:              test-cron-wf-rt4nf

注意:NextScheduledRun 假设工作流控制器使用 UTC 作为其时区

kubectl

使用 kubectl apply -f 和 kubectl get cwf

回填天数

See cron backfill.

GitOps via Argo CD

CronWorkflow resources can be managed with GitOps by using Argo CD

UI

CronWorkflow 资源也可以由 UI 管理

模板类型

HTTP 模板

v3.2 and after

HTTP 模板可用于执行 HTTP 请求。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: http-template-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: get-google-homepage
            template: http
            arguments:
              parameters: [{name: url, value: "https://www.google.com"}]
    - name: http
      inputs:
        parameters:
          - name: url
      http:
        timeoutSeconds: 20 # Default 30
        url: "{{inputs.parameters.url}}"
        method: "GET" # Default GET
        headers:
          - name: "x-header-name"
            value: "test-value"
        # Template will succeed if evaluated to true, otherwise will fail
        # Available variables:
        #  request.body: string, the request body
        #  request.headers: map[string][]string, the request headers
        #  response.url: string, the request url
        #  response.method: string, the request method
        #  response.statusCode: int, the response status code
        #  response.body: string, the response body
        #  response.headers: map[string][]string, the response headers
        successCondition: "response.body contains \"google\"" # available since v3.3
        body: "test body" # Change request body

Argo 代理

HTTP 模板使用 Argo 代理,它独立于控制器执行请求。Agent 和 Workflow Controller 通过 WorkflowTaskSet CRD 进行通信,该 CRD 是为每个需要使用 Agent 的正在运行的 Workflow 创建的。

为了使用 Argo 代理,您需要确保已添加适当的 Workflow RBAC 以将代理角色添加到 Argo 工作流。代理角色示例如下所示:

# https://argoproj.github.io/argo-workflows/workflow-rbac/
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: agent
  annotations:
    workflows.argoproj.io/description: |
      This is the minimum recommended permissions needed if you want to use the agent, e.g. for HTTP or plugin templates.
      If <= v3.2 you must replace `workflowtasksets/status` with `patch workflowtasksets`.
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtasksets
    verbs:
      - list
      - watch
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtasksets/status
    verbs:
      - patch

Container Set 模板

v3.1 and after

Container set 模板和普通的 container 或 script 模板类似,但允许你在一个 pod 中指定运行多个 containers。

由于这多个 container 包含在一个 pod 中,它们将被调度到同一台宿主机上。你可以使用便宜且便捷的 empty-dir 卷替代 PVC 来实现多个步骤间共享数据。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: container-set-template-
spec:
  entrypoint: main
  templates:
    - name: main
      volumes:
        - name: workspace
          emptyDir: { }
      containerSet:
        volumeMounts:
          - mountPath: /workspace
            name: workspace
        containers:
          - name: a
            image: argoproj/argosay:v2
          - name: b
            image: argoproj/argosay:v2
          - name: main
            image: argoproj/argosay:v2
            dependencies:
              - a
              - b
      outputs:
        parameters:
          - name: message
            valueFrom:
              path: /workspace/message

有几点需要注意:

  1. 您必须使用 Emissary Executor。

  2. 或者所有容器必须并行运行——即它是一个没有依赖关系的图。

  3. 您不能使用增强的依赖逻辑。

  4. 它将使用所有资源请求的总和,可能比相同的 DAG 模板花费更多。如果您的请求已经花费了很多,这将是一个问题。见下文。

通过指定依赖关系,可以将容器排列为图形。这适用于运行 10 个而不是 100 个容器。

输入和输出

与 container 和 script 模板一样,输入和输出只能从名为 main 的容器中加载和保存。

所有有制品的 container set 模板必须/应该有一个名为 main 的容器。

如果要使用基础层制品,main 必须最后完成,因此它必须是图中的根节点。

那可能不切实际。

相反,拥有一个工作区卷并确保所有工件路径都在该卷上。

⚠️资源请求

一个 container set 实际上启动了所有容器,而 Emissary 仅在它所依赖的容器完成后才启动主容器进程。这意味着即使容器没有做任何有用的工作,它仍然在消耗资源,并且您仍然需要为它们付费。

如果您的请求很小,这将不是问题。

如果您的请求很大,请设置资源请求,以便资源总和是您一次性所需的最大值。

示例 A:一个简单的序列,例如a -> b -> c

  • a 需要 1Gi 内存

  • b 需要 2Gi 内存

  • c 需要 1Gi 内存

然后你知道你最多只需要 2Gi。你可以设置如下:

  • a 请求 512Mi 内存

  • b 请求 1Gi 内存

  • c 请求 512Mi 内存

总共是2Gi,对于b来说已经足够了。所有任务都能正常运行。

示例 B:菱形 DAG,例如菱形 a -> b -> d 和 a -> c -> d,即 b 和 c 同时运行。

  • a 需要 1000 cpu

  • b 需要 2000 cpu

  • c 需要 1000 cpu

  • d 需要 1000 cpu

我知道 b 和 c 将同时运行。所以我需要确保总数是3000。

  • a 请求 500 cpu

  • b 请求 1000 cpu

  • c 请求 1000 cpu

  • d 请求 500 cpu

一共3000,足够b+c了。所有任务都能正常运行。

示例 C:不平衡的请求,例如a -> b 其中 a 便宜而 b 昂贵

  • a 需要 100 cpu, 1Mi 内存, 运行 10h

  • b 需要 8Ki GPU, 100 Gi 内存, 200 Ki GPU, 运行 5m

你能看出这里的问题吗? a 只有小请求,但容器集将使用所有请求的总数。因此,就好像您将所有 GPU 都使用了 10 小时。这将是昂贵的。

解决方案:当您有不平衡的请求时,不要使用容器集

数据获取和转换

v3.1 and after

我们有意使此功能仅具有基本功能。我们希望我们能够根据社区的反馈构建此功能。如果您对此功能有想法和用例,请在 GitHub 上打开增强提案

介绍

用户经常将获取和转换数据作为其工作流程的一部分。data 模板为这些常见操作提供一流的支持。

可以通过查看 bash 中的常见数据源和转换操作来理解 data 模板:

find -r . | grep ".pdf" | sed "s/foo/foo.ready/"

此类操作包括两个主要部分:

  • 一个数据源:find -r .

  • 一系列转换操作,串行转换源的输出:| grep ".pdf" | sed "s/foo/foo.ready/"

例如,此操作在寻找待处理文件的潜在列表以及根据需要过滤和操作列表时可能很有用。

在 Argo 中,这个操作可以写成:

- name: generate-artifacts
  data:
    source:             # Define a source for the data, only a single "source" is permitted
      artifactPaths:    # A predefined source: Generate a list of all artifact paths in a given repository
        s3:             # Source from an S3 bucket
          bucket: test
          endpoint: minio:9000
          insecure: true
          accessKeySecret:
            name: my-minio-cred
            key: accesskey
          secretKeySecret:
            name: my-minio-cred
            key: secretkey
    transformation:     # The source is then passed to be transformed by transformations defined here
      - expression: "filter(data, {# endsWith \".pdf\"})"
      - expression: "map(data, {# + \".ready\"})"

Spec

data 模板必须包含一个 source 字段。当前可用的数据源有:

  • artifactPaths:从指定的制品库生成制品路径列表

data 模板可能包含多个 transformations(也可能是0个)。转换将按顺序连续应用。当前可用的转换:

  • expression:一个expr表达式。语言定义请看这里。在定义 expr 表达式时,Argo 会将可用数据传递给名为 data 的环境变量(参见上面的示例)。

我们知道 expression 转换是有限的。我们打算根据社区的反馈大大扩展此模板的功能。请参阅本文档顶部的链接以提交有关此功能的想法或用例。

内联模板

v3.2 and after

您可以在 DAG 和步骤中内联其他模板。

示例:

  • DAG
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-inline-
  labels:
    workflows.argoproj.io/test: "true"
  annotations:
    workflows.argoproj.io/description: |
      This example demonstrates running a DAG with inline templates.
    workflows.argoproj.io/version: ">= 3.2.0"
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - name: a
            inline:
              container:
                image: argoproj/argosay:v2
  • Steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-inline-
  labels:
    workflows.argoproj.io/test: "true"
  annotations:
    workflows.argoproj.io/description: |
      This workflow demonstrates running a steps with inline templates.
    workflows.argoproj.io/version: ">= 3.2.0"
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: a
            inline:
              container:
                image: argoproj/argosay:v2

警告:您只能内联一次。在 DAG 中内联 DAG 将不起作用。

访问控制

服务账户

配置服务帐户以运行工作流

Roles, Role-Bindings 和 Service Accounts

为了让 Argo 支持制品、输出、访问 secrets 等功能,它需要使用 Kubernetes API 与 Kubernetes 资源进行通信。为了与 Kubernetes API 通信,Argo 使用一个 ServiceAccount 授权自己访问 Kubernetes API。您可以通过使用 RoleBinding 将角色绑定到 ServiceAccount 来指定 Argo 使用的 ServiceAccount 的角色(即哪些权限)。

然后,在提交工作流时,您可以指定 Argo 使用哪个 ServiceAccount :

argo submit --serviceaccount <name>

如果没提供 ServiceAccount,Argo 将会使用其所运行的命名空间的 default ServiceAccount,默认情况下几乎总是没有足够的权限。

有关为您的用例授予 Argo 必要权限的更多信息,请参阅 Workflow RBAC。

授予 admin 权限

出于本演示的目的,我们将授予default ServiceAccount 管理员权限(即,我们将 admin Role 绑定到当前命名空间的default ServiceAccount):

kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default -n argo

请注意,这将向运行命令的命名空间中的默认 ServiceAccount 授予管理员权限,因此您将只能在创建 RoleBinding 的命名空间中运行工作流。

Workflow RBAC

工作流中的所有 pod 都使用 workflow.spec.serviceAccountName 中指定的服务账户运行,如果省略,则使用工作流命名空间的 default 服务账户。工作流需要的访问权限取决于工作流需要做什么。例如,如果您的工作流需要部署资源,则工作流的服务帐户将需要对该资源的“create”权限。

警告:我们不建议在生产环境使用 default 服务账户。它是一个共享帐户,因此可能会添加您不想要的权限。相反,仅为您的工作流程创建一个服务帐户。

执行器工作所需的最小权限:

对于 >= v3.4:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: executor
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtaskresults
    verbs:
      - create
      - patch

对于 <= v3.3 使用。

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: executor
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - patch

警告:对于许多组织来说,给工作流 Pod 补丁权限可能是不可接受的,参考#3961

如果您不使用 emissary,则需要额外的权限。请参阅 executor 以获得合适的权限。

功能

Workflow 变量

工作流规范中的某些字段允许由 Argo 自动替换的变量引用。

如何使用变量

变量用花括号括起来:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-parameters-
spec:
  entrypoint: whalesay
  arguments:
    parameters:
      - name: message
        value: hello world
  templates:
    - name: whalesay
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [ cowsay ]
        args: [ "{{inputs.parameters.message}}" ]

以下变量可用于引用工作流的各种元数据:

模板标签类型

有两种模板标签:

  • 简单的: 默认,示例 {{workflow.name}}

  • 表达式: {{ 后紧跟着 =,例如 {{=workflow.name}}

简单的

标签被替换为与标签同名的变量。

简单标签可能在括号和变量之间有空格,如下所示。但是,存在一个已知问题,即变量可能无法使用空白进行插值,因此建议在解决此问题之前避免使用空白。请使用可重现的示例报告意外行为。

args: [ "{{ inputs.parameters.message }}" ]  

表达式

Since v3.1

该标签被作为表达式评估该标签的结果替换。

请注意,任何带连字符的参数名称或步骤名称都会导致解析错误。您可以通过索引参数或步骤图来引用它们,例如 inputs.parameters['my-param']steps['my-step'].outputs.result

学习表达式语法

示例

普通列表:

[1, 2]

列表过滤:

filter([1, 2], { # > 1})

映射一个列表:

map([1, 2], { # * 2 })

我们提供了一些核心函数:

转成 int:

asInt(inputs.parameters['my-int-param'])

转成 float:

asFloat(inputs.parameters['my-float-param'])

转成字符串:

string(1)

转成JSON串(需要 withParam):

toJson([1, 2])

从 JSON 中提取数据:

jsonpath(inputs.parameters.json, '$.some.path')

您还可以使用 Sprig 函数

修剪字符串:

sprig.trim(inputs.parameters['my-string-param'])

!!!警告 在 Sprig 函数中,通常不会引发错误。例如。如果 int 用于无效值,则返回 0。请查看 Sprig 文档以了解哪些函数可以使用,哪些不可以。

引用

所有模板

变量 描述
inputs.parameters.<NAME> 给模板传入参数
inputs.parameters 模板的所有输入参数作为 JSON 字符串
inputs.artifacts.<NAME> 将制品输入到模板

Steps 模板

变量 描述
steps.name 步骤名称
steps.<STEPNAME>.id 容器步骤的唯一 ID
steps.<STEPNAME>.ip 任何先前守护程序 container 步骤的 IP 地址
steps.<STEPNAME>.status 任何先前步骤的状态短语
steps.<STEPNAME>.exitCode 任何先前 script/container 步骤的退出码
steps.<STEPNAME>.startedAt 当前步骤开始的时间戳
steps.<STEPNAME>.finishedAt 当前步骤结束的时间戳
steps.<STEPNAME>.outputs.result 任何先前 script/container 步骤的输出结果
steps.<STEPNAME>.outputs.parameters When the previous step uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation
steps.<STEPNAME>.outputs.parameters.<NAME> Output parameter of any previous step. When the previous step uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation
steps.<STEPNAME>.outputs.artifacts.<NAME> 任何先前步骤的输出工件

DAG 模板

变量 描述
tasks.name 任务名称
tasks.<TASKNAME>.id container 任务的唯一ID
tasks.<TASKNAME>.ip 任何先前守护程序 container 任务的 IP 地址
tasks.<TASKNAME>.status 任何先前任务的状态短语
tasks.<TASKNAME>.exitCode 任何先前 script/container 任务的退出码
tasks.<TASKNAME>.startedAt 当前任务开始的时间戳
tasks.<TASKNAME>.finishedAt 当前任务结束的时间戳
tasks.<TASKNAME>.outputs.result 任何先前 script/container 任务的输出结果
tasks.<TASKNAME>.outputs.parameters When the previous task uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation
tasks.<TASKNAME>.outputs.parameters.<NAME> Output parameter of any previous task. When the previous task uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation
tasks.<TASKNAME>.outputs.artifacts.<NAME> 任何先前任务的输出工件

HTTP 模板

Since v3.3

Only available for successCondition

变量 描述
request.method 请求方法(字符串)
request.url 请求URL(字符串)
request.body 请求体(字符串)
request.headers 请求头 (map[string][]string)
response.statusCode 响应状态码 (int)
response.body 响应体(string)
response.headers 响应头 (map[string][]string)

重试策略

When using the expression field within retryStrategy, special variables are available.

变量 描述
lastRetry.exitCode Exit code of the last retry
lastRetry.Status Status of the last retry
lastRetry.Duration Duration in seconds of the last retry

Note: These variables evaluate to a string type. If using advanced expressions, either cast them to int values (expression: "{{=asInt(lastRetry.exitCode) >= 2}}") or compare them to string values (expression: "{{=lastRetry.exitCode != '2'}}").

Container/Script 模板

变量 描述
pod.name container/script 的 Pod 名称
retries 如果指定了重试策略,container/script的重试次数
inputs.artifacts.<NAME>.path 输入工件的本地路径
outputs.artifacts.<NAME>.path 输出工件的本地路径
outputs.parameters.<NAME>.path 输出参数的本地路径

循环 (withItems / withParam)

变量 描述
item 元素列表
item.<FIELDNAME> 列表中元素字段

指标

When emitting custom metrics in a template, special variables are available that allow self-reference to the current step.

变量 描述
status Phase status of the metric-emitting template
duration Duration of the metric-emitting template in seconds (only applicable in Template-level metrics, for Workflow-level use workflow.duration)
exitCode Exit code of the metric-emitting template
inputs.parameters.<NAME> Input parameter of the metric-emitting template
outputs.parameters.<NAME> Output parameter of the metric-emitting template
outputs.result Output result of the metric-emitting template
resourcesDuration.{cpu,memory} Resources duration in seconds. Must be one of resourcesDuration.cpu or resourcesDuration.memory, if available. For more info, see the Resource Duration doc.

实时指标

Some variables can be emitted in real-time (as opposed to just when the step/task completes). To emit these variables in real time, set realtime: true under gauge (note: only Gauge metrics allow for real time variable emission). Metrics currently available for real time emission:

For Workflow-level metrics:

  • workflow.duration

For Template-level metrics:

  • duration

全局变量

变量 描述
workflow.name Workflow 名称
workflow.namespace Workflow 命名空间
workflow.serviceAccountName Workflow 服务账户名称
workflow.uid Workflow UID。用于设置资源或唯一工件位置的所有权引用
workflow.parameters.<NAME> Workflow的输入参数
workflow.parameters Workflow的所有输入参数的 JSON 字符串
workflow.outputs.parameters.<NAME> Workflow中的全局参数
workflow.outputs.artifacts.<NAME> 工作流中的全局工件
workflow.annotations.<NAME> Workflow的注解
workflow.labels.<NAME> Workflow的标签
workflow.creationTimestamp Workflow 的创建时间,格式为 RFC 3339 (e.g. 2018-08-23T05:42:49Z)
workflow.creationTimestamp.<STRFTIMECHAR> Workflow 的创建时间,格式为strftime.
workflow.creationTimestamp.RFC3339 Workflow 的创建时间,格式为 RFC 3339
workflow.priority Workflow的优先级
workflow.duration Workflow持续时间估计值可能与实际持续时间相差几秒钟
workflow.scheduledTime 被调度时间,格式为 RFC 3339 (仅用于 CronWorkflow)

退出处理器

变量 描述
workflow.status Workflow的状态。可选值为:Succeeded, Failed, Error
workflow.failures A list of JSON objects containing information about nodes that failed or errored during execution. Available fields: displayName, message, templateName, phase, podName, and finishedAt.

重试

Argo工作流提供了一系列重试失败步骤的选项。

在 WorkflowSpec 中配置 retryStrategy

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-container-
spec:
  entrypoint: retry-container
  templates:
  - name: retry-container
    retryStrategy:
      limit: "10"
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]

重试策略

使用retryPolicy选择要重试的失败:

  • Always:重试所有失败的步骤。

  • OnFailure: 重试其主容器在Kubernetes中标记为失败的步骤(这是默认值)。

  • OnError: 重试遇到Argo控制器错误或其init或wait容器失败的步骤

  • OnTransientError: Retry steps that encounter errors defined as transient, or errors matching the TRANSIENT_ERROR_PATTERN environment variable. Available in version 3.0 and later.

示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-on-error-
spec:
  entrypoint: error-container
  templates:
  - name: error-container
    retryStrategy:
      limit: "2"
      retryPolicy: "Always"
    container:
      image: python
      command: ["python", "-c"]
      # fail with a 80% probability
      args: ["import random; import sys; exit_code = random.choice(range(0, 5)); sys.exit(exit_code)"]

有条件地重试

v3.2 and after

可以使用 expression 来控制重试。expression 字段接受expr表达式,并且可以访问如下变量:

  • lastRetry.exitCode: 上次重试的退出码,如果不可用则为“-1”

  • lastRetry.status: 上次重试的结果短语: Error, Failed

  • lastRetry.duration: 上次重试耗时,秒

如果 expression 评估为 false,该步骤将不会重试。

示例:

# Only retry if the retryStrategy.expression condition is satisfied. In this example, retries will be made until a pod has
# exit code 2 or the limit of 10 is reached, whichever happens first.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-script-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: safe-to-retry
            template: safe-to-retry
        - - name: retry
            template: retry-script
            arguments:
              parameters:
                - name: safe-to-retry
                  value: "{{steps.safe-to-retry.outputs.result}}"

    - name: safe-to-retry
      script:
        image: python:alpine3.6
        command: ["python"]
        source: |
          print("true")
    - name: retry-script
      inputs:
        parameters:
            - name: safe-to-retry
      retryStrategy:
        limit: "3"
        # Only continue retrying if the last exit code is greater than 1 and the input parameter is true
        expression: "asInt(lastRetry.exitCode) > 1 && {{inputs.parameters.safe-to-retry}} == true"
      script:
        image: python:alpine3.6
        command: ["python"]
        # Exit 1 with 50% probability and 2 with 50%
        source: |
          import random;
          import sys;
          exit_code = random.choice([1, 2]);
          sys.exit(exit_code)

Back-Off

您可以使用 backoff 配置重试之间的延迟。

# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: "10"
      backoff:
        duration: "1"       # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: "2"
        maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]

生命周期钩子

v3.3 and after

介绍

LifecycleHook 触发一个基于条件表达式的动作。它在工作流级别或模板级别进行配置,例如分别作为 workflow.statussteps.status 的功能。LifecycleHook 在执行期间执行并执行一次。

换句话说,LifecycleHook 的功能类似于带有条件表达式的退出处理程序

Workflow 级别 LifecycleHook: 满足配置的表达式时执行工作流。

示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: lifecycle-hook-
spec:
  entrypoint: main
  hooks:
    exit:
      template: http
    running:
      expression: workflow.status == "Running"
      template: http
  templates:
    - name: main
      steps:
      - - name: step1
          template: heads

    - name: heads
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["echo \"it was heads\""]

    - name: http
      http:
        # url: http://dummy.restapiexample.com/api/v1/employees
        url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"

Template-level Lifecycle-Hook: 满足配置的表达式时执行模板。

示例:

apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: lifecycle-hook-tmpl-level-
    spec:
      entrypoint: main
      templates:
        - name: main
          steps:
            - - name: step-1
                hooks:
                  exit:
                    # Expr will not support `-` on variable name. Variable should wrap with `[]`
                    expression: steps["step-1"].status == "Running"
                    template: http
                  success:
                    expression: steps["step-1"].status == "Succeeded"
                    template: http
                template: echo
            - - name: step2
                hooks:
                  exit:
                    expression: steps.step2.status == "Running"
                    template: http
                  success:
                    expression: steps.step2.status == "Succeeded"
                    template: http
                template: echo

        - name: echo
          container:
            image: alpine:3.6
            command: [sh, -c]
            args: ["echo \"it was heads\""]

        - name: http
          http:
            # url: http://dummy.restapiexample.com/api/v1/employees
            url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"

支持的条件

不支持的条件

  • outputs 不可用,因为 LifecycleHook 在执行期间执行,并且在该步骤完成之前不会产生输出。

通知用例

LifecycleHook 可用于根据工作流状态更改或模板状态更改配置通知,如下例所示:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
 generateName: lifecycle-hook-
spec:
 entrypoint: main
 hooks:
   exit:
     template: http
   running:
     expression: workflow.status == "Running"
     template: http
 templates:
   - name: main
     steps:
       - - name: step1
           template: heads

   - name: heads
     container:
       image: alpine:3.6
       command: [sh, -c]
       args: ["echo \"it was heads\""]

   - name: http
     http:
       url: http://dummy.restapiexample.com/api/v1/employees

步骤级记忆

v2.10 and after

介绍

工作流通常具有计算成本高的输出。此功能通过记忆先前运行的步骤来降低成本和工作流执行时间:它将模板的输出存储到具有可变 key 的指定缓存中。

缓存方法

目前,缓存只能使用 config-maps。这使您可以轻松地通过 kubectl 和 Kubernetes API 手动操作缓存条目,而无需通过 Argo。

使用记忆

记忆是在模板级别设置的。您必须指定一个键,它可以是静态字符串,但更多时候取决于输入。您还必须指定配置映射缓存的名称。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
   generateName: memoized-workflow-
spec:
   entrypoint: whalesay
   templates:
      - name: whalesay
        memoize:
           key: "{{inputs.parameters.message}}"
           cache:
              configMap:
                 name: whalesay-cache

注意:为了使用记忆,有必要为适当的(集群)角色添加configmaps 资源的 create 和 update 权限。在集群安装的情况下,应该更新 argo-cluster-role 集群角色,而对于命名空间安装,应该更新 argo-role 角色。

FAQ

  1. 如果你看到错误“error creating cache entry: ConfigMap "reuse-task" is invalid: []: Too long: must have at most 1048576 characters”,这是由于 ConfigMap 有 1MB 大小的限制。下面是一些解决方案:

    • 删除现有的 ConfigMap 缓存或切换到使用不同的缓存。

    • 减少正在记忆的节点的输出参数的大小。

    • 将您的缓存拆分为不同的记忆键和缓存名称,以便每个缓存条目都很小。

模板默认值

v3.1 and after

介绍

TemplateDefaults 功能使用户能够在工作流 spec 级别配置默认模板值,该值将应用于工作流中的所有模板。如果模板的值在 templateDefault 中也有默认值,则模板的值将优先。这些值将在运行时应用。将使用 Kubernetes 战略合并补丁合并模板值和默认值。要检查列表值是否合并以及如何合并,请检查工作流定义中的 patchStrategy 和 patchMergeKey 标签。

在 WorkflowSpec 中配置 templateDefaults

示例:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: template-defaults-example
spec:
  entrypoint: main
  templateDefaults:
    timeout: 30s   # timeout value will be applied to all templates
    retryStrategy: # retryStrategy value will be applied to all templates
      limit: 2
  templates:
  - name: main
    container:
      image: docker/whalesay:latest

template defaults example

在控制器级别配置templateDefaults

操作员可以在 workflow defaults中配置 templateDefaults。此 templateDefault 将应用于在控制器上运行的所有工作流。

以下将在 Config Map 中指定:

apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
data:
  # Default values that will apply to all Workflows from this controller, unless overridden on the Workflow-level
  workflowDefaults: |
    metadata:
      annotations:
        argo: workflows
      labels:
        foo: bar
    spec:
      ttlStrategy:
        secondsAfterSuccess: 5
      templateDefaults:
        timeout: 30s 

增强的依赖逻辑

v2.9 and after

介绍

在 2.8 版之前,在 DAG 模板中指定依赖项的唯一方法是使用 dependencies 字段并指定当前任务所依赖的其他任务的列表。此语法具有限制性,因为它不允许用户指定要依赖的任务结果。例如,一个任务只有在其依赖的任务执行成功(或失败等)时才会运行。

依赖

为了解决这个问题,存在一个名为depends 的新字段,它允许用户指定相关任务、它们的状态以及任何复杂的布尔逻辑。该字段是一个字符串字段,其语法类似于表达式,格式为 <task-name>.<task-result>。示例包括 task-1.Suceeded、task-2.Failed、task-3.Daemoned。可用任务结果的完整列表如下:

任务结果 描述 含义
.Succeeded 任务执行成功 任务执行完成,且没有错误
.Failed 任务执行失败 任务退出码非0
.Errored 任务报错 任务有一个非 0 退出代码以外的错误
.Skipped 任务被跳过 任务被跳过
.Omitted 任务被省略 任务被省略
.Daemoned 任务是守护进程并且不是 Pending 状态

为方便起见,如果省略的任务结果等价于 (task.Succeeded || task.Skipped || task.Daemoned)。

例如:

depends: "task || task-2.Failed"

等价于:

depends: (task.Succeeded || task.Skipped || task.Daemoned) || task-2.Failed

也可以使用完整的布尔逻辑。操作符包括:

  • &&

  • ||

  • !

例如:

depends: "(task-2.Succeeded || task-2.Skipped) && !task-3.Failed"

如果您依赖使用了 withItems 的任务,您可以使用 .AnySucceeded 和 .AllFailed 依赖于任何项目任务是成功还是全部失败,例如:

depends: "task-1.AnySucceeded || task-2.AllFailed"

与 dependencies 和 dag.task.continueOn 的兼容性

此功能完全兼容 dependencies,转换很容易。

要转换,只需在 dependencies 中加入 &&

dependencies: ["A", "B", "C"]

等价于:

depends: "A && B && C"

由于 depends 中加入了控制逻辑,使用 depends 不能使用 dag.task.continueOn。此外,不能在同一个任务组中同时使用 dependenciesdepends

Node字段选择器

v2.8 and after

介绍

resume、stop 和 retry 等 Argo CLI 和 API 命令支持 --node-field-selector参数,允许用户选择命令要作用的一组节点。

与 CLI 一起使用时的格式为:

--node-field-selector=FIELD=VALUE

可选项

该字段可以是以下任何一项:

字段 描述
displayName Display name of the node. This is the name of the node as it is displayed on the CLI or UI, without considering its ancestors (see example below). This is a useful shortcut if there is only one node with the same displayName
name Full name of the node. This is the full name of the node, including its ancestors (see example below). Using name is necessary when two or more nodes share the same displayName and disambiguation is required.
templateName Template name of the node
phase Phase status of the node - e.g. Running
templateRef.name The name of the workflow template the node is referring to
templateRef.template The template within the workflow template the node is referring to
inputs.parameters.<NAME>.value The value of input parameter NAME

The operator can be '=' or '!='. Multiple selectors can be combined with a comma, in which case they are anded together.

示例

过滤输入参数 'foo' 等于 'bar' 的节点:

--node-field-selector=inputs.parameters.foo.value=bar

过滤输入参数 'foo' 等于 'bar' ,且状态短语不是 Running 的节点:

--node-field-selector=foo1=bar1,phase!=Running

考虑以下工作流程:

 ● appr-promotion-ffsv4    code-release
 ├─✔ start                 sample-template/email                 appr-promotion-ffsv4-3704914002  2s
 ├─● app1                  wftempl1/approval-and-promotion
 │ ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-524476380   2s
 │ └─ǁ wait-approval       sample-template/waiting-for-approval
 ├─✔ app2                  wftempl2/promotion
 │ ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-2580536603  2s
 │ ├─✔ pr-approval         sample-template/approval              appr-promotion-ffsv4-3445567645  2s
 │ └─✔ deployment          sample-template/promote               appr-promotion-ffsv4-970728982   1s
 └─● app3                  wftempl1/approval-and-promotion
   ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-388318034   2s
   └─ǁ wait-approval       sample-template/waiting-for-approval

模式

Empty Dir

虽然默认情况下,Docker 和 PNS 工作流执行器可以从基础层(例如 /tmp)获取输出工件/参数,但 Kubelet 和 K8SAPI 执行器都不能。如果您使用安全上下文运行工作流 pod,则不太可能从基础层获得输出工件/参数。

您可以通过将卷挂载到您的 pod 来解决此限制。最简单的方法是使用 emptyDir 卷。

只能用于工件/参数输出,如果需要的话,工件/参数输入会自动挂载一个 empty-dir。

此示例显示如何挂载输出卷:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: empty-dir-
spec:
  entrypoint: main
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
        command: [sh, -c]
        args: ["cowsay hello world | tee /mnt/out/hello_world.txt"]
        volumeMounts:
          - name: out
            mountPath: /mnt/out
      volumes:
        - name: out
          emptyDir: { }
      outputs:
        parameters:
          - name: message
            valueFrom:
              path: /mnt/out/hello_world.txt

Cron 回填

使用场景

  • 您正在使用 cron 工作流来运行日常作业,您可能需要重新运行某个日期,或者运行一些历史日期的作业。

解决方案

  1. 为您的日常工作创建工作流模板。

  2. 创建您的 cron 工作流程以每天运行并调用该模板。

  3. 创建一个使用 withSequence的回填工作流运行每日工作流。

这个完整示例包括:

  • 一个名为 job 的工作流模板

  • 一个名为 daily-job 的 cron 工作流

  • 一个名为 backfill-v1 的工作流,它使用资源模板为每个回填日期创建一个工作流。

  • 一个名为 backfill-v2 的替代工作流,它使用步骤模板为每个回填日期运行一个任务。

状态

资源预估

v2.7 and after

Argo Workflows 指示您的工作流使用了多少资源并保存此信息。这是一个指示性但不准确的值。

预估耗时

v2.12 and after

当您运行工作流时,控制器将尝试预估其耗时。

这是基于从同一工作流模板、集群工作流模板或 cron 工作流提交的最近成功的工作流。

为了获取这些数据,控制器首先查询 Kubernetes API(因为这样更快),然后是工作流存档(如果启用)。

如果您使用过 Jenkins 之类的工具,您就会知道估计可能不准确:

  • 一个 pod 花费了很长时间等待调度。

  • 工作流程是不确定的,例如它使用何时执行不同的路径。

  • 工作流程可能因规模而异,例如有时它使用 withItems,因此有时运行 100 个节点,有时运行 1000 个。

  • 如果 pod 运行时间不可预测。

  • 工作流是参数化的,不同的参数会影响其持续时间。

工作流程进度

v2.12 and after

当您运行工作流时,控制器将报告其进度。

我们将进度定义为两个数字,N/M 使得 0 <= N <= M 和 0 <= M。

  • N 是已完成的任务量。

  • M 是任务总量。

如 0/0、0/1 或 50/100。

和预计持续时间不同,进度是确定的。无论有任何问题,每个工作流程都是相同的。

每个节点的进度计算如下:

  1. 对于 pod 节点,如果完成则为 1/1,否则为 0/1。

  2. 对于非叶节点,其子节点的总和。

对于整个工作流,进度是其所有叶节点的总和。

警告:每次将节点添加到图中,M 都会在工作流运行期间增加。

自我报告进度

工作流中的 Pod 可以在运行时报告自己的进度。这个自我报告的进度会覆盖自动生成的进度。

报告进度实现方式如下:

  • 创建进度并将进度写入环境变量 ARGO_PROGRESS_FILE 指示的文件

  • 进度格式必须为 N/M

执行器将每 3 秒读取一次此文件,如果有更新,会更新 pod 的 workflows.argoproj.io/progress: N/M 注解。控制器读取该数据,并将进度写入适当的状态属性。

最初,工作流 pod 的进度始终为 0/1。如果您想影响这一点,请确保在 pod 上设置初始进度注解:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: progress-
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - name: progress
            template: progress
    - name: progress
      metadata:
        annotations:
          workflows.argoproj.io/progress: 0/100
      container:
        image: alpine:3.14
        command: [ "/bin/sh", "-c" ]
        args:
          - |
            for i in `seq 1 10`; do sleep 10; echo "$(($i*10))"'/100' > $ARGO_PROGRESS_FILE; done

工作流创建者

v2.9 and after

如果您通过 CLI 或 UI 创建工作流,则会自动添加一个label,标记创建它的用户

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: my-wf
  labels:
    workflows.argoproj.io/creator: admin
    # labels must be DNS formatted, so the "@" is replaces by '.at.'  
    workflows.argoproj.io/creator-email: admin.at.your.org
    workflows.argoproj.io/creator-preferred-username: admin-preferred-username

相关文章

网友评论

    本文标题:Argo Workflows用户手册

    本文链接:https://www.haomeiwen.com/subject/rzadcdtx.html