7. 并行计算
7.1 Iterables
长期进行神经影像学数据处理的朋友应该都知道,对一批被试进行同样的数据处理是一种很常见的方式,例如要对所有的被试执行:
- Dicom2Nifti
- Remove first images
- Slice Timing
- Realign
- Normalize
- ...
对于这样的基本需求,Nipype当然也帮我们准备好了一些并行处理的手段,那就是Iterables。此外,如果你还想尝试一下不同的参数对于数据处理的结果有什么影响,那么使用Gretna这样的软件通常会需要执行几遍(麻烦的是不是运行,而是需要重新设定参数,如果没有处理好,运行结果还可能彼此覆盖),这时候如果你能想到用Nipype中的Iterables,那就太好了。废话少说,先看东西:
Example 1 试验不同参数
from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, IsotropicSmooth
# Initiate a skull stripping Node with BET
skullstrip = Node(
BET(mask=True, in_file='/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz'),
name="skullstrip")
isosmooth = Node(IsotropicSmooth(), name='iso_smooth')
isosmooth.iterables = ("fwhm", [4, 8, 16])
wf = Workflow(name="smoothflow")
wf.base_dir = "/output"
wf.connect(skullstrip, 'out_file', isosmooth, 'in_file')
# Run it in parallel (one core for each smoothing kernel)
wf.run('MultiProc', plugin_args={'n_procs': 3})
在这个例子中,整个Workflow由两个节点组成,第一个是剥头骨的节点,第二个是进行平滑化的节点,为了测试不同的平滑化参数,因此设置了一个iterables,指定了参数fwhm
的不同值,分别是[4, 8, 16]。Workflow在运行时指定使用了MultiProc
插件(官方文档中称之为plugin,不知道为什么要这么称呼),插件的参数为{'n_procs': 3}
,也就是最多可以运行三个进程(如果不指定的话好像是根据系统情况自行选择,但是没说自动选择是个啥概念)。
运行之后的结果可以从上图中看出来,对应于不同的参数,会生成三个不同的文件夹。
Example 2 用同一套计算流程计算所有被试
subject_list = ['01', '02', '03', '04', '05']
from nipype import IdentityInterface
infosource = Node(IdentityInterface(fields=['subject_id']),
name="infosource")
infosource.iterables = [('subject_id', subject_list)]
from os.path import join as opj
from nipype.interfaces.io import SelectFiles, DataSink
anat_file = opj('sub-{subject_id}', 'ses-test', 'anat', 'sub-{subject_id}_ses-test_T1w.nii.gz')
templates = {'anat': anat_file}
selectfiles = Node(SelectFiles(templates,
base_directory='/data/ds000114'),
name="selectfiles")
# Datasink - creates output folder for important outputs
datasink = Node(DataSink(base_directory="/output",
container="datasink"),
name="datasink")
wf_sub = Workflow(name="choosing_subjects")
wf_sub.connect(infosource, "subject_id", selectfiles, "subject_id")
wf_sub.connect(selectfiles, "anat", datasink, "anat_files")
wf_sub.run()
在这个例子中,是用的是IdentityInterface的iterables来处理不同的被试。这是最基本的套路,只要你想用Nipype来运行多个被试,就必然采用这种框架。
7.2 MapNode
在平时的计算中还有一种需求,就是当我们使用Iterable的时候,会生成多个输出,这个时候如果要将这些输出合并成一个列表并作为一个节点的输入(如图所示),那么该怎么办呢?这就需要我们的MapNode出场了。
还是通过一个简单的小例子来看一下:
from nipype import Function
def square_func(x):
return x ** 2
square = Function(["x"], ["f_x"], square_func)
square.run(x=2).outputs.f_x
以上是一个计算平方的函数,我们用Function将其封装起来,这样当x=2
时,square.run(x=2).outputs.f_x
的输出结果就是4了。此时,如果我们用MapNode来封装Function,就可以得到一个带有迭代功能的Node了,代码如下:
from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])
square_node.inputs.x = [0, 1, 2, 3]
res = square_node.run()
需要注意的是,我们在构造MapNode对象的时候,指定了一个iterfiled参数,该参数表明哪些参数是需要迭代的。例如,在该示例中,iterfield被指向为x,因此可以在设置input.x的时候将其设置为[0,1,2,3],这样一来,workflow就会在运行的时候依次将0,1,2,3代入其中并运行。
运行之后查看结果:
res.outputs.f_x
会看到结果为[0, 1, 4, 9]。如果你观察够仔细的话,会发现iterfield是一个列表,因此我们可以预见的是,其中可以迭代多个变量。
在下面这个例子中,不妨引入两个变量来看看结果:
def power_func(x, y):
return x ** y
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = [0, 1, 2, 3]
res = power_node.run()
print(res.outputs.f_xy)
在上面这个例子中,我们将iterfield
参数设置为["x", "y"]
,而x
和y
的变化都是从0到4,这样一来运行之后的结果是[1, 1, 4, 27]
目前,在我接触到的处理中还不涉及这一用途,所以用的还比较少。
7.3 JoinNode
就如官方文档中所说,JoinNode的作用和Iterable完全相反,Iterable会产生多个并行的分支,但是JoinNode则会将多个分支进行合并(如下图所示)
其中,D就是一个JoinNode,按照我目前的理解,
Iterables+JoinNode = MapNode
,当然,使用Iterables+JoinNode的方式要比MapNode的方式要灵活的多,因为MapNode刚刚分手就和好了嘛,而Iterables+JoinNode却可以像以色列一样,分手千年,仍能复国。
下面的代码是上面这张图的伪代码,在编写程序的时候可以参考一下:
from nipype import Node, JoinNode, Workflow
# Specify fake input node A
a = Node(interface=A(), name="a")
# Iterate over fake node B's input 'in_file?
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])
# Pass results on to fake node C
c = Node(interface=C(), name="c")
# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
joinsource="b",
joinfield="in_files",
name="d")
# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
(b, c, [('out_file', 'in_file')])
(c, d, [('out_file', 'in_files')])
])
7.3 并行中的一些参数设置
7.3.1 synchronize
synchronize的本意是同步,在Nipype中表示对于两个迭代变量,可以并行执行,这句话比较拗口,直接看图最合适不过了(都是官方的图和代码)。
b.iterables = [("m", [1, 2]), ("n", [3, 4])]
image.png
假设节点b
有两个迭代变量m
和n
,其迭代值分别是[1, 2]
和[3, 4]
,如果我们不设置synchronize
变量,那么正常情况下,会生成四条分支,也就是m和n值的组合:[1,3]
,[1,4]
,[2, 3]
,[2,4]
。
一旦我们设置了synchronize
变量:
b.iterables = [("m", [1, 2]), ("n", [3, 4])]
b.synchronize = True
就会生成如下的分支图:
image.png
如图所示,设置synchronize
后,就会生成m
和n
两两配对的分支。
7.3.2 itersource
itersource其实可以看成是synchronize
的高级版本,能够实现更加复杂的分支控制。关于其功能,官方文档只给出了一句话的描述,对于这句话该如何翻译,我还拿捏的不是很准确,暂且先放在这里"The itersource feature allows you to expand a downstream iterable based on a mapping of an upstream iterable.",编程嘛,还是通过实际例子来看看会比较好。
a = Node(interface=A(), name="a")
b = Node(interface=B(), name="b")
b.iterables = ("m", [1, 2])
c = Node(interface=C(), name="c")
d = Node(interface=D(), name="d")
d.itersource = ("b", "m")
d.iterables = [("n", {1:[3,4], 2:[5,6]})]
my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a,b,[('out_file','in_file')]),
(b,c,[('out_file','in_file')])
(c,d,[('out_file','in_file')])
])
上面代码对应的分支图如下图所示:
关键点在于节点
d
的设置,其先设置了itersource
为b
和m
,也就是说d
节点的迭代是依赖于b
节点的m值,之后又执行了代码:
d.iterables = [("n", {1:[3,4], 2:[5,6]})]
也就是说变化的是d节点的n变量,当m=1
时,n
值为1
和3
,m=2
时,n
值为5
和6
。
未完待续
网友评论