美文网首页脑核磁共振数据处理——fMRI
Nipype学习笔记(3)——Nipype中的并行

Nipype学习笔记(3)——Nipype中的并行

作者: 韧心222 | 来源:发表于2018-07-06 09:06 被阅读149次

    7. 并行计算

    7.1 Iterables

    长期进行神经影像学数据处理的朋友应该都知道,对一批被试进行同样的数据处理是一种很常见的方式,例如要对所有的被试执行:

    • Dicom2Nifti
    • Remove first images
    • Slice Timing
    • Realign
    • Normalize
    • ...

    对于这样的基本需求,Nipype当然也帮我们准备好了一些并行处理的手段,那就是Iterables。此外,如果你还想尝试一下不同的参数对于数据处理的结果有什么影响,那么使用Gretna这样的软件通常会需要执行几遍(麻烦的是不是运行,而是需要重新设定参数,如果没有处理好,运行结果还可能彼此覆盖),这时候如果你能想到用Nipype中的Iterables,那就太好了。废话少说,先看东西:

    Example 1 试验不同参数

    from nipype import Node, Workflow
    from nipype.interfaces.fsl import BET, IsotropicSmooth
    
    # Initiate a skull stripping Node with BET
    skullstrip = Node(
          BET(mask=True, in_file='/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz'),
          name="skullstrip")
    isosmooth = Node(IsotropicSmooth(), name='iso_smooth')
    isosmooth.iterables = ("fwhm", [4, 8, 16])
    wf = Workflow(name="smoothflow")
    wf.base_dir = "/output"
    wf.connect(skullstrip, 'out_file', isosmooth, 'in_file')
    
    # Run it in parallel (one core for each smoothing kernel)
    wf.run('MultiProc', plugin_args={'n_procs': 3})
    

    在这个例子中,整个Workflow由两个节点组成,第一个是剥头骨的节点,第二个是进行平滑化的节点,为了测试不同的平滑化参数,因此设置了一个iterables,指定了参数fwhm的不同值,分别是[4, 8, 16]。Workflow在运行时指定使用了MultiProc插件(官方文档中称之为plugin,不知道为什么要这么称呼),插件的参数为{'n_procs': 3},也就是最多可以运行三个进程(如果不指定的话好像是根据系统情况自行选择,但是没说自动选择是个啥概念)。

    结果

    运行之后的结果可以从上图中看出来,对应于不同的参数,会生成三个不同的文件夹。

    Example 2 用同一套计算流程计算所有被试

    subject_list = ['01', '02', '03', '04', '05']
    from nipype import IdentityInterface
    infosource = Node(IdentityInterface(fields=['subject_id']),
                      name="infosource")
    infosource.iterables = [('subject_id', subject_list)]
    
    from os.path import join as opj
    from nipype.interfaces.io import SelectFiles, DataSink
    
    anat_file = opj('sub-{subject_id}', 'ses-test', 'anat', 'sub-{subject_id}_ses-test_T1w.nii.gz')
    
    templates = {'anat': anat_file}
    
    selectfiles = Node(SelectFiles(templates,
                                   base_directory='/data/ds000114'),
                       name="selectfiles")
    
    # Datasink - creates output folder for important outputs
    datasink = Node(DataSink(base_directory="/output",
                             container="datasink"),
                    name="datasink")
    
    wf_sub = Workflow(name="choosing_subjects")
    wf_sub.connect(infosource, "subject_id", selectfiles, "subject_id")
    wf_sub.connect(selectfiles, "anat", datasink, "anat_files")
    wf_sub.run()
    

    在这个例子中,是用的是IdentityInterface的iterables来处理不同的被试。这是最基本的套路,只要你想用Nipype来运行多个被试,就必然采用这种框架。

    7.2 MapNode

    在平时的计算中还有一种需求,就是当我们使用Iterable的时候,会生成多个输出,这个时候如果要将这些输出合并成一个列表并作为一个节点的输入(如图所示),那么该怎么办呢?这就需要我们的MapNode出场了。

    还是通过一个简单的小例子来看一下:

    from nipype import Function
    def square_func(x):
        return x ** 2
    square = Function(["x"], ["f_x"], square_func)
    square.run(x=2).outputs.f_x
    

    以上是一个计算平方的函数,我们用Function将其封装起来,这样当x=2时,square.run(x=2).outputs.f_x的输出结果就是4了。此时,如果我们用MapNode来封装Function,就可以得到一个带有迭代功能的Node了,代码如下:

    from nipype import MapNode
    square_node = MapNode(square, name="square", iterfield=["x"])
    square_node.inputs.x = [0, 1, 2, 3]
    res = square_node.run()
    

    需要注意的是,我们在构造MapNode对象的时候,指定了一个iterfiled参数,该参数表明哪些参数是需要迭代的。例如,在该示例中,iterfield被指向为x,因此可以在设置input.x的时候将其设置为[0,1,2,3],这样一来,workflow就会在运行的时候依次将0,1,2,3代入其中并运行。

    运行之后查看结果:

    res.outputs.f_x
    

    会看到结果为[0, 1, 4, 9]。如果你观察够仔细的话,会发现iterfield是一个列表,因此我们可以预见的是,其中可以迭代多个变量。

    在下面这个例子中,不妨引入两个变量来看看结果:

    def power_func(x, y):
        return x ** y
    power = Function(["x", "y"], ["f_xy"], power_func)
    power_node = MapNode(power, name="power", iterfield=["x", "y"])
    power_node.inputs.x = [0, 1, 2, 3]
    power_node.inputs.y = [0, 1, 2, 3]
    res = power_node.run()
    print(res.outputs.f_xy)
    

    在上面这个例子中,我们将iterfield参数设置为["x", "y"],而xy的变化都是从0到4,这样一来运行之后的结果是[1, 1, 4, 27]

    目前,在我接触到的处理中还不涉及这一用途,所以用的还比较少。

    7.3 JoinNode

    就如官方文档中所说,JoinNode的作用和Iterable完全相反,Iterable会产生多个并行的分支,但是JoinNode则会将多个分支进行合并(如下图所示)

    JoinNode
    其中,D就是一个JoinNode,按照我目前的理解,Iterables+JoinNode = MapNode,当然,使用Iterables+JoinNode的方式要比MapNode的方式要灵活的多,因为MapNode刚刚分手就和好了嘛,而Iterables+JoinNode却可以像以色列一样,分手千年,仍能复国。

    下面的代码是上面这张图的伪代码,在编写程序的时候可以参考一下:

    from nipype import Node, JoinNode, Workflow
    
    # Specify fake input node A
    a = Node(interface=A(), name="a")
    
    # Iterate over fake node B's input 'in_file?
    b = Node(interface=B(), name="b")
    b.iterables = ('in_file', [file1, file2])
    
    # Pass results on to fake node C
    c = Node(interface=C(), name="c")
    
    # Join forked execution workflow in fake node D
    d = JoinNode(interface=D(),
                 joinsource="b",
                 joinfield="in_files",
                 name="d")
    
    # Put everything into a workflow as usual
    workflow = Workflow(name="workflow")
    workflow.connect([(a, b, [('subject', 'subject')]),
                      (b, c, [('out_file', 'in_file')])
                      (c, d, [('out_file', 'in_files')])
                      ])
    

    7.3 并行中的一些参数设置

    7.3.1 synchronize

    synchronize的本意是同步,在Nipype中表示对于两个迭代变量,可以并行执行,这句话比较拗口,直接看图最合适不过了(都是官方的图和代码)。

    b.iterables = [("m", [1, 2]), ("n", [3, 4])]
    
    image.png

    假设节点b有两个迭代变量mn,其迭代值分别是[1, 2][3, 4],如果我们不设置synchronize变量,那么正常情况下,会生成四条分支,也就是m和n值的组合:[1,3][1,4][2, 3][2,4]
    一旦我们设置了synchronize变量:

    b.iterables = [("m", [1, 2]), ("n", [3, 4])]
    b.synchronize = True
    

    就会生成如下的分支图:


    image.png

    如图所示,设置synchronize后,就会生成mn两两配对的分支。

    7.3.2 itersource

    itersource其实可以看成是synchronize的高级版本,能够实现更加复杂的分支控制。关于其功能,官方文档只给出了一句话的描述,对于这句话该如何翻译,我还拿捏的不是很准确,暂且先放在这里"The itersource feature allows you to expand a downstream iterable based on a mapping of an upstream iterable.",编程嘛,还是通过实际例子来看看会比较好。

    a = Node(interface=A(), name="a")
    b = Node(interface=B(), name="b")
    b.iterables = ("m", [1, 2])
    c = Node(interface=C(), name="c")
    d = Node(interface=D(), name="d")
    d.itersource = ("b", "m")
    d.iterables = [("n", {1:[3,4], 2:[5,6]})]
    my_workflow = Workflow(name="my_workflow")
    my_workflow.connect([(a,b,[('out_file','in_file')]),
                         (b,c,[('out_file','in_file')])
                         (c,d,[('out_file','in_file')])
                         ])
    

    上面代码对应的分支图如下图所示:

    image.png
    关键点在于节点d的设置,其先设置了itersourcebm,也就是说d节点的迭代是依赖于b节点的m值,之后又执行了代码:
    d.iterables = [("n", {1:[3,4], 2:[5,6]})]
    

    也就是说变化的是d节点的n变量,当m=1时,n值为13m=2时,n值为56

    未完待续

    相关文章

      网友评论

        本文标题:Nipype学习笔记(3)——Nipype中的并行

        本文链接:https://www.haomeiwen.com/subject/ucczyftx.html