美文网首页
06-Nextflow Channels

06-Nextflow Channels

作者: 亦是旅人呐 | 来源:发表于2021-09-12 18:49 被阅读0次

Nextflow基于Dataflow编程模型,在该模型中进程通过信道进行通信。

信道有两个主要属性:
1、发送消息是一个异步操作,它可以立即完成,而不必等待接收进程。
2、接收数据是一个阻塞操作,它停止接收过程,直到消息到达。


信道类型

Nextflow区分了两种不同的信道: 队列信道和值信道。

队列信道

队列信道是一个非阻塞的单向FIFO队列,它连接两个processes或operators。

队列信道通常使用from, fromPath等factory方法创建,或者使用map, flatMap等信道操作符链接它。

队列信道也由进程输出声明使用into子句创建。

值信道

根据定义,值信道又名单例信道绑定到单个值,并且可以无限次读取而不消耗其内容。

Tip
因此,一个值信道可以被多个进程用作输入。

值信道是使用value工厂方法或通过返回单个值的运算符创建的,例如我们的 first、last、collect、count、min、max、reduce、sum。

Note
当输入在 from 子句中指定一个简单值时,值信道由进程隐式创建。 此外,还隐式地创建了一个价值渠道,作为其输入仅为价值渠道的流程的输出。

例如:

process foo {
  input:
  val x from 1
  output:
  file 'x.txt' into result

  """
  echo $x > x.txt
  """
}

上面代码段中的流程声明了一个输入,它隐式地是一个值信道。因此,结果输出也是一个值信道,可以被多个进程读取。
See also: Understand how multiple input channels work.


信道工厂

信道可以由流程输出声明隐式创建,也可以使用以下信道工厂方法显式创建。

现有工厂方法是:

of

of方法允许创建一个信道,发出指定为方法参数的任意值序列,例如:

ch = Channel.of( 1, 3, 5, 7 )
ch.view { "value: $it" }

Tip
相应扩展值的范围。

Channel
    .of(1..23, 'X', 'Y')
    .view()

输出

1
2
3
4
:
23
X
Y

Note
该特性需要Nextflow 19.10.0或更高版本。

value

值工厂方法用于创建值信道。可以指定一个可选的非空参数来将信道绑定到一个特定的值。例如:

expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )

示例中的第一行创建了一个' empty '变量。第二行创建一个信道并将一个字符串绑定到它。最后,最后一个创建了一个信道,并将一个列表对象绑定到它,该对象将作为唯一的发射。

fromList

fromList方法允许您创建一个信道,以元素列表的形式发出提供的值,例如:

Channel
    .fromList( ['a', 'b', 'c', 'd'] )
    .view { "value: $it" }

打印

value: a
value: b
value: c
value: d

fromPath

通过使用fromPath方法并将路径字符串指定为参数,可以创建一个发出一个或多个文件路径的信道。例如:

myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )

上面的行创建了一个信道,并为其绑定了一个引用指定文件的Path项。

Note
它不检查文件是否存在。

当fromPath参数包含*或?通配符,它被解释为一个全局路径匹配器。例如:

myFileChannel = Channel.fromPath( '/data/big/*.txt' )

这个例子创建了一个信道,并发出与/data/big文件夹中以txt扩展名的文件一样多的Path项。

Tip
两个星号,即*,与类似,但跨越目录边界。此语法通常用于匹配完整路径。花括号指定子模式的集合。

例如:

files = Channel.fromPath( 'data/**.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )

第一行返回一个信道,发出data文件夹中以.fa结尾的文件,并递归地发送到它的所有子文件夹中。而第二个只发出数据路径中任意子文件夹中具有相同后缀的文件。最后,最后一个示例发出两个文件:data/file_1。fq和数据/ file_2.fq。

Note
在Linux Bash中,*通配符不匹配隐藏文件(例如,文件名以a开头的文件。字符)。

为了包含隐藏文件,您需要以句点字符开始模式或指定hidden: true选项。例如:

expl1 = Channel.fromPath( '/path/.*' )
expl2 = Channel.fromPath( '/path/.*.fa' )
expl3 = Channel.fromPath( '/path/*', hidden: true )

第一个示例返回指定路径下的所有隐藏文件。第二个函数返回所有以.fa后缀结尾的隐藏文件。最后一个示例返回该路径中的所有文件(隐藏的和非隐藏的)。

默认情况下,glob模式只查找符合指定条件的常规文件路径,即它不会返回目录路径。

您可以使用参数类型指定值file、dir或any来定义您想要的路径类型。例如:

myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )

第一个示例将返回所有以b后缀结尾的目录路径,而第二个示例将返回所有以a前缀开头的文件和目录。

Name Description
glob 当为true时,将字符*、?、[]和{}作为通配符,否则将它们作为普通字符处理(默认为true)
type 返回的路径类型,file, dir或any(默认:file)
hidden 当为true时,结果路径中包含隐藏文件(默认:false)
maxDepth 要访问的目录级别的最大数目(默认:no limit)
followLinks 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true)
relative 当为true时,返回的路径相对于最顶部的常见目录(默认:false)
checkIfExists 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false)

Note
可以使用list作为参数指定多个路径或glob模式:

Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )

(需要0.31.x或更高版本)

fromFilePairs

fromFilePairs方法创建一个信道,发出匹配用户提供的glob模式的文件对。匹配的文件作为元组发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:

Channel
    .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
    .view()

它将产生类似如下的输出:

[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]

Note
glob模式必须至少包含一个星号通配符。

另外,也可以实现自定义文件对分组策略,提供一个闭包,给定当前文件作为参数,返回分组键。例如:

Channel
    .fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
    .view { ext, files -> "Files with the extension $ext are $files" }

可选参数表:

Name Description
type 返回的路径类型,file, dir或any(默认:file)
hidden 当为true时,结果路径中包含隐藏文件(默认:false)
maxDepth 要访问的目录级别的最大数目(默认:no limit)
followLinks 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true)
size 定义每个发出的项期望保存的文件数量(默认值:2)
flat 当为true时,匹配的文件将作为发出的元组中的唯一元素生成(默认为false)
checkIfExists 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false)

fromSRA

fromSRA方法查询NCBI SRA数据库,并返回一个发送符合指定条件(即项目或登录号)的FASTQ文件的信道。例如:

Channel
    .fromSRA('SRP043510')
    .view()

返回值:

[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)

可以使用列表对象指定多个登录id:

ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
    .fromSRA(ids)
    .view()

Note
隐式管理的读对作为文件列表返回。

Tip
在幕后它使用了NCBI搜索API,因此fromSRA方法允许使用该API支持的任何查询词。

可选参数表:

Name Description
apiKey NCBI用户API密钥
cache 启用/禁用缓存API请求(默认:true)
max 可以重试的最大条目数(默认:unlimited)

要访问NCBI搜索服务,需要提供以下两种API密钥:

  • 使用apiKey可选参数,例如Channel.fromSRA(ids, apiKey:'0123456789abcdef')。
  • 在您的环境中导出NCBI_API_KEY变量,例如export NCBI_API_KEY=0123456789abcdef。

Note
该特性需要Nextflow版本19.04.0或更高版本。

watchPath

watchPath方法监视与指定模式匹配的一个或多个文件的文件夹。只要有满足指定条件的文件,它就通过watchPath方法返回的信道发出。要监视的文件条件可以使用*或?通配符,即通过指定全局路径匹配条件。

Channel
   .watchPath( '/path/*.fa' )
   .subscribe { println "Fasta file: $it" }

默认情况下,它只监视在指定文件夹中创建的新文件。还可以提供第二个参数,指定要监视的事件。支持的事件有:

Name Description
create 创建一个新文件(默认)
modify 文件被修改
delete 文件被删除

可以使用逗号分隔的字符串指定多个事件,如下所示:

Channel
   .watchPath( '/path/*.fa', 'create,modify' )
   .subscribe { println "File created or modified: $it" }

Warning
watchPath工厂无休止地等待与指定模式和事件匹配的文件。因此,无论何时在脚本中使用它,生成的管道都不会结束。

See also: fromPath factory method.

empty

根据定义,empty工厂方法创建一个不发出任何值的通道。

See also: ifEmpty and close operators.


Binding values

因为在Nextflow中,信道是使用数据流变量或队列实现的。因此,发送消息相当于将一个值绑定到表示通信信道的对象上。

bind

信道对象提供了bind()方法,这是通过信道发送消息的基本操作。例如:

myChannel = Channel.create()
myChannel.bind( 'Hello world' )

operator <<

操作符<<只是bind方法的语法糖。因此,下面的例子产生了与前一个相同的结果:

myChannel = Channel.create()
myChannel << 'Hello world'

Observing events

subscribe

订阅方法允许您在每次源信道发出新值时执行用户定义的函数。

发出的值隐式地传递给指定的函数。例如:

// 定义一个发送三个值的信道
source = Channel.from ( 'alpha', 'beta', 'delta' )

// 向输出发出值的信道道subscribe 一个函数
source.subscribe {  println "Got: $it"  }

// 输出
Got: alpha
Got: beta
Got: delta

Note
形式上,用户定义的函数是一个Closure,它是由Groovy编程语言定义的,Nextflow脚本基于Groovy编程语言。

如果需要,可以显式定义闭包参数,使用it以外的名称,并可选地指定期望的值类型,如下面的示例所示:

Channel
    .from( 'alpha', 'beta', 'lambda' )
    .subscribe { String str ->
        println "Got: ${str}; len: ${str.size()}"
     }

// results
Got: alpha; len: 5
Got: beta; len: 4
Got: lambda; len: 6

onNext, onComplete, and onError

subscribe方法可以接受以下事件处理程序中的一个或多个:

  • onNext: 注册一个函数,该函数在通道发出值时被调用。这与使用上面示例中描述的带有plain闭包的subscribe是一样的。
  • onComplete: 注册一个在通道发出最后一个值之后调用的函数。
  • onError: 注册一个在处理onNext事件时引发异常时调用的函数。它不会进一步调用onNext或onComplete。onError方法将导致错误的Throwable作为参数。
Channel
    .from( 1, 2, 3 )
    .subscribe onNext: { println it }, onComplete: { println 'Done' }

// 输出
1
2
3
Done

相关文章

网友评论

      本文标题:06-Nextflow Channels

      本文链接:https://www.haomeiwen.com/subject/gxupwltx.html