Introduction
Argo Workflows is an open-source, container-native workflow engine for orchestrating parallel jobs on Kubernetes. Argo Workflows is implemented as a Kubernetes CRD (Custom Resource Definition).
Key features:
- Each step in a workflow is a container
- Model multi-step workflows as a sequence of tasks, or use a directed acyclic graph (DAG) to capture the dependencies between tasks
- Easily run compute-intensive jobs for machine learning or data processing in a fraction of the time
- Run CI/CD pipelines on Kubernetes without complex software configuration
Argo lets users run multi-step pipelines defined in a DSL that resembles a traditional YAML file. The framework provides advanced features such as loops, conditionals, and dependency management, which bring flexibility to deploying applications and to managing configuration and dependencies.
System Architecture
Init
Whenever a user's template uses an artifact in its inputs, or is of type script (the script needs to be injected), Argo adds an InitContainer to the pod. Its image is argoexec and its command is argoexec init. The main job of this Init Container is to load artifacts.
Wait
For every template type except Resource, Argo injects a Wait Container, which waits for the Main Container to finish, terminates all sidecar containers, and performs cleanup (such as capturing script results and saving logs, output parameters, and artifacts). The Wait Container's image is also argoexec, and its command is argoexec wait. (Resource templates don't need one because they run argoexec directly as the Main Container.)
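To see this layout for yourself, you can list the containers of a workflow pod (the pod name is a placeholder; exact container names can vary by executor version, so treat this as a sketch):
kubectl get pod <workflow-pod-name> -o jsonpath='{.spec.initContainers[*].name} {.spec.containers[*].name}'
# e.g. for a script template: init wait main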
Core Concepts
Workflow
The Workflow is the most important resource in Argo and serves two important functions:
- It defines the workflow to be executed
- It stores the state of the workflow
The workflow to be executed is defined in the Workflow.spec field, whose key parts are templates and an entrypoint, as shown below:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world- # name prefix for the Workflow
spec:
  entrypoint: whalesay # run the whalesay template
  templates:
  - name: whalesay # define the whalesay template; matches the entrypoint
    container: # define a container that prints "hello world"
image: docker/whalesay
command: [cowsay]
args: ["hello world"]
Templates
templates is a list with two main categories:
- Template definitions, which define the concrete work to run
- Template invocators, which call other templates and provide parallelism control
WorkflowTemplate
A WorkflowTemplate is a library of reusable templates for Workflows; like a Workflow, it is made up of templates. Once WorkflowTemplates are created, users can execute Workflows by submitting them directly.
A WorkflowTemplate is defined almost identically to a Workflow, apart from its kind. Precisely because a Workflow can be both a definition and an instance, WorkflowTemplate exists as a template for Workflows: once defined, a WorkflowTemplate can be submitted to create a Workflow.
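As a minimal sketch (resource names here are illustrative), a WorkflowTemplate and a Workflow that runs it via workflowTemplateRef look like this:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: hello-template
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello from a template"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-from-template-
spec:
  workflowTemplateRef:
    name: hello-template # reference the WorkflowTemplate above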
Adding labels/annotations to Workflows with workflowMetadata
To automatically add labels/annotations to Workflows created from WorkflowTemplates, use workflowMetadata.
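For example (a sketch; the label name and value are placeholders), every Workflow created from this WorkflowTemplate would carry the label below:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: labeled-template
spec:
  workflowMetadata:
    labels:
      example-label: example-value # added to every Workflow created from this template
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello"]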
ClusterWorkflowTemplates
ClusterWorkflowTemplates are cluster-scoped WorkflowTemplates. Like a ClusterRole, a ClusterWorkflowTemplate can be used across the entire cluster and is accessible from all namespaces.
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
name: cluster-workflow-template-whalesay-template
spec:
templates:
- name: whalesay-template
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
CronWorkflow
A CronWorkflow is a workflow that runs on a preset schedule. It is designed to be easy to convert from a Workflow and to mimic the options of a Kubernetes CronJob. In essence, CronWorkflow = Workflow + some specific cron options.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-wf
spec:
schedule: "* * * * *"
concurrencyPolicy: "Replace"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: alpine:3.6
command: [sh, -c]
args: ["date; sleep 90"]
Artifacts
You will need to configure an artifact repository (S3 is recommended) to run this example; see the Argo documentation on configuring an artifact repository.
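As a rough sketch (bucket, endpoint, and secret names are placeholders), an S3-compatible repository such as MinIO is typically configured through the artifactRepository entry of the workflow-controller ConfigMap:
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
  namespace: argo
data:
  artifactRepository: |
    s3:
      bucket: my-bucket # bucket where artifacts are stored
      endpoint: minio:9000
      insecure: true # plain HTTP endpoint
      accessKeySecret: # k8s secret holding the access key
        name: my-minio-cred
        key: accesskey
      secretKeySecret: # k8s secret holding the secret key
        name: my-minio-cred
        key: secretkey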
When running workflows, it is very common to have steps that generate or consume artifacts. Often, the output artifact of one step is used as an input artifact of a subsequent step.
The workflow spec below consists of two steps that run in sequence. The first step, named generate-artifact, generates an artifact using the whalesay template; the second step, named print-message, consumes the artifact generated by the first step.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-passing-
spec:
entrypoint: artifact-example
templates:
- name: artifact-example
steps:
- - name: generate-artifact
template: whalesay
- - name: consume-artifact
template: print-message
arguments:
artifacts:
# bind message to the hello-art artifact
# generated by the generate-artifact step
- name: message
from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello world | tee /tmp/hello_world.txt"]
outputs:
artifacts:
# generate hello-art artifact from /tmp/hello_world.txt
# artifacts can be directories as well as files
- name: hello-art
path: /tmp/hello_world.txt
- name: print-message
inputs:
artifacts:
# unpack the message input artifact
# and put it at /tmp/message
- name: message
path: /tmp/message
container:
image: alpine:latest
command: [sh, -c]
args: ["cat /tmp/message"]
The whalesay template uses the cowsay command to generate a file named /tmp/hello_world.txt and then outputs this file as an artifact named hello-art. In general, an artifact's path may be a directory rather than just a file. The print-message template takes an input artifact named message, unpacks it at the path /tmp/message, and then prints the contents of /tmp/message with the cat command. The artifact-example template passes the hello-art output artifact generated by the generate-artifact step as the message input artifact of the print-message step. DAG templates use the tasks prefix to refer to another task, for example {{tasks.generate-artifact.outputs.artifacts.hello-art}}.
Artifacts are packaged as tarballs and gzipped by default. You can customize this behavior by specifying an archive strategy using the archive field. For example:
<... snipped ...>
outputs:
artifacts:
# default behavior - tar+gzip default compression.
- name: hello-art-1
path: /tmp/hello_world.txt
# disable archiving entirely - upload the file / directory as is.
# this is useful when the container layout matches the desired target repository layout.
- name: hello-art-2
path: /tmp/hello_world.txt
archive:
none: {}
# customize the compression behavior (disabling it here).
# this is useful for files with varying compression benefits,
# e.g. disabling compression for a cached build workspace and large binaries,
# or increasing compression for "perfect" textual data - like a json/xml export of a large database.
- name: hello-art-3
path: /tmp/hello_world.txt
archive:
tar:
# no compression (also accepts the standard gzip 1 to 9 values)
compressionLevel: 0
<... snipped ...>
Template Definitions
There are seven categories of template definitions:
- Container
- Script
- Resource
- Suspend
- HTTP
- Container Set
- Data
Container
container is the most common template type. It schedules a container, and its template spec is identical to the Kubernetes container spec, as shown below:
- name: whalesay
container:
image: docker/whalesay
command: [cowsay]
args: ["hello world"]
Script
Script is another wrapper around Container. It is defined the same way as Container, with an added source field for specifying a script, as shown below:
- name: gen-random-int
script:
image: python:alpine3.6
command: [python]
source: |
import random
i = random.randint(1, 100)
print(i)
The output of the script is automatically exported into {{tasks.<NAME>.outputs.result}} or {{steps.<NAME>.outputs.result}}, depending on how it was called.
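For instance, a steps template could consume the result of gen-random-int like this (a sketch; the print-result template and the step names are made up):
- name: random-then-print
  steps:
  - - name: generate
      template: gen-random-int
  - - name: print
      template: print-result
      arguments:
        parameters:
        - name: value
          value: "{{steps.generate.outputs.result}}" # stdout of the script above
- name: print-result
  inputs:
    parameters:
    - name: value
  container:
    image: alpine:3.6
    command: [sh, -c]
    args: ["echo the random number was {{inputs.parameters.value}}"]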
Resource
Resource is used to perform operations on cluster resources directly; it can get, create, apply, delete, replace, or patch cluster resources. The following creates a ConfigMap resource in the cluster:
- name: k8s-owner-reference
resource:
action: create
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
generateName: owned-eg-
data:
some: value
Suspend
Suspend pauses execution, either for a fixed duration or until resumed manually with the argo resume command. It is defined as follows:
- name: delay
suspend:
duration: "20s"
HTTP
An HTTP template can be used to execute HTTP requests.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: http-template-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: get-google-homepage
template: http
arguments:
parameters: [{name: url, value: "https://www.google.com"}]
- name: http
inputs:
parameters:
- name: url
http:
timeoutSeconds: 20 # Default 30
url: "{{inputs.parameters.url}}"
method: "GET" # Default GET
headers:
- name: "x-header-name"
value: "test-value"
# Template will succeed if evaluated to true, otherwise will fail
# Available variables:
# request.body: string, the request body
# request.headers: map[string][]string, the request headers
# response.url: string, the request url
# response.method: string, the request method
# response.statusCode: int, the response status code
# response.body: string, the response body
# response.headers: map[string][]string, the response headers
successCondition: "response.body contains \"google\"" # available since v3.3
body: "test body" # Change request body
Container Set
A Container Set template is similar to a normal container or script template, but allows you to specify multiple containers to run within a single pod.
Because these containers live in the same pod, they are scheduled onto the same host. You can use an emptyDir volume instead of a PVC to share data between steps.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: container-set-template-
spec:
entrypoint: main
templates:
- name: main
volumes:
- name: workspace
emptyDir: { }
containerSet:
volumeMounts:
- mountPath: /workspace
name: workspace
containers:
- name: a
image: argoproj/argosay:v2
- name: b
image: argoproj/argosay:v2
- name: main
image: argoproj/argosay:v2
dependencies:
- a
- b
outputs:
parameters:
- name: message
valueFrom:
path: /workspace/message
Data
Users often fetch and transform data as part of their workflows. The data template provides first-class support for these common operations.
data templates can be understood by looking at a common data-sourcing and transformation operation in bash:
find -r . | grep ".pdf" | sed "s/foo/foo.ready/"
Such operations consist of two main parts:
- A data source: find -r .
- A series of transformation operations applied serially to the output of the source: | grep ".pdf" | sed "s/foo/foo.ready/"
This operation could be useful, for example, for finding a potential list of files to process and filtering and manipulating that list as needed.
In Argo, this operation would be written as:
- name: generate-artifacts
data:
source: # Define a source for the data, only a single "source" is permitted
artifactPaths: # A predefined source: Generate a list of all artifact paths in a given repository
s3: # Source from an S3 bucket
bucket: test
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
transformation: # The source is then passed to be transformed by transformations defined here
- expression: "filter(data, {# endsWith \".pdf\"})"
- expression: "map(data, {# + \".ready\"})"
A data template must contain a source field. Currently available data sources are S3, Git, HTTP, HDFS, OSS, and GCS:
- artifactPaths: generates a list of artifact paths from the specified artifact repository
A data template may contain any number of transformations (including zero). Transformations are applied serially, in order. Currently the only available transformation is expression.
We are aware that expression transformations are limited. We intend to greatly expand the functionality of this template based on community feedback. Please see the links at the top of this document to submit ideas or use cases for this feature.
Template Invocators
There are two categories of template invocators:
- Steps
- DAG
Steps
Steps defines tasks as a sequence of steps. Its structure is a "list of lists": outer lists run sequentially, and inner lists run in parallel. For example:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello-hello-hello
# This spec contains two templates: hello-hello-hello and whalesay
templates:
- name: hello-hello-hello
# Instead of just running a container
# This template has a sequence of steps
steps:
- - name: hello1 # hello1 is run before the following steps
template: whalesay
arguments:
parameters:
- name: message
value: "hello1"
- - name: hello2a # double dash => run after previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2a"
- name: hello2b # single dash => run in parallel with previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2b"
# This is the same template as from the previous example
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
Here hello1 and hello2a run sequentially, while hello2a and hello2b run in parallel.
STEP TEMPLATE PODNAME DURATION MESSAGE
✔ steps-z2zdn hello-hello-hello
├───✔ hello1 whalesay steps-z2zdn-27420706 2s
└─┬─✔ hello2a whalesay steps-z2zdn-2006760091 3s
└─✔ hello2b whalesay steps-z2zdn-2023537710 3s
Conditions can also be tested with when, as shown below:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
- - name: flip-coin
template: flip-coin
- - name: heads
template: heads
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails
template: tails
when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was tails\""]
Besides conditional tests with when, loops are also supported, as in the following example:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
entrypoint: loop-example
templates:
- name: loop-example
steps:
- - name: print-message
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withItems:
- hello world
- goodbye world
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
DAG
A DAG template lets you define tasks as a graph of dependencies. In a DAG, you list all tasks and set which other tasks must complete before a given task starts. Tasks without any dependencies run immediately.
In this example, A runs first; once it finishes, B and C run in parallel; and once both B and C finish, D runs:
- name: diamond
dag:
tasks:
- name: A
template: echo
- name: B
dependencies: [A]
template: echo
- name: C
dependencies: [A]
template: echo
- name: D
dependencies: [B, C]
template: echo
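The diamond fragment above references an echo template that is not shown; a minimal definition might be:
- name: echo
  container:
    image: alpine:3.7
    command: [echo, "hello"]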
Environment Setup
Controller and Server
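A typical quick install applies the official release manifest into the argo namespace (a sketch; it assumes the standard install.yaml release asset, pinned here to match the CLI version used below):
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/install.yaml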
After installation, the controller and server pods should be Running:
NAME READY STATUS RESTARTS AGE
argo-server-746dc95c84-6pwj2 1/1 Running 0 5d4h
workflow-controller-777b7f45d8-whkdk 1/1 Running 0 5d4h
Argo CLI
Mac
# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-darwin-amd64.gz
# Unzip
gunzip argo-darwin-amd64.gz
# Make binary executable
chmod +x argo-darwin-amd64
# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo
# Test installation
argo version
Linux
# Download the binary
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.3.9/argo-linux-amd64.gz
# Unzip
gunzip argo-linux-amd64.gz
# Make binary executable
chmod +x argo-linux-amd64
# Move binary to path
mv ./argo-linux-amd64 /usr/local/bin/argo
# Test installation
argo version
Common commands:
argo submit hello-world.yaml # submit a workflow to Kubernetes
argo list # list current workflows
argo get hello-world-xxx # get information about a specific workflow
argo logs hello-world-xxx # print the logs of a workflow
argo delete hello-world-xxx # delete a workflow
You can also operate on workflows directly with kubectl, but the Argo CLI provides syntax checking, nicer output, and requires less typing.
kubectl create -f hello-world.yaml
kubectl get wf
kubectl get wf hello-world-xxx
kubectl get po --selector=workflows.argoproj.io/workflow=hello-world-xxx --show-all # similar to argo
kubectl logs hello-world-xxx-yyy -c main
kubectl delete wf hello-world-xxx
Hello World
Let's start by creating a very simple workflow template to echo "hello world" using the docker/whalesay container image from Docker Hub.
You can run this directly from your shell with a simple docker command:
$ docker run docker/whalesay cowsay "hello world"
_____________
< hello world >
-------------
\
\
\
## .
## ## ## ==
## ## ## ## ===
/""""""""""""""""___/ ===
~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ / ===- ~~~
\______ o __/
\ \ __/
\____\______/
Here is a simple Workflow example:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
labels:
workflows.argoproj.io/archive-strategy: "false"
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
      image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
Workflow Configuration
Parameters
Let's look at a slightly more complex workflow spec with parameters.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-parameters-
spec:
# invoke the whalesay template with
# "hello world" as the argument
# to the message parameter
entrypoint: whalesay
arguments:
parameters:
- name: message
value: hello world
templates:
- name: whalesay
inputs:
parameters:
- name: message # parameter declaration
container:
# run cowsay with that message input parameter as args
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
The whalesay template takes an input parameter named message, which is passed as the args to the cowsay command. In order to reference parameters (e.g., "{{inputs.parameters.message}}"), the parameters must be enclosed in double quotes to escape the curly braces in YAML.
The argo CLI provides a convenient way to override parameters used to invoke the entrypoint. For example, the following command would bind the message parameter to "goodbye world" instead of the default "hello world":
argo submit arguments-parameters.yaml -p message="goodbye world"
When there are multiple parameters to override, the argo CLI provides a command to load a parameter file in YAML or JSON format. Here is an example of such a parameter file:
message: goodbye world
To run it, use the following command:
argo submit arguments-parameters.yaml --parameter-file params.yaml
Command-line parameters can also be used to override the default entrypoint and invoke any template in the workflow spec. For example, if you add a new version of the whalesay template called whalesay-caps but don't want to change the default entrypoint, you can invoke it from the command line as follows:
argo submit arguments-parameters.yaml --entrypoint whalesay-caps
By combining --entrypoint and -p, you can call any template in the workflow spec with any parameters you like.
Values set in spec.arguments.parameters are globally available and can be accessed via {{workflow.parameters.parameter_name}}. This is useful for passing information to multiple steps in a workflow. For example, if you wanted to run a workflow with different logging levels set in the environment of each container, you could use a YAML file similar to this one:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: global-parameters-
spec:
entrypoint: A
arguments:
parameters:
- name: log-level
value: INFO
templates:
- name: A
container:
image: containerA
env:
- name: LOG_LEVEL
value: "{{workflow.parameters.log-level}}"
command: [runA]
- name: B
container:
image: containerB
env:
- name: LOG_LEVEL
value: "{{workflow.parameters.log-level}}"
command: [runB]
In this workflow, steps A and B would both have the same log level set to INFO, which can easily be changed at submission time with the -p flag.
Variables
See also: the Argo Workflows user guide on Workflow variables.
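A few variables are globally scoped and can be referenced from any template; for example (a sketch using documented workflow-level variables):
- name: print-workflow-info
  container:
    image: alpine:latest
    command: [sh, -c]
    args: ["echo name={{workflow.name}} namespace={{workflow.namespace}} uid={{workflow.uid}}"]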
Access Control
Secrets
Argo supports the same secrets syntax and mechanisms as Kubernetes Pod specs, allowing access to secrets as environment variables or volume mounts.
# To run this example, first create the secret by running:
# kubectl create secret generic my-secret --from-literal=mypassword=S00perS3cretPa55word
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: secret-example-
spec:
entrypoint: whalesay
# To access secrets as files, add a volume entry in spec.volumes[] and
# then in the container template spec, add a mount using volumeMounts.
volumes:
- name: my-secret-vol
secret:
secretName: my-secret # name of an existing k8s secret
templates:
- name: whalesay
container:
image: alpine:3.7
command: [sh, -c]
args: ['
echo "secret from env: $MYSECRETPASSWORD";
echo "secret from file: `cat /secret/mountpath/mypassword`"
']
# To access secrets as environment variables, use the k8s valueFrom and
# secretKeyRef constructs.
env:
- name: MYSECRETPASSWORD # name of env var
valueFrom:
secretKeyRef:
name: my-secret # name of an existing k8s secret
key: mypassword # 'key' subcomponent of the secret
volumeMounts:
- name: my-secret-vol # mount file containing secret at /secret/mountpath
mountPath: "/secret/mountpath"
Output Parameters
Output parameters provide a general mechanism for using the result of a step as a parameter (and not just as an artifact). This allows you to use the result of any type of step, not just a script, for conditional tests, loops, and arguments. Output parameters work similarly to script results, except that the value of the output parameter is set to the contents of a generated file rather than the contents of stdout.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: output-parameter-
spec:
entrypoint: output-parameter
templates:
- name: output-parameter
steps:
- - name: generate-parameter
template: whalesay
- - name: consume-parameter
template: print-message
arguments:
parameters:
# Pass the hello-param output from the generate-parameter step as the message input to print-message
- name: message
value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo -n hello world > /tmp/hello_world.txt"] # 生成 hello_world.txt 文件内容
outputs:
parameters:
- name: hello-param # 输出参数名称
valueFrom:
path: /tmp/hello_world.txt # set the value of hello-param to the contents of this hello-world.txt
- name: print-message
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
DAG templates use the tasks prefix to refer to another task, for example {{tasks.generate-parameter.outputs.parameters.hello-param}}.
The result Output Parameter
The result output parameter captures standard output and is accessible via outputs.result. Only 256 kb of the standard output stream will be captured.
Scripts
The output of a script template is assigned to standard output and captured in the result parameter.
Containers
The standard output of a container template is likewise captured in the result parameter. When using a DAG, given a task named log-int, its result is accessible as {{ tasks.log-int.outputs.result }}; with steps, it is {{ steps.log-int.outputs.result }}.
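A minimal sketch (the log-int and print-value template names are illustrative) of capturing a container's stdout and consuming it in a DAG:
- name: result-example
  dag:
    tasks:
    - name: log-int
      template: log-int
    - name: print
      dependencies: [log-int]
      template: print-value
      arguments:
        parameters:
        - name: value
          value: "{{tasks.log-int.outputs.result}}" # stdout of the log-int task
- name: log-int
  container:
    image: alpine:latest
    command: [sh, -c]
    args: ["echo 42"] # stdout is captured as outputs.result
- name: print-value
  inputs:
    parameters:
    - name: value
  container:
    image: alpine:latest
    command: [sh, -c]
    args: ["echo got {{inputs.parameters.value}}"]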
Loops
When writing workflows, it is often very useful to be able to iterate over a set of inputs, as shown in this example:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
entrypoint: loop-example
templates:
- name: loop-example
steps:
- - name: print-message
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withItems: # invoke whalesay once for each item in parallel
- hello world # item 1
- goodbye world # item 2
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
We can also iterate over sets of items:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-maps-
spec:
entrypoint: loop-map-example
templates:
- name: loop-map-example
steps:
- - name: test-linux
template: cat-os-release
arguments:
parameters:
- name: image
value: "{{item.image}}"
- name: tag
value: "{{item.tag}}"
withItems:
- { image: 'debian', tag: '9.1' } #item set 1
- { image: 'debian', tag: '8.9' } #item set 2
- { image: 'alpine', tag: '3.6' } #item set 3
- { image: 'ubuntu', tag: '17.10' } #item set 4
- name: cat-os-release
inputs:
parameters:
- name: image
- name: tag
container:
image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
command: [cat]
args: [/etc/os-release]
We can pass lists of items in as parameters:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-param-arg-
spec:
entrypoint: loop-param-arg-example
arguments:
parameters:
- name: os-list # a list of items
value: |
[
{ "image": "debian", "tag": "9.1" },
{ "image": "debian", "tag": "8.9" },
{ "image": "alpine", "tag": "3.6" },
{ "image": "ubuntu", "tag": "17.10" }
]
templates:
- name: loop-param-arg-example
inputs:
parameters:
- name: os-list
steps:
- - name: test-linux
template: cat-os-release
arguments:
parameters:
- name: image
value: "{{item.image}}"
- name: tag
value: "{{item.tag}}"
withParam: "{{inputs.parameters.os-list}}" # parameter specifies the list to iterate over
# This template is the same as in the previous example
- name: cat-os-release
inputs:
parameters:
- name: image
- name: tag
container:
image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
command: [cat]
args: [/etc/os-release]
We can even dynamically generate the list of items to iterate over!
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-param-result-
spec:
entrypoint: loop-param-result-example
templates:
- name: loop-param-result-example
steps:
- - name: generate
template: gen-number-list
# Iterate over the list of numbers generated by the generate step above
- - name: sleep
template: sleep-n-sec
arguments:
parameters:
- name: seconds
value: "{{item}}"
withParam: "{{steps.generate.outputs.result}}"
# Generate a list of numbers in JSON format
- name: gen-number-list
script:
image: python:alpine3.6
command: [python]
source: |
import json
import sys
json.dump([i for i in range(20, 31)], sys.stdout)
- name: sleep-n-sec
inputs:
parameters:
- name: seconds
container:
image: alpine:latest
command: [sh, -c]
args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]
Conditionals
We also support conditional execution. The syntax is implemented by govaluate, which offers support for complex expressions. See this example:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails
template: tails # call tails template if "tails"
when: "{{steps.flip-coin.outputs.result}} == tails"
- - name: flip-again
template: flip-coin
- - name: complex-condition
template: heads-tails-or-twice-tails
# call heads template if first flip was "heads" and second was "tails" OR both were "tails"
when: >-
( {{steps.flip-coin.outputs.result}} == heads &&
{{steps.flip-again.outputs.result}} == tails
) ||
( {{steps.flip-coin.outputs.result}} == tails &&
{{steps.flip-again.outputs.result}} == tails )
- name: heads-regex
template: heads # call heads template if ~ "hea"
when: "{{steps.flip-again.outputs.result}} =~ hea"
- name: tails-regex
template: tails # call heads template if ~ "tai"
when: "{{steps.flip-again.outputs.result}} =~ tai"
# Return heads or tails based on a random number
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was tails\""]
- name: heads-tails-or-twice-tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
Note
If a parameter value contains quotation marks, it may invalidate the govaluate expression. To handle parameters with quotes, embed an expr expression in the conditional. For example:
when: "{{=inputs.parameters['may-contain-quotes'] == 'example'}}"
Retrying Failed or Errored Steps
You can specify a retryStrategy in the Workflow.spec or in a template to dictate how failed or errored steps are retried:
# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-backoff-
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: 10
retryPolicy: "Always"
backoff:
duration: "1" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
factor: 2
maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
affinity:
nodeAntiAffinity: {}
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
- limit: the maximum number of times the container will be retried.
- retryPolicy: specifies whether a container will be retried on failure, on error, on both, or only on transient errors (e.g., i/o or TLS handshake timeouts).
  - Always: retry all failed steps.
  - OnFailure: retry steps whose main container is marked as failed in Kubernetes (this is the default).
  - OnError: retry steps that encounter Argo controller errors, or whose init or wait containers fail.
- backoff: the delay between retries.
- nodeAntiAffinity: prevents running steps on the same host. The current implementation only allows an empty nodeAntiAffinity (i.e., nodeAntiAffinity: {}), which by default uses the label kubernetes.io/hostname as the selector.
Providing an empty retryStrategy (i.e., retryStrategy: {}) will cause a container to retry until completion.
Recursion
Templates can recursively invoke each other! In this variation of the coin-flip template above, we keep flipping coins until heads comes up.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-recursive-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails # keep flipping coins if "tails"
template: coinflip
when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
Here are the results of a couple of runs of coin flips for comparison.
argo get coinflip-recursive-tzcb5
STEP PODNAME MESSAGE
✔ coinflip-recursive-vhph5
├───✔ flip-coin coinflip-recursive-vhph5-2123890397
└─┬─✔ heads coinflip-recursive-vhph5-128690560
└─○ tails
STEP PODNAME MESSAGE
✔ coinflip-recursive-tzcb5
├───✔ flip-coin coinflip-recursive-tzcb5-322836820
└─┬─○ heads
└─✔ tails
├───✔ flip-coin coinflip-recursive-tzcb5-1863890320
└─┬─○ heads
└─✔ tails
├───✔ flip-coin coinflip-recursive-tzcb5-1768147140
└─┬─○ heads
└─✔ tails
├───✔ flip-coin coinflip-recursive-tzcb5-4080411136
└─┬─✔ heads coinflip-recursive-tzcb5-4080323273
└─○ tails
In the first run, the coin immediately came up heads and we stopped. In the second run, the coin came up tails three times before finally coming up heads, at which point we stopped.
Exit Handlers
An exit handler is a template that always executes at the end of a workflow, regardless of success or failure.
Some common use cases of exit handlers are:
- Cleaning up after a workflow runs
- Sending notifications of workflow status (e.g., e-mail/Slack)
- Posting the pass/fail status to a web-hook result (e.g., a GitHub build result)
- Resubmitting or submitting another workflow
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: exit-handlers-
spec:
entrypoint: intentional-fail
onExit: exit-handler # invoke exit-handler template at end of the workflow
templates:
# primary workflow template
- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
# Exit handler templates
# After the completion of the entrypoint template, the status of the
# workflow is made available in the global variable {{workflow.status}}.
# {{workflow.status}} will be one of: Succeeded, Failed, Error
- name: exit-handler
steps:
- - name: notify
template: send-email
- name: celebrate
template: celebrate
when: "{{workflow.status}} == Succeeded"
- name: cry
template: cry
when: "{{workflow.status}} != Succeeded"
- name: send-email
container:
image: alpine:latest
command: [sh, -c]
args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
- name: celebrate
container:
image: alpine:latest
command: [sh, -c]
args: ["echo hooray!"]
- name: cry
container:
image: alpine:latest
command: [sh, -c]
args: ["echo boohoo!"]
Timeouts
To limit the elapsed time for a workflow, you can set the variable activeDeadlineSeconds.
# To enforce a timeout for a container template, specify a value for activeDeadlineSeconds.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: timeouts-
spec:
entrypoint: sleep
templates:
- name: sleep
container:
image: alpine:latest
command: [sh, -c]
args: ["echo sleeping for 1m; sleep 60; echo done"]
activeDeadlineSeconds: 10 # terminate container template after 10 seconds
Volumes
This is not the standard way Argo passes artifacts, but with shared storage we can clearly achieve the same result of sharing artifacts between steps. And when using a Volume, there is no need for Inputs and Outputs; we simply define a volume claim template in the Workflow spec.
The following example dynamically creates a volume and then uses the volume in a two-step workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: volumes-pvc-
spec:
entrypoint: volumes-pvc-example
volumeClaimTemplates: # define volume, same syntax as k8s Pod spec
- metadata:
name: workdir # name of volume claim
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi # Gi => 1024 * 1024 * 1024
templates:
- name: volumes-pvc-example
steps:
- - name: generate
template: whalesay
- - name: print
template: print-message
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
# Mount workdir volume at /mnt/vol before invoking docker/whalesay
volumeMounts: # same syntax as k8s Pod spec
- name: workdir
mountPath: /mnt/vol
- name: print-message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
# Mount workdir volume at /mnt/vol before invoking docker/whalesay
volumeMounts: # same syntax as k8s Pod spec
- name: workdir
mountPath: /mnt/vol
Volumes are a very useful way to move large amounts of data from one step in a workflow to another. Depending on the system, some volumes may be accessible concurrently from multiple steps.
In some cases, you want to access an already existing volume rather than creating/destroying one dynamically.
# Define Kubernetes PVC
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: my-existing-volume
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: volumes-existing-
spec:
entrypoint: volumes-existing-example
volumes:
# Pass my-existing-volume as an argument to the volumes-existing-example template
# Same syntax as k8s Pod spec
- name: workdir
persistentVolumeClaim:
claimName: my-existing-volume
templates:
- name: volumes-existing-example
steps:
- - name: generate
template: whalesay
- - name: print
template: print-message
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
- name: print-message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
It is also possible to declare existing volumes at the template level, instead of the workflow level. Workflows can generate volumes using a resource step.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: template-level-volume-
spec:
entrypoint: generate-and-use-volume
templates:
- name: generate-and-use-volume
steps:
- - name: generate-volume
template: generate-volume
arguments:
parameters:
- name: pvc-size
# In a real-world example, this could be generated by a previous workflow step.
value: '1Gi'
- - name: generate
template: whalesay
arguments:
parameters:
- name: pvc-name
value: '{{steps.generate-volume.outputs.parameters.pvc-name}}'
- - name: print
template: print-message
arguments:
parameters:
- name: pvc-name
value: '{{steps.generate-volume.outputs.parameters.pvc-name}}'
- name: generate-volume
inputs:
parameters:
- name: pvc-size
resource:
action: create
setOwnerReference: true
manifest: |
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
generateName: pvc-example-
spec:
accessModes: ['ReadWriteOnce', 'ReadOnlyMany']
resources:
requests:
storage: '{{inputs.parameters.pvc-size}}'
outputs:
parameters:
- name: pvc-name
valueFrom:
jsonPath: '{.metadata.name}'
- name: whalesay
inputs:
parameters:
- name: pvc-name
volumes:
- name: workdir
persistentVolumeClaim:
claimName: '{{inputs.parameters.pvc-name}}'
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
- name: print-message
inputs:
parameters:
- name: pvc-name
volumes:
- name: workdir
persistentVolumeClaim:
claimName: '{{inputs.parameters.pvc-name}}'
container:
image: alpine:latest
command: [sh, -c]
args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
Suspending
Workflows can be suspended with
argo suspend WORKFLOW
or by specifying a suspend step in the workflow:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: suspend-template-
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: build
template: whalesay
- - name: approve
template: approve
- - name: delay
template: delay
- - name: release
template: whalesay
- name: approve
suspend: {}
- name: delay
suspend:
duration: "20" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
- name: whalesay
container:
image: docker/whalesay
command: [cowsay]
args: ["hello world"]
Daemon Containers
Argo workflows can start containers that run in the background (also known as daemon containers) while the workflow itself continues execution. Note that daemons will be automatically destroyed when the workflow exits the template scope in which the daemon was invoked. Daemon containers are useful for starting services to be tested or to be used in testing (e.g., fixtures). We also find it very useful when running large simulations to spin up a database as a daemon for collecting and organizing the results. The big advantage of daemons compared with sidecars is that their existence can persist across multiple steps or even the entire workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: daemon-step-
spec:
entrypoint: daemon-example
templates:
- name: daemon-example
steps:
- - name: influx
template: influxdb # start an influxdb as a daemon (see the influxdb template spec below)
- - name: init-database # initialize influxdb
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http://{{steps.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"
- - name: producer-1 # add entries to influxdb
template: influxdb-client
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server01,region=uswest load=$i" ; sleep .5 ; done
- name: producer-2 # add entries to influxdb
template: influxdb-client
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server02,region=uswest load=$((RANDOM % 100))" ; sleep .5 ; done
- name: producer-3 # add entries to influxdb
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d 'cpu,host=server03,region=useast load=15.4'
    - - name: consumer # consume entries from influxdb
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl --silent -G http://{{steps.influx.ip}}:8086/query?pretty=true --data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"
- name: influxdb
daemon: true # start influxdb as a daemon
retryStrategy:
limit: 10 # retry container if it fails
container:
image: influxdb:1.2
command:
- influxd
readinessProbe: # wait for readinessProbe to succeed
httpGet:
path: /ping
port: 8086
- name: influxdb-client
inputs:
parameters:
- name: cmd
container:
image: appropriate/curl:latest
command: ["/bin/sh", "-c"]
args: ["{{inputs.parameters.cmd}}"]
resources:
requests:
memory: 32Mi
cpu: 100m
Step templates use the steps prefix to refer to another step: for example, {{steps.influx.ip}}. DAG templates use the tasks prefix: for example, {{tasks.influx.ip}}.
Sidecars
A sidecar is another container that executes concurrently in the same pod as the main container and is useful in creating multi-container pods.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sidecar-nginx-
spec:
entrypoint: sidecar-nginx-example
templates:
- name: sidecar-nginx-example
container:
image: appropriate/curl
command: [sh, -c]
# Try to read from nginx web server until it comes up
args: ["until `curl -G 'http://127.0.0.1/' >& /tmp/out`; do echo sleep && sleep 1; done && cat /tmp/out"]
# Create a simple nginx web server
sidecars:
- name: nginx
image: nginx:1.13
command: [nginx, -g, daemon off;]
In the above example, we create a sidecar container that runs Nginx as a simple web server. The order in which containers come up is random, so in this example the main container polls the Nginx container until it is ready to service requests. This is a good design pattern when designing multi-container systems: always wait for any services you need to come up before running your main code.
Docker-in-Docker Using Sidecars
One application of sidecars is to implement Docker-in-Docker (DinD). DinD is useful when you want to run Docker commands from inside a container, for example to build and push a container image from inside a build container. In the following example, we use the docker:dind image to run a Docker daemon in a sidecar and give the main container access to the daemon.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sidecar-dind-
spec:
entrypoint: dind-sidecar-example
templates:
- name: dind-sidecar-example
container:
image: docker:19.03.13
command: [sh, -c]
args: ["until docker ps; do sleep 3; done; docker run --rm debian:latest cat /etc/os-release"]
env:
- name: DOCKER_HOST # the docker daemon can be access on the standard port on localhost
value: 127.0.0.1
sidecars:
- name: dind
image: docker:19.03.13-dind # Docker already provides an image for running a Docker daemon
command: [dockerd-entrypoint.sh]
env:
- name: DOCKER_TLS_CERTDIR # Docker TLS env config
value: ""
securityContext:
privileged: true # the Docker daemon can only run in a privileged container
# mirrorVolumeMounts will mount the same volumes specified in the main container
# to the sidecar (including artifacts), at the same mountPaths. This enables
# dind daemon to (partially) see the same filesystem as the main container in
# order to use features such as docker volume binding.
mirrorVolumeMounts: true
Hardwired Artifacts
With Argo, you can use any container image you like to generate any kind of artifact. In practice, however, we find certain types of artifacts so common that there is built-in support for git, HTTP, GCS, and S3 artifacts.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hardwired-artifact-
spec:
entrypoint: hardwired-artifact
templates:
- name: hardwired-artifact
inputs:
artifacts:
# Check out the master branch of the argo repo and place it at /src
# revision can be anything that git checkout accepts: branch, commit, tag, etc.
- name: argo-source
path: /src
git:
repo: https://github.com/argoproj/argo-workflows.git
revision: "master"
# Download kubectl 1.8.0 and place it at /bin/kubectl
- name: kubectl
path: /bin/kubectl
mode: 0755
http:
url: https://storage.googleapis.com/kubernetes-release/release/v1.8.0/bin/linux/amd64/kubectl
# Copy an s3 compatible artifact repository bucket (such as AWS, GCS and MinIO) and place it at /s3
- name: objects
path: /s3
s3:
endpoint: storage.googleapis.com
bucket: my-bucket-name
key: path/in/bucket
accessKeySecret:
name: my-s3-credentials
key: accessKey
secretKeySecret:
name: my-s3-credentials
key: secretKey
container:
image: debian
command: [sh, -c]
args: ["ls -l /src /bin/kubectl /s3"]