二十一 嵌套工作流
问题:如何将多个工作流连接在一起?
目标:了解如何从多个CWL工作流构造嵌套工作流。
工作流是组合多个工具以执行更大操作的方法。我们还可以将工作流视为工具本身;如果工作流引擎支持SubworkflowFeatureRequirement
,则CWL工作流可以用作另一个CWL工作流中的一个步骤:
requirements:
SubworkflowFeatureRequirement: {}
下面是一个使用我们的1st-workflow.cwl
作为嵌套工作流:
nestedworkflows.cwl
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: Workflow
inputs: []
outputs:
classout:
type: File
outputSource: compile/compiled_class
requirements:
SubworkflowFeatureRequirement: {}
steps:
compile:
run: 1st-workflow.cwl
in:
tarball: create-tar/tar_compressed_java_file
name_of_file_to_extract:
default: "Hello.java"
out: [compiled_class]
create-tar:
in: []
out: [tar_compressed_java_file]
run:
class: CommandLineTool
requirements:
InitialWorkDirRequirement:
listing:
- entryname: Hello.java
entry: |
public class Hello {
public static void main(String[] argv) {
System.out.println("Hello from Java");
}
}
inputs: []
baseCommand: [tar, --create, --file=hello.tar, Hello.java]
outputs:
tar_compressed_java_file:
type: File
streamable: true
outputBinding:
glob: "hello.tar"
从
compile
步骤看工作流和内部工作流的可视化这两个步骤的工作流从
图片.pngcreate-tar
步骤开始,该步骤连接到橙色的compile
步骤;compile
是另一个工作流。我们看到紫色的固定的字符串"Hello.java"
被提供为name_of_file_to_extract
。
图片.png
CWLWorkflow
可以作为一个step
使用,就像CommandLineTool
一样,它的CWL文件包含在run
中。然后,可以将工作流输入(inp
和 ex
)和输出(classout
)映射为步骤的输入/输出。
compile:
run: 1st-workflow.cwl
in:
inp:
source: create-tar/tar
ex:
default: "Hello.java"
out: [classout]
我们的1st-workflow.cwl
是用工作流输入参数化的,所以在运行它时,我们必须提供一个作业文件来表示tar文件和*.java
文件名。这通常是最佳实践,因为这意味着它可以在多个父工作流中重用,甚至可以在同一工作流中的多个步骤中重用。
这里我们使用default:
'来硬编码"Hello.java"
作为ex
输入,但是我们的工作流还需要在inp
处有一个tar文件,我们将在create-tar
步骤中准备该文件。此时,重构1st-workflow.cwl
以拥有更具体的输入/输出名称可能是一个好主意,因为这些名称也出现在它作为工具的使用中。
也可以使用不太通用的方法,避免作业文件中的外部依赖关系。因此,在这个工作流中,我们可以在将其添加到tar文件之前,使用前面提到的InitialWorkDirRequirement
生成一个硬编码Hello.java
文件。
create-tar:
requirements:
InitialWorkDirRequirement:
listing:
- entryname: Hello.java
entry: |
public class Hello {
public static void main(String[] argv) {
System.out.println("Hello from Java");
}
}
在这种情况下,我们可以假设Hello.java
而不是参数化,因此只要CWL工作流引擎支持ShellCommandRequirement
,我们就可以使用更简单的arguments
形式:
run:
class: CommandLineTool
requirements:
ShellCommandRequirement: {}
arguments:
- shellQuote: false
valueFrom: >
tar cf hello.tar Hello.java
注意:这里使用了shellQuote: false
,否则shell将尝试执行带引号的二进制文件"tar cf hello.tar Hello.java"
这里的>
块意味着新行被剥离,因此可以在多行上编写单个命令。类似地,我们上面使用的|
将保留换行符,结合ShellCommandRequirement
,这将允许嵌入一个shell脚本。但是,在CWL中应该谨慎地使用Shell命令,因为这意味着您“跳出”工作流,不再获得可重用的组件、出处或可伸缩性。为了重现性和可移植性,建议只将shell命令与DockerRequirement
的提示(hint)一起使用,以便命令在可预测的shell环境中执行。
您是否注意到我们没有将tar cf
工具拆分到单独的文件中,而是将其嵌入到CWL工作流文件中?这通常不是最佳实践,因为该工具无法重用。在这种情况下这样做的原因是命令行是硬编码的,文件名只在这个工作流中有意义。
在这个例子中,我们必须在外部准备一个tar文件,但这仅仅是因为我们的内部工作流被设计成将其作为输入。内部工作流的一个更好的重构方法是获取一个Java文件列表进行编译,这将简化它在其他工作流中作为工具步骤的使用。
嵌套工作流在生成高级功能和可重用工作流单元方面是非常强大的,但就像创建CWL工具描述一样,必须注意提高其在多个工作流中的可用性。
总结
- 如果工作流引擎支持
SubworkflowFeatureRequirement
,则可以将一个工作流用作另一个工作流中的一个步骤。 - 工作流在
steps
下指定,工作流程的描述文件作为run
字段value的值提供。 - 使用
default
指定字段的默认值,该值可以被输入对象中的值覆盖。 - 使用
>
可忽略拆分为多行的长命令中的换行符。
二十二 Scattering Workflows
问题:如何并行运行工具或工作流?
既然我们知道了如何编写工作流,就可以开始使用ScatterFeatureRequirement
。此功能告诉运行器您希望在输入列表上多次运行工具或工作流。然后,工作流将输入作为一个数组,并将对数组的每个元素运行指定的步骤,就像它是单个输入一样。这允许对多个输入上运行相同的工作流,而不必生成许多不同的命令或yaml输入文件。
requirements:
ScatterFeatureRequirement: {}
新用户可能希望使用分裂(scatter)的最常见原因是对不同的样本执行相同的分析。让我们从一个简单的工作流开始,它调用我们的第一个示例,并将字符串数组作为工作流的输入:
scatter-workflow.cwl
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: Workflow
requirements:
ScatterFeatureRequirement: {}
inputs:
message_array: string[]
steps:
echo:
run: 1st-tool.cwl
scatter: message
in:
message: message_array
out: []
outputs: []
除了requirements
部分,包括ScatterFeatureRequirement
,这里发生了什么?
inputs:
message_array: string[]
首先,请注意,这里的主工作流级别的输入需要一个字符串数组。
steps:
echo:
run: 1st-tool.cwl
scatter: message
in:
message: message_array
out: []
这里我们在步骤echo
中添加了一个名为scatter
的新字段。这个字段告诉运行程序,在这个特定步骤中,我们希望分裂输入。请注意,分裂之后列出的输入名称是步骤的输入之一,而不是工作流级别的输入。
对于我们的第一次分裂,就这么简单!由于我们的工具不收集任何输出,所以我们在工作流中仍然使用outputs: []
,但是如果希望工作流最终有多个输出要收集,请务必将其更新为数组类型!
Using the following input file:
scatter-job.yml
message_array:
- Hello world!
- Hola mundo!
- Bonjour le monde!
- Hallo welt!
As a reminder, 1st-tool.cwl
simply calls the command echo
on a message. If we invoke cwl-runner scatter-workflow.cwl scatter-job.yml
on the command line:
作为提醒,1st-tool.cwl
只需对消息调用echo
命令。如果我们在命令行上调用cwl-runner scatter-workflow.cwl scatter-job.yml
:
$ cwl-runner scatter-workflow.cwl scatter-job.yml
[workflow scatter-workflow.cwl] start
[step echo] start
[job echo] /tmp/tmp0hqmg400$ echo \
'Hello world!'
Hello world!
[job echo] completed success
[step echo] start
[job echo_2] /tmp/tmpu65_m1zw$ echo \
'Hola mundo!'
Hola mundo!
[job echo_2] completed success
[step echo] start
[job echo_3] /tmp/tmp5cs7a2wh$ echo \
'Bonjour le monde!'
Bonjour le monde!
[job echo_3] completed success
[step echo] start
[job echo_4] /tmp/tmp301wo7p8$ echo \
'Hallo welt!'
Hallo welt!
[job echo_4] completed success
[step echo] completed success
[workflow scatter-workflow.cwl] completed success
{}
Final process status is success
您可以看到,工作流对message_array
的每个元素调用多次echo。那么,如果我们想分裂工作流中的两个步骤呢?
让我们像上面一样执行一个简单的echo,通过添加下面行而不是outputs: []
来捕获stdout
1st-tool-mod.cwl
outputs:
echo_out:
type: stdout
在第二步中使用wc
来对文件中的字符计数。请参阅以下工具:
wc-tool.cwl
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: CommandLineTool
baseCommand: wc
arguments: ["-c"]
inputs:
input_file:
type: File
inputBinding:
position: 1
outputs: []
现在,我们如何合并分裂?记住每一步的分裂字段:
scatter-two-steps.cwl
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: Workflow
requirements:
ScatterFeatureRequirement: {}
inputs:
message_array: string[]
steps:
echo:
run: 1st-tool-mod.cwl
scatter: message
in:
message: message_array
out: [echo_out]
wc:
run: wc-tool.cwl
scatter: input_file
in:
input_file: echo/echo_out
out: []
outputs: []
在这里,我们在每个步骤下设置了分裂字段。对于这个示例来说,这是很好的,因为它运行得很快,但是如果您正在为更复杂的工作流运行许多样本,那么考虑另一种方法。在这里,我们在每个步骤上独立运行分裂,但是由于第二步并不依赖于完成所有语言的第一步,所以我们没有有效地使用scatter功能。第二步需要一个来自第一步的数组作为输入,所以它将等到第一步中的所有内容都完成之后再做任何事情。假设echo Hello World!
需要1分钟来执行,wc -c
的输出需要3分钟,echo Hallo welt!
执行需要5分钟,wc
在该输出上需要3分钟。即使echo Hello World!
本可以在4分钟内完成,但因为第一步必须等待 echo Hallo welt!
,实际上8分钟内完成。
好的,那么我们如何分散在可以独立于其他样本的步骤上呢?记住第21章,我们可以使整个工作流成为另一个工作流中的一个步骤!将我们的两步工作流转换为单步子工作流:
Ok, so how do we scatter on steps that can proceed independent of other samples? Remember from chapter 21, that we can make an entire workflow a single step in another workflow! Convert our two step workflow to a single step subworkflow:
scatter-nested-workflow.cwl
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: Workflow
requirements:
ScatterFeatureRequirement: {}
SubworkflowFeatureRequirement: {}
inputs:
message_array: string[]
steps:
subworkflow:
run:
class: Workflow
inputs:
message: string
outputs: []
steps:
echo:
run: 1st-tool-mod.cwl
in:
message: message
out: [echo_out]
wc:
run: wc-tool.cwl
in:
input_file: echo/echo_out
out: []
scatter: message
in:
message: message_array
out: []
outputs: []
Now the scatter acts on a single step, but that step consists of two steps so each step is performed in parallel.
Key Points
- A workflow can scatter over an input array in a step of a workflow, if the workflow engine supports the
ScatterFeatureRequirement
. - The
scatter
field is specified for each step you want to scatter - The
scatter
field references the step level inputs, not the workflow inputs - Scatter runs on each step specified independently
网友评论