Nextflow Usage 3: process Methods (2)

Author: 京古 | Published: 2019-06-06 13:58

5. Process per file range

Problem

You need to execute a task over two or more series of files having a common index range.

Solution

Use the from method to define the range over which the task execution is repeated, then chain it with a map operator to associate each index with the corresponding input files. Finally, use the resulting channel as input for the process.

Code

Channel
  .from(1..23)
  .map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
  .set { pairs_ch }

process foo {
  tag "$sampleId"

  input:
  set sampleId, file(indels), file(snps) from pairs_ch

  """
  echo foo_command --this $indels --that $snps
  """
}

Run it

nextflow run patterns/process-per-file-range.nf
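
To check what the map closure in this recipe emits before wiring it into a process, the same logic can be tried in plain Groovy, without the Nextflow runtime. This is only a sketch under stated assumptions: it builds path strings where the pipeline calls file(...), it uses a short 1..3 range instead of 1..23, and the variable name pairs is made up for illustration.

```groovy
// Plain Groovy sketch (no Nextflow runtime needed): mimic the map closure
// from the recipe, using path strings in place of Nextflow's file(...) objects.
def pairs = (1..3).collect { chr ->
    ["sample$chr", "/some/path/foo.${chr}.indels.vcf", "/other/path/foo.snvs.${chr}.vcf"]
}

// One [sampleId, indels, snvs] triple per index in the range
pairs.each { println it }
```

Each triple corresponds to one task execution: the first element feeds sampleId, the second indels, and the third snps.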

6. map

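The map operator applies a function of your choice to every item emitted by a channel and returns the results as a new channel. The function applied is called the mapping function and is expressed as a closure, as in the following example: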

Channel
  .from(1,2,3,4,5)
  .map { it * it }
  .subscribe onNext: { println it }, onComplete: { println 'Done' }
# Output:
1
4
9
16
25
Done

7. collect

The collect operator gathers all the items emitted by a channel into a list and emits that list as a single item.
Example 1:

Channel
    .from( 1, 2, 3, 4 )
    .collect()
    .println()
# outputs
[1, 2, 3, 4]

A closure can be specified to transform each item before it is added to the resulting list.
Example 2:

Channel
    .from( 'hello', 'ciao', 'bonjour' )
    .collect { it.length() }
    .println()
# outputs
[5, 4, 7]

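Example 3 (collect all the outputs of an upstream process and pass them to a downstream process as a single input):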

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }
process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}
process bar {
  echo true
  input:
  file '*.fq' from unzipped_ch.collect()

  """
  cat *.fq
  """
}
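
A naming trap for Groovy users is worth spelling out: Nextflow's map operator behaves like Groovy's List.collect (one result per item), whereas Nextflow's collect operator gathers everything into one list. The plain-Groovy sketch below models a channel as an ordinary list to show the two shapes. Treating a channel as a list is an assumption made purely for illustration, and the variable names are hypothetical.

```groovy
// Model the items emitted by a channel as a plain list (illustration only).
def emitted = ['hello', 'ciao', 'bonjour']

// Shape of Nextflow's map: one transformed item per emitted item.
def mapped = emitted.collect { it.length() }

// Shape of Nextflow's collect with a closure: a single emission
// that holds the whole transformed list.
def collected = [emitted.collect { it.length() }]

println "map-like result has ${mapped.size()} items"
println "collect-like result has ${collected.size()} item"
```

This is why bar in Example 3 runs once with every file staged together, instead of once per file.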

8. Collect outputs into a file

Problem

You need to concatenate into a single file all output files produced by an upstream process.

The collectFile operator is all that is needed for this.

Solution

Use the collectFile operator to merge all the output files into a single file.

Code

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }
process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

unzipped_ch
      .collectFile()
      .println{ it.text }

9. Store process outputs

Problem

You need to store the outputs of one or more processes into a directory structure of your choice.

The publishDir directive is all that is needed for this.

Solution

Use the publishDir directive to set a custom directory where the process outputs need to be made available.

Code

params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'
Channel.fromFilePairs(params.reads).set{ samples_ch }
process foo {
  publishDir "$params.outdir/$sampleId"
  input:
  set sampleId, file(samples) from samples_ch
  output:
  file '*.fq'

  script:
  """
  < ${samples[0]} zcat > sample_1.fq
  < ${samples[1]} zcat > sample_2.fq
  """
}
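
By default, publishDir makes the outputs available as symbolic links into the work directory, so the published files disappear if the work directory is cleaned. A variant of the recipe, assuming real copies are wanted in the results folder, adds the mode option to the directive. It is written in the same DSL1 syntax as the rest of this article and needs the Nextflow runtime, not plain Groovy.

```groovy
params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'
Channel.fromFilePairs(params.reads).set{ samples_ch }

process foo {
  // mode: 'copy' publishes real copies instead of the default symlinks
  publishDir "$params.outdir/$sampleId", mode: 'copy'
  input:
  set sampleId, file(samples) from samples_ch
  output:
  file '*.fq'

  script:
  """
  < ${samples[0]} zcat > sample_1.fq
  < ${samples[1]} zcat > sample_2.fq
  """
}
```

Copying costs extra disk space, so the symlink default can still be the better choice for large intermediate files.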
