美文网首页Kubernetes
Argo Workflows用户手册

Argo Workflows用户手册

作者: 王勇1024 | 来源:发表于2023-01-05 10:28 被阅读0次

    介绍

    Argo Workflows 是一个开源容器原生工作流引擎,用于在 Kubernetes 上编排并行作业。Argo Workflows 实现为 Kubernetes CRD。

    • 定义工作流中的每个步骤都是一个容器的工作流。

    • 将多步骤工作流建模为任务序列或使用有向无环图 (DAG) 捕获任务之间的依赖关系。

    • 使用 Kubernetes 上的 Argo Workflows 在很短的时间内轻松运行机器学习或数据处理的计算密集型作业。

    • 在 Kubernetes 上本地运行 CI/CD pipeline,无需配置复杂的软件开发产品。

    安装Argo Workflows

    CLI

    Mac

    # Download the binary
    curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-darwin-amd64.gz
    
    # Unzip
    gunzip argo-darwin-amd64.gz
    
    # Make binary executable
    chmod +x argo-darwin-amd64
    
    # Move binary to path
    mv ./argo-darwin-amd64 /usr/local/bin/argo
    
    # Test installation
    argo version
    

    Linux

    # Download the binary
    curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-linux-amd64.gz
    
    # Unzip
    gunzip argo-linux-amd64.gz
    
    # Make binary executable
    chmod +x argo-linux-amd64
    
    # Move binary to path
    mv ./argo-linux-amd64 /usr/local/bin/argo
    
    # Test installation
    argo version
    

    Controller 和 Server

    kubectl create namespace argo
    kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/install.yaml
    

    核心概念

    Workflow

    Workflow是 Argo 中最重要的资源,具有两个重要功能:

    1. 它定义了要执行的工作流。

    2. 它存储工作流的状态。

    Workflow Spec

    要执行的工作流在 Workflow.spec 字段中定义。工作流规范的核心结构是模板列表和入口点。Workflow spec的核心结构是一组 templates 和一个 entrypoint

    templates 可以松散地认为是“功能”:它们定义要执行的指令。entrypoint字段定义了“main”函数将是什么——即首先执行的模板。

    下面是一个只有一个 template 的 Workflow 的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: hello-world-  # Name of this Workflow
    spec:
      entrypoint: whalesay        # Defines "whalesay" as the "main" template
      templates:
      - name: whalesay            # Defining the "whalesay" template
        container:
          image: docker/whalesay
          command: [cowsay]
          args: ["hello world"]   # This template runs "cowsay" in the "whalesay" image with arguments "hello world"
    

    模板类型

    有6种 模板类型,可以分为两类。

    模板定义

    这些模板定义了要完成的工作,通常在一个容器中执行。

    Container

    可能是最通用的模板类型,将会调度一个 Container。模板的 spec 定义和 Kubernetes container spec定义相同。

    示例:

      - name: whalesay
        container:
          image: docker/whalesay
          command: [cowsay]
          args: ["hello world"]
    
    Script

    是对 container 的一种简便的封装。spec 定义和 container 相同,但增加了 source 字段:允许您就地定义脚本。该脚本将保存到文件中并为您执行。脚本的执行结果会自动导入到 Argo 变量 {{tasks.<NAME>.outputs.result}}{{steps.<NAME>.outputs.result}} 中(取决于调用方式)。

      - name: gen-random-int
        script:
          image: python:alpine3.6
          command: [python]
          source: |
            import random
            i = random.randint(1, 100)
            print(i)
    
    Resource

    直接操作集群资源。可用于 get、create、apply、delete、replace 或 patch 集群中的资源。

    这个示例会在集群是创建一个 ConfigMap 资源:

      - name: k8s-owner-reference
        resource:
          action: create
          manifest: |
            apiVersion: v1
            kind: ConfigMap
            metadata:
              generateName: owned-eg-
            data:
              some: value
    
    Suspend

    Suspend 模板将暂停执行一段时间或直到手动恢复。Suspend 模板可以通过 CLI(使用 argo resume)、API 接口或 UI 来恢复。

    示例:

      - name: delay
        suspend:
          duration: "20s"
    

    模板调用器

    这些模板用于调用其他模板并提供执行控制。

    Steps

    Steps 模板允许您在一系列步骤中定义您的任务。该模板的结构是“一组列表”。外部列表顺序执行,内部列表并行执行。如果你想顺序执行内部列表,请使用 Synchronization特性。您可以设置多种选项来控制执行,例如 when有条件地执行步骤的子句

    在这个示例中,首先执行 step1,待其执行完毕,再并行执行 step2a 和 step2b。

      - name: hello-hello-hello
        steps:
        - - name: step1
            template: prepare-data
        - - name: step2a
            template: run-data-first-half
          - name: step2b
            template: run-data-second-half
    
    DAG

    dag 模板允许您将任务定义为依赖关系图。在 DAG 中,您列出所有任务并设置在特定任务开始之前必须完成哪些其他任务。没有任何依赖关系的任务将立即运行。

    在这个示例中,A 首先执行,待其执行完毕后,B和C并行执行,待B和C都执行完毕后,执行D:

      - name: diamond
        dag:
          tasks:
          - name: A
            template: echo
          - name: B
            dependencies: [A]
            template: echo
          - name: C
            dependencies: [A]
            template: echo
          - name: D
            dependencies: [B, C]
            template: echo
    

    自定义资源类型

    WorkflowTemplates

    v2.4 and after

    介绍

    WorkflowTemplates 是集群中 Workflows 的定义。这允许您创建一个常用模板库,并通过直接提交它们(v2.7 及更高版本)或从您的 Workflows 中引用它们来重用它们。

    WorkflowTemplate vs template

    由于历史原因,WorkflowTemplate 和 template 存在一定的命名冲突,下面我们来介绍这两个术语的区别:

    • template(小写)是 Workflow 中的一个任务,或者是 templates 字段下的一个 WorkflowTemplate。当你定义一个 Workflow 时,你必须至少定义一个(但通常是多个)template。template 可以是 container、script、dag、steps、resources 或 suspend 类型,也可以引用一个 entrypoint 或其它 dag 和 step 的模板。

    这是一个有两个 templates 的 Workflow 的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: steps-
    spec:
      entrypoint: hello           # We reference our first "template" here
    
      templates:
      - name: hello               # The first "template" in this Workflow, it is referenced by "entrypoint"
        steps:                    # The type of this "template" is "steps"
        - - name: hello
            template: whalesay    # We reference our second "template" here
            arguments:
              parameters: [{name: message, value: "hello1"}]
    
      - name: whalesay             # The second "template" in this Workflow, it is referenced by "hello"
        inputs:
          parameters:
          - name: message
        container:                # The type of this "template" is "container"
          image: docker/whalesay
          command: [cowsay]
          args: ["{{inputs.parameters.message}}"]
    
    • WorkflowTemplate 用于定义集群中的 Workflow。由于它是 Workflow 的定义,它也包含 templates。这些 templates 可以引用当前 WorkflowTemplate,也可以引用集群内的其它 Workflows 和 WorkflowTemplates。更多信息请参考 Referencing Other WorkflowTemplates

    WorkflowTemplate Spec

    v2.7 and after

    在 v2.7 及更高版本中,WorkflowTemplates 支持 WorkflowSpec 中的所有字段(除 priority 外,它必须在 WorkflowSpec 中配置)。你可以将任何现有 Workflow 转换为 WorkflowTemplate,方法是将 kind: Workflow 替换为 kind: WorkflowTemplate。

    v2.4 – 2.6

    v2.4 - v2.6 中的 WorkflowTemplates 只是部分和 Workflow 定义相同,仅支持 templatesarguments 字段。

    这将不是 v2.4 - v2.6 中的有效 WorkflowTemplate(注意 entrypoint 字段):

    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: workflow-template-submittable
    spec:
      entrypoint: whalesay-template     # Fields other than "arguments" and "templates" not supported in v2.4 - v2.6
      arguments:
        parameters:
          - name: message
            value: hello world
      templates:
        - name: whalesay-template
          inputs:
            parameters:
              - name: message
          container:
            image: docker/whalesay
            command: [cowsay]
            args: ["{{inputs.parameters.message}}"]
    

    但是,这将是一个有效的 WorkflowTemplate:

    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: workflow-template-submittable
    spec:
      arguments:
        parameters:
          - name: message
            value: hello world
      templates:
        - name: whalesay-template
          inputs:
            parameters:
              - name: message
          container:
            image: docker/whalesay
            command: [cowsay]
            args: ["{{inputs.parameters.message}}"]
    

    使用 workflowMetadata 向工作流添加 labels/annotations

    2.10.2 and after

    要自动向从 WorkflowTemplates 创建的工作流添加 labels/annotations,请使用 workflowMetadata。

    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: workflow-template-submittable
    spec:
      workflowMetadata:
        labels:
          example-label: example-value
    

    使用参数

    在 WorkflowTemplate 中使用参数时,请注意以下几点:

    • 使用全局参数时,您可以在 Workflow 中实例化全局变量,然后在 WorkflowTemplate 中直接引用它们。如下所示:
    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: hello-world-template-global-arg
    spec:
      serviceAccountName: argo
      templates:
        - name: hello-world
          container:
            image: docker/whalesay
            command: [cowsay]
            args: ["{{workflow.parameters.global-parameter}}"]
    ---
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: hello-world-wf-global-arg-
    spec:
      serviceAccountName: argo
      entrypoint: whalesay
      arguments:
        parameters:
          - name: global-parameter
            value: hello
      templates:
        - name: whalesay
          steps:
            - - name: hello-world
                templateRef:
                  name: hello-world-template-global-arg
                  template: hello-world
    
    • 使用本地参数时,必须在 WorkflowTemplate 内的模板定义中提供本地参数的值。如下所示:
    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: hello-world-template-local-arg
    spec:
      templates:
        - name: hello-world
          inputs:
            parameters:
              - name: msg
                value: "hello world"
          container:
            image: docker/whalesay
            command: [cowsay]
            args: ["{{inputs.parameters.msg}}"]
    ---
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: hello-world-local-arg-
    spec:
      entrypoint: whalesay
      templates:
        - name: whalesay
          steps:
            - - name: hello-world
                templateRef:
                  name: hello-world-template-local-arg
                  template: hello-world
    

    引用其它 WorkflowTemplates

    您可以使用 templateRef 字段从另一个 WorkflowTemplates 引用 template。正如您在同一个 Workflow 引用其他 templates 的方式一样,您应该在 steps 或 dag 模板中这样做。

    以下是来自 steps 模板的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      entrypoint: whalesay
      templates:
      - name: whalesay
        steps:                              # You should only reference external "templates" in a "steps" or "dag" "template".
          - - name: call-whalesay-template
              templateRef:                  # You can reference a "template" from another "WorkflowTemplate" using this field
                name: workflow-template-1   # This is the name of the "WorkflowTemplate" CRD that contains the "template" you want
                template: whalesay-template # This is the name of the "template" you want to reference
              arguments:                    # You can pass in arguments as normal
                parameters:
                - name: message
                  value: "hello world"
    

    您也可以使用 dag 模板进行类似操作:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      entrypoint: whalesay
      templates:
      - name: whalesay
        dag:
          tasks:
            - name: call-whalesay-template
              templateRef:
                name: workflow-template-1
                template: whalesay-template
              arguments:
                parameters:
                - name: message
                  value: "hello world"
    

    永远不应该直接在 template 对象上引用另一个 template(在 steps 或 dag 模板之外)。这包括使用 template 和 templateRef。此行为已弃用,不再受支持,并将在未来版本中删除。

    以下是不应使用的已弃用参考的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      entrypoint: whalesay
      templates:
      - name: whalesay
        template:                     # You should NEVER use "template" here. Use it under a "steps" or "dag" template (see above).
        templateRef:                  # You should NEVER use "templateRef" here. Use it under a "steps" or "dag" template (see above).
          name: workflow-template-1
          template: whalesay-template
        arguments:                    # Arguments here are ignored. Use them under a "steps" or "dag" template (see above).
          parameters:
          - name: message
            value: "hello world"
    

    从 WorkflowTemplate spec 创建 Workflow

    您可以使用 workflowTemplateRef 从 WorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与工作流模板参数合并。下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中将 entrypoint 和 arguments 传递给 WorkflowTemplate:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      entrypoint: whalesay-template
      arguments:
        parameters:
          - name: message
            value: "from workflow"
      workflowTemplateRef:
        name: workflow-template-submittable
    

    下面是一个从 WorkflowTemplate spec 创建 Workflow 的示例,其中使用 WorkflowTemplate 的 entrypoint 和 arguments 的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      workflowTemplateRef:
        name: workflow-template-submittable
    

    管理 WorkflowTemplates

    CLI

    您可以创建一些示例模板,如下所示:

    argo template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/templates.yaml
    

    然后使用这些模板之一提交工作流:

    argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/workflow-template/hello-world.yaml
    

    2.7 and after

    然后提交一个 WorkflowTemplate 作为 Workflow:

    argo submit --from workflowtemplate/workflow-template-submittable
    

    如果您需要将 WorkflowTemplate 作为带参数的 Workflow 提交:

    argo submit --from workflowtemplate/workflow-template-submittable -p param1=value1
    

    kubectl

    使用 kubectl apply -f 和 kubectl get wftmpl

    通过 Argo CD 实现 GitOps

    可以使用 Argo CD 中 GitOps 中管理 WorkflowTemplate 资源

    UI

    WorkflowTemplate 资源也可以由 UI 管理。

    当从 UI 提交 WorkflowTemplates 时,用户可以在 enum 下指定选项以启用下拉列表选择。

    apiVersion: argoproj.io/v1alpha1
    kind: WorkflowTemplate
    metadata:
      name: workflow-template-with-enum-values
    spec:
      entrypoint: argosay
      arguments:
        parameters:
          - name: message
            value: one
            enum:
              -   one
              -   two
              -   three
      templates:
        - name: argosay
          inputs:
            parameters:
              - name: message
                value: '{{workflow.parameters.message}}'
          container:
            name: main
            image: 'argoproj/argosay:v2'
            command:
              - /argosay
            args:
              - echo
              - '{{inputs.parameters.message}}'
    

    ClusterWorkflowTemplates

    v2.8 and after

    介绍

    ClusterWorkflowTemplates 是集群范围的 WorkflowTemplates。 ClusterWorkflowTemplate 可以像 ClusterRole 一样在集群范围内使用,并且可以跨集群中的所有命名空间访问。

    定义 ClusterWorkflowTemplate

    apiVersion: argoproj.io/v1alpha1
    kind: ClusterWorkflowTemplate
    metadata:
      name: cluster-workflow-template-whalesay-template
    spec:
      templates:
      - name: whalesay-template
        inputs:
          parameters:
          - name: message
        container:
          image: docker/whalesay
          command: [cowsay]
          args: ["{{inputs.parameters.message}}"]
    

    引用其它 ClusterWorkflowTemplates

    您可以使用带有 clusterScope: true 的 templateRef 字段从其他 ClusterWorkflowTemplates 中引用模板。正如您在同一 Workflow 中引用其他模板的方式一样,您应该在 steps 或 dag 模板中这样做。

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: workflow-template-hello-world-
    spec:
      entrypoint: whalesay
      templates:
      - name: whalesay
        steps:                              # You should only reference external "templates" in a "steps" or "dag" "template".
          - - name: call-whalesay-template
              templateRef:                  # You can reference a "template" from another "WorkflowTemplate or ClusterWorkflowTemplate" using this field
                name: cluster-workflow-template-whalesay-template   # This is the name of the "WorkflowTemplate or ClusterWorkflowTemplate" CRD that contains the "template" you want
                template: whalesay-template # This is the name of the "template" you want to reference
                clusterScope: true          # This field indicates this templateRef is pointing ClusterWorkflowTemplate
              arguments:                    # You can pass in arguments as normal
                parameters:
                - name: message
                  value: "hello world"
    

    2.9 and after

    通过 ClusterWorkflowTemplate Spec 创建 Workflow

    您可以使用带有 clusterScope: true 的 workflowTemplateRef 从 ClusterWorkflowTemplate spec 创建 Workflow。如果您将参数传递给创建的 Workflow,它将与集群 Workflow 模板参数合并。

    这是带有 entrypointarguments 的 ClusterWorkflowTemplate 示例:

    apiVersion: argoproj.io/v1alpha1
    kind: ClusterWorkflowTemplate
    metadata:
      name: cluster-workflow-template-submittable
    spec:
      entrypoint: whalesay-template
      arguments:
        parameters:
          - name: message
            value: hello world
      templates:
        - name: whalesay-template
          inputs:
            parameters:
              - name: message
          container:
            image: docker/whalesay
            command: [cowsay]
            args: ["{{inputs.parameters.message}}"]
    

    这是一个通过 ClusterWorkflowTemplate 创建为 Workflow 的示例,并将 entrypointarguments 传递给 ClusterWorkflowTemplate:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: cluster-workflow-template-hello-world-
    spec:
      entrypoint: whalesay-template
      arguments:
        parameters:
          - name: message
            value: "from workflow"
      workflowTemplateRef:
        name: cluster-workflow-template-submittable
        clusterScope: true
    

    这是一个通过 WorkflowTemplate 创建为 Workflow 并使用 WorkflowTemplates 的 entrypointarguments 的示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: cluster-workflow-template-hello-world-
    spec:
      workflowTemplateRef:
        name: cluster-workflow-template-submittable
        clusterScope: true
    

    管理 ClusterWorkflowTemplates

    CLI

    您可以创建一些示例模板,如下所示:

    argo cluster-template create https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/clustertemplates.yaml
    

    使用这些模板之一提交工作流:

    argo submit https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cluster-workflow-template/cluster-wftmpl-dag.yaml
    

    2.7 and after

    将 ClusterWorkflowTemplate 作为 Workflow 提交:

    argo submit --from clusterworkflowtemplate/workflow-template-submittable
    

    kubectl

    使用 kubectl apply -f 和 kubectl get cwft

    UI

    ClusterWorkflowTemplate 资源也可以由 UI 管理

    CronWorkflows

    v2.5 and after

    介绍

    CronWorkflow 是按预设时间表运行的工作流。它们旨在轻松地从 Workflow 转换并模仿与 Kubernetes CronJob 相同的选项。本质上,CronWorkflow = Workflow + 一些特定的 cron 选项。

    CronWorkflow Spec

    CronWorkflow spec 示例如下:

    apiVersion: argoproj.io/v1alpha1
    kind: CronWorkflow
    metadata:
      name: test-cron-wf
    spec:
      schedule: "* * * * *"
      concurrencyPolicy: "Replace"
      startingDeadlineSeconds: 0
      workflowSpec:
        entrypoint: whalesay
        templates:
        - name: whalesay
          container:
            image: alpine:3.6
            command: [sh, -c]
            args: ["date; sleep 90"]
    

    workflowSpec 和 workflowMetadata

    CronWorkflow.spec.workflowSpec 和 Workflow.spec 类型相同,用于从中创建的 Workflow 对象的模板。spec 下的所有内容都将转换为 Workflow。

    生成的 Workflow 名称将是基于 CronWorkflow 名称生成的名称。在此示例中,它可能类似于 test-cron-wf-tj6fe

    CronWorkflow.spec.workflowMetadata 可用于添加 labelsannotations

    CronWorkflow 选项

    选项名称 默认值 描述
    schedule None, must be provided 工作流调度计划。例如,5 4 * * *
    timezone Machine timezone 将根据 IANA 时区标准运行工作流的时区,例如,America/Los_Angeles
    suspend false 如果为true,工作流调度将不会发生。可以从 CLI、GitOps 或直接设置
    concurrencyPolicy Allow 多个工作流同时调度时的处理策略。可选项:
    * Allow:全部允许
    * Replcace:在安排新的之前删除所有旧的
    * Forbid:有旧的存在时不许有新的
    startingDeadlineSeconds 0 Number of seconds after the last successful run during which a missed Workflow will be run
    successfulJobsHistoryLimit 3 将保留的成功工作流的数量
    failedJobsHistoryLimit 1 将保留的失败工作流的数量

    Cron Schedule 语法

    cron 调度程序使用标准的 cron 语法,参看documented on Wikipedia

    此处记录了所使用的特定库的更详细文档。

    崩溃恢复

    如果 workflow-controller 崩溃(CronWorkflow controller 也会崩溃),您可以设置一些选项,以确保在 controller 关闭时计划的 CronWorkflow 仍然可以运行。主要是可以设置 startingDeadlineSeconds 来指定 CronWorkflow 最后一次成功运行后的最大秒数,在此期间错过的运行仍将被执行。

    例如,如果每分钟运行的 CronWorkflow 最后一次运行是在 12:05:00,并且控制器在 12:05:55 和 12:06:05 之间崩溃,那么 12:06:00 的预期执行时间将会错过。然而,如果将 startingDeadlineSeconds 设置为大于65的值(上次计划运行时间 12:05:00 和当前控制器重新启动时间 12:06:05 之间经过的时间量),那么 CronWorkflow 的实例将在 12:06:05 准确执行。

    由于设置了startingDeadlineSeconds,当前只会执行一个实例。

    此设置也可以与 concurrencyPolicy 一起配置,以实现更精细的控制。

    管理 CronWorkflow

    CLI

    可以使用基本命令从 CLI 创建 CronWorkflow:

    $ argo cron create cron.yaml
    Name:                          test-cron-wf
    Namespace:                     argo
    Created:                       Mon Nov 18 10:17:06 -0800 (now)
    Schedule:                      * * * * *
    Suspended:                     false
    StartingDeadlineSeconds:       0
    ConcurrencyPolicy:             Forbid
    
    $ argo cron list
    NAME           AGE   LAST RUN   SCHEDULE    SUSPENDED
    test-cron-wf   49s   N/A        * * * * *   false
    
    # some time passes
    
    $ argo cron list
    NAME           AGE   LAST RUN   SCHEDULE    SUSPENDED
    test-cron-wf   56s   2s         * * * * *   false
    
    $ argo cron get test-cron-wf
    Name:                          test-cron-wf
    Namespace:                     argo
    Created:                       Wed Oct 28 07:19:02 -0600 (23 hours ago)
    Schedule:                      * * * * *
    Suspended:                     false
    StartingDeadlineSeconds:       0
    ConcurrencyPolicy:             Replace
    LastScheduledTime:             Thu Oct 29 06:51:00 -0600 (11 minutes ago)
    NextScheduledTime:             Thu Oct 29 13:03:00 +0000 (32 seconds from now)
    Active Workflows:              test-cron-wf-rt4nf
    

    注意:NextScheduledRun 假设工作流控制器使用 UTC 作为其时区

    kubectl

    使用 kubectl apply -f 和 kubectl get cwf

    回填天数

    See cron backfill.

    GitOps via Argo CD

    CronWorkflow resources can be managed with GitOps by using Argo CD

    UI

    CronWorkflow 资源也可以由 UI 管理

    模板类型

    HTTP 模板

    v3.2 and after

    HTTP 模板可用于执行 HTTP 请求。

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: http-template-
    spec:
      entrypoint: main
      templates:
        - name: main
          steps:
            - - name: get-google-homepage
                template: http
                arguments:
                  parameters: [{name: url, value: "https://www.google.com"}]
        - name: http
          inputs:
            parameters:
              - name: url
          http:
            timeoutSeconds: 20 # Default 30
            url: "{{inputs.parameters.url}}"
            method: "GET" # Default GET
            headers:
              - name: "x-header-name"
                value: "test-value"
            # Template will succeed if evaluated to true, otherwise will fail
            # Available variables:
            #  request.body: string, the request body
            #  request.headers: map[string][]string, the request headers
            #  response.url: string, the request url
            #  response.method: string, the request method
            #  response.statusCode: int, the response status code
            #  response.body: string, the response body
            #  response.headers: map[string][]string, the response headers
            successCondition: "response.body contains \"google\"" # available since v3.3
            body: "test body" # Change request body
    

    Argo 代理

    HTTP 模板使用 Argo 代理,它独立于控制器执行请求。Agent 和 Workflow Controller 通过 WorkflowTaskSet CRD 进行通信,该 CRD 是为每个需要使用 Agent 的正在运行的 Workflow 创建的。

    为了使用 Argo 代理,您需要确保已添加适当的 Workflow RBAC 以将代理角色添加到 Argo 工作流。代理角色示例如下所示:

    # https://argoproj.github.io/argo-workflows/workflow-rbac/
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: agent
      annotations:
        workflows.argoproj.io/description: |
          This is the minimum recommended permissions needed if you want to use the agent, e.g. for HTTP or plugin templates.
          If <= v3.2 you must replace `workflowtasksets/status` with `patch workflowtasksets`.
    rules:
      - apiGroups:
          - argoproj.io
        resources:
          - workflowtasksets
        verbs:
          - list
          - watch
      - apiGroups:
          - argoproj.io
        resources:
          - workflowtasksets/status
        verbs:
          - patch
    

    Container Set 模板

    v3.1 and after

    Container set 模板和普通的 container 或 script 模板类似,但允许你在一个 pod 中指定运行多个 containers。

    由于这多个 container 包含在一个 pod 中,它们将被调度到同一台宿主机上。你可以使用便宜且便捷的 empty-dir 卷替代 PVC 来实现多个步骤间共享数据。

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: container-set-template-
    spec:
      entrypoint: main
      templates:
        - name: main
          volumes:
            - name: workspace
              emptyDir: { }
          containerSet:
            volumeMounts:
              - mountPath: /workspace
                name: workspace
            containers:
              - name: a
                image: argoproj/argosay:v2
              - name: b
                image: argoproj/argosay:v2
              - name: main
                image: argoproj/argosay:v2
                dependencies:
                  - a
                  - b
          outputs:
            parameters:
              - name: message
                valueFrom:
                  path: /workspace/message
    

    有几点需要注意:

    1. 您必须使用 Emissary Executor。

    2. 或者所有容器必须并行运行——即它是一个没有依赖关系的图。

    3. 您不能使用增强的依赖逻辑。

    4. 它将使用所有资源请求的总和,可能比相同的 DAG 模板花费更多。如果您的请求已经花费了很多,这将是一个问题。见下文。

    通过指定依赖关系,可以将容器排列为图形。这适用于运行 10 个而不是 100 个容器。

    输入和输出

    与 container 和 script 模板一样,输入和输出只能从名为 main 的容器中加载和保存。

    所有有制品的 container set 模板必须/应该有一个名为 main 的容器。

    如果要使用基础层制品,main 必须最后完成,因此它必须是图中的根节点。

    那可能不切实际。

    相反,拥有一个工作区卷并确保所有工件路径都在该卷上。

    ⚠️资源请求

    一个 container set 实际上启动了所有容器,而 Emissary 仅在它所依赖的容器完成后才启动主容器进程。这意味着即使容器没有做任何有用的工作,它仍然在消耗资源,并且您仍然需要为它们付费。

    如果您的请求很小,这将不是问题。

    如果您的请求很大,请设置资源请求,以便资源总和是您一次性所需的最大值。

    示例 A:一个简单的序列,例如a -> b -> c

    • a 需要 1Gi 内存

    • b 需要 2Gi 内存

    • c 需要 1Gi 内存

    然后你知道你最多只需要 2Gi。你可以设置如下:

    • a 请求 512Mi 内存

    • b 请求 1Gi 内存

    • c 请求 512Mi 内存

    总共是2Gi,对于b来说已经足够了。所有任务都能正常运行。

    示例 B:菱形 DAG,例如菱形 a -> b -> d 和 a -> c -> d,即 b 和 c 同时运行。

    • a 需要 1000 cpu

    • b 需要 2000 cpu

    • c 需要 1000 cpu

    • d 需要 1000 cpu

    我知道 b 和 c 将同时运行。所以我需要确保总数是3000。

    • a 请求 500 cpu

    • b 请求 1000 cpu

    • c 请求 1000 cpu

    • d 请求 500 cpu

    一共3000,足够b+c了。所有任务都能正常运行。

    示例 C:不平衡的请求,例如a -> b 其中 a 便宜而 b 昂贵

    • a 需要 100 cpu, 1Mi 内存, 运行 10h

    • b 需要 8Ki GPU, 100 Gi 内存, 200 Ki GPU, 运行 5m

    你能看出这里的问题吗? a 只有小请求,但容器集将使用所有请求的总数。因此,就好像您将所有 GPU 都使用了 10 小时。这将是昂贵的。

    解决方案:当您有不平衡的请求时,不要使用容器集

    数据获取和转换

    v3.1 and after

    我们有意使此功能仅具有基本功能。我们希望我们能够根据社区的反馈构建此功能。如果您对此功能有想法和用例,请在 GitHub 上打开增强提案

    介绍

    用户经常将获取和转换数据作为其工作流程的一部分。data 模板为这些常见操作提供一流的支持。

    可以通过查看 bash 中的常见数据源和转换操作来理解 data 模板:

    find -r . | grep ".pdf" | sed "s/foo/foo.ready/"
    

    此类操作包括两个主要部分:

    • 一个数据源:find -r .

    • 一系列转换操作,串行转换源的输出:| grep ".pdf" | sed "s/foo/foo.ready/"

    例如,此操作在寻找待处理文件的潜在列表以及根据需要过滤和操作列表时可能很有用。

    在 Argo 中,这个操作可以写成:

    - name: generate-artifacts
      data:
        source:             # Define a source for the data, only a single "source" is permitted
          artifactPaths:    # A predefined source: Generate a list of all artifact paths in a given repository
            s3:             # Source from an S3 bucket
              bucket: test
              endpoint: minio:9000
              insecure: true
              accessKeySecret:
                name: my-minio-cred
                key: accesskey
              secretKeySecret:
                name: my-minio-cred
                key: secretkey
        transformation:     # The source is then passed to be transformed by transformations defined here
          - expression: "filter(data, {# endsWith \".pdf\"})"
          - expression: "map(data, {# + \".ready\"})"
    

    Spec

    data 模板必须包含一个 source 字段。当前可用的数据源有:

    • artifactPaths:从指定的制品库生成制品路径列表

    data 模板可能包含多个 transformations(也可能是0个)。转换将按顺序连续应用。当前可用的转换:

    • expression:一个expr表达式。语言定义请看这里。在定义 expr 表达式时,Argo 会将可用数据传递给名为 data 的环境变量(参见上面的示例)。

    我们知道 expression 转换是有限的。我们打算根据社区的反馈大大扩展此模板的功能。请参阅本文档顶部的链接以提交有关此功能的想法或用例。

    内联模板

    v3.2 and after

    您可以在 DAG 和步骤中内联其他模板。

    示例:

    • DAG
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: dag-inline-
      labels:
        workflows.argoproj.io/test: "true"
      annotations:
        workflows.argoproj.io/description: |
          This example demonstrates running a DAG with inline templates.
        workflows.argoproj.io/version: ">= 3.2.0"
    spec:
      entrypoint: main
      templates:
        - name: main
          dag:
            tasks:
              - name: a
                inline:
                  container:
                    image: argoproj/argosay:v2
    
    • Steps
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: steps-inline-
      labels:
        workflows.argoproj.io/test: "true"
      annotations:
        workflows.argoproj.io/description: |
          This workflow demonstrates running a steps with inline templates.
        workflows.argoproj.io/version: ">= 3.2.0"
    spec:
      entrypoint: main
      templates:
        - name: main
          steps:
            - - name: a
                inline:
                  container:
                    image: argoproj/argosay:v2
    

    警告:您只能内联一次。在 DAG 中内联 DAG 将不起作用。

    访问控制

    服务账户

    配置服务帐户以运行工作流

    Roles, Role-Bindings 和 Service Accounts

    为了让 Argo 支持制品、输出、访问 secrets 等功能,它需要使用 Kubernetes API 与 Kubernetes 资源进行通信。为了与 Kubernetes API 通信,Argo 使用一个 ServiceAccount 授权自己访问 Kubernetes API。您可以通过使用 RoleBinding 将角色绑定到 ServiceAccount 来指定 Argo 使用的 ServiceAccount 的角色(即哪些权限)。

    然后,在提交工作流时,您可以指定 Argo 使用哪个 ServiceAccount :

    argo submit --serviceaccount <name>
    

    如果没提供 ServiceAccount,Argo 将会使用其所运行的命名空间的 default ServiceAccount,默认情况下几乎总是没有足够的权限。

    有关为您的用例授予 Argo 必要权限的更多信息,请参阅 Workflow RBAC。

    授予 admin 权限

    出于本演示的目的,我们将授予default ServiceAccount 管理员权限(即,我们将 admin Role 绑定到当前命名空间的default ServiceAccount):

    kubectl create rolebinding default-admin --clusterrole=admin --serviceaccount=argo:default -n argo
    

    请注意,这将向运行命令的命名空间中的默认 ServiceAccount 授予管理员权限,因此您将只能在创建 RoleBinding 的命名空间中运行工作流。

    Workflow RBAC

    工作流中的所有 pod 都使用 workflow.spec.serviceAccountName 中指定的服务账户运行,如果省略,则使用工作流命名空间的 default 服务账户。工作流需要的访问权限取决于工作流需要做什么。例如,如果您的工作流需要部署资源,则工作流的服务帐户将需要对该资源的“create”权限。

    警告:我们不建议在生产环境使用 default 服务账户。它是一个共享帐户,因此可能会添加您不想要的权限。相反,仅为您的工作流程创建一个服务帐户。

    执行器工作所需的最小权限:

    对于 >= v3.4:

    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: executor
    rules:
      - apiGroups:
          - argoproj.io
        resources:
          - workflowtaskresults
        verbs:
          - create
          - patch
    

    对于 <= v3.3 使用。

    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: executor
    rules:
      - apiGroups:
          - ""
        resources:
          - pods
        verbs:
          - get
          - patch
    

    警告:对于许多组织来说,给工作流 Pod 补丁权限可能是不可接受的,参考#3961

    如果您不使用 emissary,则需要额外的权限。请参阅 executor 以获得合适的权限。

    功能

    Workflow 变量

    工作流规范中的某些字段允许由 Argo 自动替换的变量引用。

    如何使用变量

    变量用花括号括起来:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: hello-world-parameters-
    spec:
      entrypoint: whalesay
      arguments:
        parameters:
          - name: message
            value: hello world
      templates:
        - name: whalesay
          inputs:
            parameters:
              - name: message
          container:
            image: docker/whalesay
            command: [ cowsay ]
            args: [ "{{inputs.parameters.message}}" ]
    

    以下变量可用于引用工作流的各种元数据:

    模板标签类型

    有两种模板标签:

    • 简单的: 默认,示例 {{workflow.name}}

    • 表达式: {{ 后紧跟着 =,例如 {{=workflow.name}}

    简单的

    标签被替换为与标签同名的变量。

    简单标签可能在括号和变量之间有空格,如下所示。但是,存在一个已知问题,即变量可能无法使用空白进行插值,因此建议在解决此问题之前避免使用空白。请使用可重现的示例报告意外行为。

    args: [ "{{ inputs.parameters.message }}" ]  
    

    表达式

    Since v3.1

    该标签被作为表达式评估该标签的结果替换。

    请注意,任何带连字符的参数名称或步骤名称都会导致解析错误。您可以通过索引参数或步骤图来引用它们,例如 inputs.parameters['my-param']steps['my-step'].outputs.result

    学习表达式语法

    示例

    普通列表:

    [1, 2]
    

    列表过滤:

    filter([1, 2], { # > 1})
    

    映射一个列表:

    map([1, 2], { # * 2 })
    

    我们提供了一些核心函数:

    转成 int:

    asInt(inputs.parameters['my-int-param'])
    

    转成 float:

    asFloat(inputs.parameters['my-float-param'])
    

    转成字符串:

    string(1)
    

    转成JSON串(需要 withParam):

    toJson([1, 2])
    

    从 JSON 中提取数据:

    jsonpath(inputs.parameters.json, '$.some.path')
    

    您还可以使用 Sprig 函数

    修剪字符串:

    sprig.trim(inputs.parameters['my-string-param'])
    

    !!!警告 在 Sprig 函数中,通常不会引发错误。例如。如果 int 用于无效值,则返回 0。请查看 Sprig 文档以了解哪些函数可以使用,哪些不可以。

    引用

    所有模板

    变量 描述
    inputs.parameters.<NAME> 给模板传入参数
    inputs.parameters 模板的所有输入参数作为 JSON 字符串
    inputs.artifacts.<NAME> 将制品输入到模板

    Steps 模板

    变量 描述
    steps.name 步骤名称
    steps.<STEPNAME>.id 容器步骤的唯一 ID
    steps.<STEPNAME>.ip 任何先前守护程序 container 步骤的 IP 地址
    steps.<STEPNAME>.status 任何先前步骤的状态短语
    steps.<STEPNAME>.exitCode 任何先前 script/container 步骤的退出码
    steps.<STEPNAME>.startedAt 当前步骤开始的时间戳
    steps.<STEPNAME>.finishedAt 当前步骤结束的时间戳
    steps.<STEPNAME>.outputs.result 任何先前 script/container 步骤的输出结果
    steps.<STEPNAME>.outputs.parameters When the previous step uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation
    steps.<STEPNAME>.outputs.parameters.<NAME> Output parameter of any previous step. When the previous step uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation
    steps.<STEPNAME>.outputs.artifacts.<NAME> 任何先前步骤的输出工件

    DAG 模板

    变量 描述
    tasks.name 任务名称
    tasks.<TASKNAME>.id container 任务的唯一ID
    tasks.<TASKNAME>.ip 任何先前守护程序 container 任务的 IP 地址
    tasks.<TASKNAME>.status 任何先前任务的状态短语
    tasks.<TASKNAME>.exitCode 任何先前 script/container 任务的退出码
    tasks.<TASKNAME>.startedAt 当前任务开始的时间戳
    tasks.<TASKNAME>.finishedAt 当前任务结束的时间戳
    tasks.<TASKNAME>.outputs.result 任何先前 script/container 任务的输出结果
    tasks.<TASKNAME>.outputs.parameters When the previous task uses withItems or withParams, this contains a JSON array of the output parameter maps of each invocation
    tasks.<TASKNAME>.outputs.parameters.<NAME> Output parameter of any previous task. When the previous task uses withItems or withParams, this contains a JSON array of the output parameter values of each invocation
    tasks.<TASKNAME>.outputs.artifacts.<NAME> 任何先前任务的输出工件

    HTTP 模板

    Since v3.3

    Only available for successCondition

    变量 描述
    request.method 请求方法(字符串)
    request.url 请求URL(字符串)
    request.body 请求体(字符串)
    request.headers 请求头 (map[string][]string)
    response.statusCode 响应状态码 (int)
    response.body 响应体(string)
    response.headers 响应头 (map[string][]string)

    重试策略

    When using the expression field within retryStrategy, special variables are available.

    变量 描述
    lastRetry.exitCode Exit code of the last retry
    lastRetry.Status Status of the last retry
    lastRetry.Duration Duration in seconds of the last retry

    Note: These variables evaluate to a string type. If using advanced expressions, either cast them to int values (expression: "{{=asInt(lastRetry.exitCode) >= 2}}") or compare them to string values (expression: "{{=lastRetry.exitCode != '2'}}").

    Container/Script 模板

    变量 描述
    pod.name container/script 的 Pod 名称
    retries 如果指定了重试策略,container/script的重试次数
    inputs.artifacts.<NAME>.path 输入工件的本地路径
    outputs.artifacts.<NAME>.path 输出工件的本地路径
    outputs.parameters.<NAME>.path 输出参数的本地路径

    循环 (withItems / withParam)

    变量 描述
    item 元素列表
    item.<FIELDNAME> 列表中元素字段

    指标

    When emitting custom metrics in a template, special variables are available that allow self-reference to the current step.

    变量 描述
    status Phase status of the metric-emitting template
    duration Duration of the metric-emitting template in seconds (only applicable in Template-level metrics, for Workflow-level use workflow.duration)
    exitCode Exit code of the metric-emitting template
    inputs.parameters.<NAME> Input parameter of the metric-emitting template
    outputs.parameters.<NAME> Output parameter of the metric-emitting template
    outputs.result Output result of the metric-emitting template
    resourcesDuration.{cpu,memory} Resources duration in seconds. Must be one of resourcesDuration.cpu or resourcesDuration.memory, if available. For more info, see the Resource Duration doc.

    实时指标

    Some variables can be emitted in real-time (as opposed to just when the step/task completes). To emit these variables in real time, set realtime: true under gauge (note: only Gauge metrics allow for real time variable emission). Metrics currently available for real time emission:

    For Workflow-level metrics:

    • workflow.duration

    For Template-level metrics:

    • duration

    全局变量

    变量 描述
    workflow.name Workflow 名称
    workflow.namespace Workflow 命名空间
    workflow.serviceAccountName Workflow 服务账户名称
    workflow.uid Workflow UID。用于设置资源或唯一工件位置的所有权引用
    workflow.parameters.<NAME> Workflow的输入参数
    workflow.parameters Workflow的所有输入参数的 JSON 字符串
    workflow.outputs.parameters.<NAME> Workflow中的全局参数
    workflow.outputs.artifacts.<NAME> 工作流中的全局工件
    workflow.annotations.<NAME> Workflow的注解
    workflow.labels.<NAME> Workflow的标签
    workflow.creationTimestamp Workflow 的创建时间,格式为 RFC 3339 (e.g. 2018-08-23T05:42:49Z)
    workflow.creationTimestamp.<STRFTIMECHAR> Workflow 的创建时间,格式为strftime.
    workflow.creationTimestamp.RFC3339 Workflow 的创建时间,格式为 RFC 3339
    workflow.priority Workflow的优先级
    workflow.duration Workflow持续时间估计值可能与实际持续时间相差几秒钟
    workflow.scheduledTime 被调度时间,格式为 RFC 3339 (仅用于 CronWorkflow)

    退出处理器

    变量 描述
    workflow.status Workflow的状态。可选值为:Succeeded, Failed, Error
    workflow.failures A list of JSON objects containing information about nodes that failed or errored during execution. Available fields: displayName, message, templateName, phase, podName, and finishedAt.

    重试

    Argo工作流提供了一系列重试失败步骤的选项。

    在 WorkflowSpec 中配置 retryStrategy

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: retry-container-
    spec:
      entrypoint: retry-container
      templates:
      - name: retry-container
        retryStrategy:
          limit: "10"
        container:
          image: python:alpine3.6
          command: ["python", -c]
          # fail with a 66% probability
          args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
    

    重试策略

    使用retryPolicy选择要重试的失败:

    • Always:重试所有失败的步骤。

    • OnFailure: 重试其主容器在Kubernetes中标记为失败的步骤(这是默认值)。

    • OnError: 重试遇到Argo控制器错误或其init或wait容器失败的步骤

    • OnTransientError: Retry steps that encounter errors defined as transient, or errors matching the TRANSIENT_ERROR_PATTERN environment variable. Available in version 3.0 and later.

    示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: retry-on-error-
    spec:
      entrypoint: error-container
      templates:
      - name: error-container
        retryStrategy:
          limit: "2"
          retryPolicy: "Always"
        container:
          image: python
          command: ["python", "-c"]
          # fail with a 80% probability
          args: ["import random; import sys; exit_code = random.choice(range(0, 5)); sys.exit(exit_code)"]
    

    有条件地重试

    v3.2 and after

    可以使用 expression 来控制重试。expression 字段接受expr表达式,并且可以访问如下变量:

    • lastRetry.exitCode: 上次重试的退出码,如果不可用则为“-1”

    • lastRetry.status: 上次重试的结果短语: Error, Failed

    • lastRetry.duration: 上次重试耗时,秒

    如果 expression 评估为 false,该步骤将不会重试。

    示例:

    # Only retry if the retryStrategy.expression condition is satisfied. In this example, retries will be made until a pod has
    # exit code 2 or the limit of 10 is reached, whichever happens first.
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: retry-script-
    spec:
      entrypoint: main
      templates:
        - name: main
          steps:
            - - name: safe-to-retry
                template: safe-to-retry
            - - name: retry
                template: retry-script
                arguments:
                  parameters:
                    - name: safe-to-retry
                      value: "{{steps.safe-to-retry.outputs.result}}"
    
        - name: safe-to-retry
          script:
            image: python:alpine3.6
            command: ["python"]
            source: |
              print("true")
        - name: retry-script
          inputs:
            parameters:
                - name: safe-to-retry
          retryStrategy:
            limit: "3"
            # Only continue retrying if the last exit code is greater than 1 and the input parameter is true
            expression: "asInt(lastRetry.exitCode) > 1 && {{inputs.parameters.safe-to-retry}} == true"
          script:
            image: python:alpine3.6
            command: ["python"]
            # Exit 1 with 50% probability and 2 with 50%
            source: |
              import random;
              import sys;
              exit_code = random.choice([1, 2]);
              sys.exit(exit_code)
    

    Back-Off

    您可以使用 backoff 配置重试之间的延迟。

    # This example demonstrates the use of retry back offs
    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: retry-backoff-
    spec:
      entrypoint: retry-backoff
      templates:
      - name: retry-backoff
        retryStrategy:
          limit: "10"
          backoff:
            duration: "1"       # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
            factor: "2"
            maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        container:
          image: python:alpine3.6
          command: ["python", -c]
          # fail with a 66% probability
          args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
    

    生命周期钩子

    v3.3 and after

    介绍

    LifecycleHook 触发一个基于条件表达式的动作。它在工作流级别或模板级别进行配置,例如分别作为 workflow.statussteps.status 的功能。LifecycleHook 在执行期间执行并执行一次。

    换句话说,LifecycleHook 的功能类似于带有条件表达式的退出处理程序

    Workflow 级别 LifecycleHook: 满足配置的表达式时执行工作流。

    示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: lifecycle-hook-
    spec:
      entrypoint: main
      hooks:
        exit:
          template: http
        running:
          expression: workflow.status == "Running"
          template: http
      templates:
        - name: main
          steps:
          - - name: step1
              template: heads
    
        - name: heads
          container:
            image: alpine:3.6
            command: [sh, -c]
            args: ["echo \"it was heads\""]
    
        - name: http
          http:
            # url: http://dummy.restapiexample.com/api/v1/employees
            url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"
    

    Template-level Lifecycle-Hook: 满足配置的表达式时执行模板。

    示例:

    apiVersion: argoproj.io/v1alpha1
        kind: Workflow
        metadata:
          generateName: lifecycle-hook-tmpl-level-
        spec:
          entrypoint: main
          templates:
            - name: main
              steps:
                - - name: step-1
                    hooks:
                      exit:
                        # Expr will not support `-` on variable name. Variable should wrap with `[]`
                        expression: steps["step-1"].status == "Running"
                        template: http
                      success:
                        expression: steps["step-1"].status == "Succeeded"
                        template: http
                    template: echo
                - - name: step2
                    hooks:
                      exit:
                        expression: steps.step2.status == "Running"
                        template: http
                      success:
                        expression: steps.step2.status == "Succeeded"
                        template: http
                    template: echo
    
            - name: echo
              container:
                image: alpine:3.6
                command: [sh, -c]
                args: ["echo \"it was heads\""]
    
            - name: http
              http:
                # url: http://dummy.restapiexample.com/api/v1/employees
                url: "https://raw.githubusercontent.com/argoproj/argo-workflows/4e450e250168e6b4d51a126b784e90b11a0162bc/pkg/apis/workflow/v1alpha1/generated.swagger.json"
    

    支持的条件

    不支持的条件

    • outputs 不可用,因为 LifecycleHook 在执行期间执行,并且在该步骤完成之前不会产生输出。

    通知用例

    LifecycleHook 可用于根据工作流状态更改或模板状态更改配置通知,如下例所示:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
     generateName: lifecycle-hook-
    spec:
     entrypoint: main
     hooks:
       exit:
         template: http
       running:
         expression: workflow.status == "Running"
         template: http
     templates:
       - name: main
         steps:
           - - name: step1
               template: heads
    
       - name: heads
         container:
           image: alpine:3.6
           command: [sh, -c]
           args: ["echo \"it was heads\""]
    
       - name: http
         http:
           url: http://dummy.restapiexample.com/api/v1/employees
    

    步骤级记忆

    v2.10 and after

    介绍

    工作流通常具有计算成本高的输出。此功能通过记忆先前运行的步骤来降低成本和工作流执行时间:它将模板的输出存储到具有可变 key 的指定缓存中。

    缓存方法

    目前,缓存只能使用 config-maps。这使您可以轻松地通过 kubectl 和 Kubernetes API 手动操作缓存条目,而无需通过 Argo。

    使用记忆

    记忆是在模板级别设置的。您必须指定一个键,它可以是静态字符串,但更多时候取决于输入。您还必须指定配置映射缓存的名称。

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
       generateName: memoized-workflow-
    spec:
       entrypoint: whalesay
       templates:
          - name: whalesay
            memoize:
               key: "{{inputs.parameters.message}}"
               cache:
                  configMap:
                     name: whalesay-cache
    

    注意:为了使用记忆,有必要为适当的(集群)角色添加configmaps 资源的 create 和 update 权限。在集群安装的情况下,应该更新 argo-cluster-role 集群角色,而对于命名空间安装,应该更新 argo-role 角色。

    FAQ

    1. 如果你看到错误“error creating cache entry: ConfigMap "reuse-task" is invalid: []: Too long: must have at most 1048576 characters”,这是由于 ConfigMap 有 1MB 大小的限制。下面是一些解决方案:

      • 删除现有的 ConfigMap 缓存或切换到使用不同的缓存。

      • 减少正在记忆的节点的输出参数的大小。

      • 将您的缓存拆分为不同的记忆键和缓存名称,以便每个缓存条目都很小。

    模板默认值

    v3.1 and after

    介绍

    TemplateDefaults 功能使用户能够在工作流 spec 级别配置默认模板值,该值将应用于工作流中的所有模板。如果模板的值在 templateDefault 中也有默认值,则模板的值将优先。这些值将在运行时应用。将使用 Kubernetes 战略合并补丁合并模板值和默认值。要检查列表值是否合并以及如何合并,请检查工作流定义中的 patchStrategy 和 patchMergeKey 标签。

    在 WorkflowSpec 中配置 templateDefaults

    示例:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      name: template-defaults-example
    spec:
      entrypoint: main
      templateDefaults:
        timeout: 30s   # timeout value will be applied to all templates
        retryStrategy: # retryStrategy value will be applied to all templates
          limit: 2
      templates:
      - name: main
        container:
          image: docker/whalesay:latest
    

    template defaults example

    在控制器级别配置templateDefaults

    操作员可以在 workflow defaults中配置 templateDefaults。此 templateDefault 将应用于在控制器上运行的所有工作流。

    以下将在 Config Map 中指定:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: workflow-controller-configmap
    data:
      # Default values that will apply to all Workflows from this controller, unless overridden on the Workflow-level
      workflowDefaults: |
        metadata:
          annotations:
            argo: workflows
          labels:
            foo: bar
        spec:
          ttlStrategy:
            secondsAfterSuccess: 5
          templateDefaults:
            timeout: 30s 
    

    增强的依赖逻辑

    v2.9 and after

    介绍

    在 2.8 版之前,在 DAG 模板中指定依赖项的唯一方法是使用 dependencies 字段并指定当前任务所依赖的其他任务的列表。此语法具有限制性,因为它不允许用户指定要依赖的任务结果。例如,一个任务只有在其依赖的任务执行成功(或失败等)时才会运行。

    依赖

    为了解决这个问题,存在一个名为depends 的新字段,它允许用户指定相关任务、它们的状态以及任何复杂的布尔逻辑。该字段是一个字符串字段,其语法类似于表达式,格式为 <task-name>.<task-result>。示例包括 task-1.Suceeded、task-2.Failed、task-3.Daemoned。可用任务结果的完整列表如下:

    任务结果 描述 含义
    .Succeeded 任务执行成功 任务执行完成,且没有错误
    .Failed 任务执行失败 任务退出码非0
    .Errored 任务报错 任务有一个非 0 退出代码以外的错误
    .Skipped 任务被跳过 任务被跳过
    .Omitted 任务被省略 任务被省略
    .Daemoned 任务是守护进程并且不是 Pending 状态

    为方便起见,如果省略的任务结果等价于 (task.Succeeded || task.Skipped || task.Daemoned)。

    例如:

    depends: "task || task-2.Failed"
    

    等价于:

    depends: (task.Succeeded || task.Skipped || task.Daemoned) || task-2.Failed
    

    也可以使用完整的布尔逻辑。操作符包括:

    • &&

    • ||

    • !

    例如:

    depends: "(task-2.Succeeded || task-2.Skipped) && !task-3.Failed"
    

    如果您依赖使用了 withItems 的任务,您可以使用 .AnySucceeded 和 .AllFailed 依赖于任何项目任务是成功还是全部失败,例如:

    depends: "task-1.AnySucceeded || task-2.AllFailed"
    

    与 dependencies 和 dag.task.continueOn 的兼容性

    此功能完全兼容 dependencies,转换很容易。

    要转换,只需在 dependencies 中加入 &&

    dependencies: ["A", "B", "C"]
    

    等价于:

    depends: "A && B && C"
    

    由于 depends 中加入了控制逻辑,使用 depends 不能使用 dag.task.continueOn。此外,不能在同一个任务组中同时使用 dependenciesdepends

    Node字段选择器

    v2.8 and after

    介绍

    resume、stop 和 retry 等 Argo CLI 和 API 命令支持 --node-field-selector参数,允许用户选择命令要作用的一组节点。

    与 CLI 一起使用时的格式为:

    --node-field-selector=FIELD=VALUE
    

    可选项

    该字段可以是以下任何一项:

    字段 描述
    displayName Display name of the node. This is the name of the node as it is displayed on the CLI or UI, without considering its ancestors (see example below). This is a useful shortcut if there is only one node with the same displayName
    name Full name of the node. This is the full name of the node, including its ancestors (see example below). Using name is necessary when two or more nodes share the same displayName and disambiguation is required.
    templateName Template name of the node
    phase Phase status of the node - e.g. Running
    templateRef.name The name of the workflow template the node is referring to
    templateRef.template The template within the workflow template the node is referring to
    inputs.parameters.<NAME>.value The value of input parameter NAME

    The operator can be '=' or '!='. Multiple selectors can be combined with a comma, in which case they are anded together.

    示例

    过滤输入参数 'foo' 等于 'bar' 的节点:

    --node-field-selector=inputs.parameters.foo.value=bar
    

    过滤输入参数 'foo' 等于 'bar' ,且状态短语不是 Running 的节点:

    --node-field-selector=foo1=bar1,phase!=Running
    

    考虑以下工作流程:

     ● appr-promotion-ffsv4    code-release
     ├─✔ start                 sample-template/email                 appr-promotion-ffsv4-3704914002  2s
     ├─● app1                  wftempl1/approval-and-promotion
     │ ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-524476380   2s
     │ └─ǁ wait-approval       sample-template/waiting-for-approval
     ├─✔ app2                  wftempl2/promotion
     │ ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-2580536603  2s
     │ ├─✔ pr-approval         sample-template/approval              appr-promotion-ffsv4-3445567645  2s
     │ └─✔ deployment          sample-template/promote               appr-promotion-ffsv4-970728982   1s
     └─● app3                  wftempl1/approval-and-promotion
       ├─✔ notification-email  sample-template/email                 appr-promotion-ffsv4-388318034   2s
       └─ǁ wait-approval       sample-template/waiting-for-approval
    

    模式

    Empty Dir

    虽然默认情况下,Docker 和 PNS 工作流执行器可以从基础层(例如 /tmp)获取输出工件/参数,但 Kubelet 和 K8SAPI 执行器都不能。如果您使用安全上下文运行工作流 pod,则不太可能从基础层获得输出工件/参数。

    您可以通过将卷挂载到您的 pod 来解决此限制。最简单的方法是使用 emptyDir 卷。

    只能用于工件/参数输出,如果需要的话,工件/参数输入会自动挂载一个 empty-dir。

    此示例显示如何挂载输出卷:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: empty-dir-
    spec:
      entrypoint: main
      templates:
        - name: main
          container:
            image: argoproj/argosay:v2
            command: [sh, -c]
            args: ["cowsay hello world | tee /mnt/out/hello_world.txt"]
            volumeMounts:
              - name: out
                mountPath: /mnt/out
          volumes:
            - name: out
              emptyDir: { }
          outputs:
            parameters:
              - name: message
                valueFrom:
                  path: /mnt/out/hello_world.txt
    

    Cron 回填

    使用场景

    • 您正在使用 cron 工作流来运行日常作业,您可能需要重新运行某个日期,或者运行一些历史日期的作业。

    解决方案

    1. 为您的日常工作创建工作流模板。

    2. 创建您的 cron 工作流程以每天运行并调用该模板。

    3. 创建一个使用 withSequence的回填工作流运行每日工作流。

    这个完整示例包括:

    • 一个名为 job 的工作流模板

    • 一个名为 daily-job 的 cron 工作流

    • 一个名为 backfill-v1 的工作流,它使用资源模板为每个回填日期创建一个工作流。

    • 一个名为 backfill-v2 的替代工作流,它使用步骤模板为每个回填日期运行一个任务。

    状态

    资源预估

    v2.7 and after

    Argo Workflows 指示您的工作流使用了多少资源并保存此信息。这是一个指示性但不准确的值。

    预估耗时

    v2.12 and after

    当您运行工作流时,控制器将尝试预估其耗时。

    这是基于从同一工作流模板、集群工作流模板或 cron 工作流提交的最近成功的工作流。

    为了获取这些数据,控制器首先查询 Kubernetes API(因为这样更快),然后是工作流存档(如果启用)。

    如果您使用过 Jenkins 之类的工具,您就会知道估计可能不准确:

    • 一个 pod 花费了很长时间等待调度。

    • 工作流程是不确定的,例如它使用何时执行不同的路径。

    • 工作流程可能因规模而异,例如有时它使用 withItems,因此有时运行 100 个节点,有时运行 1000 个。

    • 如果 pod 运行时间不可预测。

    • 工作流是参数化的,不同的参数会影响其持续时间。

    工作流程进度

    v2.12 and after

    当您运行工作流时,控制器将报告其进度。

    我们将进度定义为两个数字,N/M 使得 0 <= N <= M 和 0 <= M。

    • N 是已完成的任务量。

    • M 是任务总量。

    如 0/0、0/1 或 50/100。

    和预计持续时间不同,进度是确定的。无论有任何问题,每个工作流程都是相同的。

    每个节点的进度计算如下:

    1. 对于 pod 节点,如果完成则为 1/1,否则为 0/1。

    2. 对于非叶节点,其子节点的总和。

    对于整个工作流,进度是其所有叶节点的总和。

    警告:每次将节点添加到图中,M 都会在工作流运行期间增加。

    自我报告进度

    工作流中的 Pod 可以在运行时报告自己的进度。这个自我报告的进度会覆盖自动生成的进度。

    报告进度实现方式如下:

    • 创建进度并将进度写入环境变量 ARGO_PROGRESS_FILE 指示的文件

    • 进度格式必须为 N/M

    执行器将每 3 秒读取一次此文件,如果有更新,会更新 pod 的 workflows.argoproj.io/progress: N/M 注解。控制器读取该数据,并将进度写入适当的状态属性。

    最初,工作流 pod 的进度始终为 0/1。如果您想影响这一点,请确保在 pod 上设置初始进度注解:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: progress-
    spec:
      entrypoint: main
      templates:
        - name: main
          dag:
            tasks:
              - name: progress
                template: progress
        - name: progress
          metadata:
            annotations:
              workflows.argoproj.io/progress: 0/100
          container:
            image: alpine:3.14
            command: [ "/bin/sh", "-c" ]
            args:
              - |
                for i in `seq 1 10`; do sleep 10; echo "$(($i*10))"'/100' > $ARGO_PROGRESS_FILE; done
    

    工作流创建者

    v2.9 and after

    如果您通过 CLI 或 UI 创建工作流,则会自动添加一个label,标记创建它的用户

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      name: my-wf
      labels:
        workflows.argoproj.io/creator: admin
        # labels must be DNS formatted, so the "@" is replaces by '.at.'  
        workflows.argoproj.io/creator-email: admin.at.your.org
        workflows.argoproj.io/creator-preferred-username: admin-preferred-username
    

    相关文章

      网友评论

        本文标题:Argo Workflows用户手册

        本文链接:https://www.haomeiwen.com/subject/rzadcdtx.html