Nextflow基于Dataflow编程模型,在该模型中进程通过信道进行通信。
信道有两个主要属性:
1、发送消息是一个异步操作,它可以立即完成,而不必等待接收进程。
2、接收数据是一个阻塞操作,它停止接收过程,直到消息到达。
信道类型
Nextflow区分了两种不同的信道: 队列信道和值信道。
队列信道
队列信道是一个非阻塞的单向FIFO队列,它连接两个processes或operators。
队列信道通常使用from, fromPath等factory方法创建,或者使用map, flatMap等信道操作符链接它。
队列信道也由进程输出声明使用into子句创建。
值信道
根据定义,值信道又名单例信道绑定到单个值,并且可以无限次读取而不消耗其内容。
Tip
因此,一个值信道可以被多个进程用作输入。
值信道是使用value工厂方法或通过返回单个值的运算符创建的,例如我们的 first、last、collect、count、min、max、reduce、sum。
Note
当输入在 from 子句中指定一个简单值时,值信道由进程隐式创建。 此外,还隐式地创建了一个价值渠道,作为其输入仅为价值渠道的流程的输出。
例如:
process foo {
input:
val x from 1
output:
file 'x.txt' into result
"""
echo $x > x.txt
"""
}
上面代码段中的流程声明了一个输入,它隐式地是一个值信道。因此,结果输出也是一个值信道,可以被多个进程读取。
See also: Understand how multiple input channels work.
信道工厂
信道可以由流程输出声明隐式创建,也可以使用以下信道工厂方法显式创建。
现有工厂方法是:
of
of方法允许创建一个信道,发出指定为方法参数的任意值序列,例如:
ch = Channel.of( 1, 3, 5, 7 )
ch.view { "value: $it" }
Tip
相应扩展值的范围。
Channel
.of(1..23, 'X', 'Y')
.view()
输出
1
2
3
4
:
23
X
Y
Note
该特性需要Nextflow 19.10.0或更高版本。
value
值工厂方法用于创建值信道。可以指定一个可选的非空参数来将信道绑定到一个特定的值。例如:
expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )
示例中的第一行创建了一个' empty '变量。第二行创建一个信道并将一个字符串绑定到它。最后,最后一个创建了一个信道,并将一个列表对象绑定到它,该对象将作为唯一的发射。
fromList
fromList方法允许您创建一个信道,以元素列表的形式发出提供的值,例如:
Channel
.fromList( ['a', 'b', 'c', 'd'] )
.view { "value: $it" }
打印
value: a
value: b
value: c
value: d
fromPath
通过使用fromPath方法并将路径字符串指定为参数,可以创建一个发出一个或多个文件路径的信道。例如:
myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )
上面的行创建了一个信道,并为其绑定了一个引用指定文件的Path项。
Note
它不检查文件是否存在。
当fromPath参数包含*或?通配符,它被解释为一个全局路径匹配器。例如:
myFileChannel = Channel.fromPath( '/data/big/*.txt' )
这个例子创建了一个信道,并发出与/data/big文件夹中以txt扩展名的文件一样多的Path项。
Tip
两个星号,即*,与类似,但跨越目录边界。此语法通常用于匹配完整路径。花括号指定子模式的集合。
例如:
files = Channel.fromPath( 'data/**.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )
第一行返回一个信道,发出data文件夹中以.fa结尾的文件,并递归地发送到它的所有子文件夹中。而第二个只发出数据路径中任意子文件夹中具有相同后缀的文件。最后,最后一个示例发出两个文件:data/file_1。fq和数据/ file_2.fq。
Note
在Linux Bash中,*通配符不匹配隐藏文件(例如,文件名以a开头的文件。字符)。
为了包含隐藏文件,您需要以句点字符开始模式或指定hidden: true选项。例如:
expl1 = Channel.fromPath( '/path/.*' )
expl2 = Channel.fromPath( '/path/.*.fa' )
expl3 = Channel.fromPath( '/path/*', hidden: true )
第一个示例返回指定路径下的所有隐藏文件。第二个函数返回所有以.fa后缀结尾的隐藏文件。最后一个示例返回该路径中的所有文件(隐藏的和非隐藏的)。
默认情况下,glob模式只查找符合指定条件的常规文件路径,即它不会返回目录路径。
您可以使用参数类型指定值file、dir或any来定义您想要的路径类型。例如:
myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )
第一个示例将返回所有以b后缀结尾的目录路径,而第二个示例将返回所有以a前缀开头的文件和目录。
Name | Description |
---|---|
glob | 当为true时,将字符*、?、[]和{}作为通配符,否则将它们作为普通字符处理(默认为true) |
type | 返回的路径类型,file, dir或any(默认:file) |
hidden | 当为true时,结果路径中包含隐藏文件(默认:false) |
maxDepth | 要访问的目录级别的最大数目(默认:no limit) |
followLinks | 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true) |
relative | 当为true时,返回的路径相对于最顶部的常见目录(默认:false) |
checkIfExists | 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false) |
Note
可以使用list作为参数指定多个路径或glob模式:Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )
(需要0.31.x或更高版本)
fromFilePairs
fromFilePairs方法创建一个信道,发出匹配用户提供的glob模式的文件对。匹配的文件作为元组发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:
Channel
.fromFilePairs('/my/data/SRR*_{1,2}.fastq')
.view()
它将产生类似如下的输出:
[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]
Note
glob模式必须至少包含一个星号通配符。
另外,也可以实现自定义文件对分组策略,提供一个闭包,给定当前文件作为参数,返回分组键。例如:
Channel
.fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
.view { ext, files -> "Files with the extension $ext are $files" }
可选参数表:
Name | Description |
---|---|
type | 返回的路径类型,file, dir或any(默认:file) |
hidden | 当为true时,结果路径中包含隐藏文件(默认:false) |
maxDepth | 要访问的目录级别的最大数目(默认:no limit) |
followLinks | 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true) |
size | 定义每个发出的项期望保存的文件数量(默认值:2) |
flat | 当为true时,匹配的文件将作为发出的元组中的唯一元素生成(默认为false) |
checkIfExists | 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false) |
fromSRA
fromSRA方法查询NCBI SRA数据库,并返回一个发送符合指定条件(即项目或登录号)的FASTQ文件的信道。例如:
Channel
.fromSRA('SRP043510')
.view()
返回值:
[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)
可以使用列表对象指定多个登录id:
ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
.fromSRA(ids)
.view()
Note
隐式管理的读对作为文件列表返回。
Tip
在幕后它使用了NCBI搜索API,因此fromSRA方法允许使用该API支持的任何查询词。
可选参数表:
Name | Description |
---|---|
apiKey | NCBI用户API密钥 |
cache | 启用/禁用缓存API请求(默认:true) |
max | 可以重试的最大条目数(默认:unlimited) |
要访问NCBI搜索服务,需要提供以下两种API密钥:
- 使用apiKey可选参数,例如Channel.fromSRA(ids, apiKey:'0123456789abcdef')。
- 在您的环境中导出NCBI_API_KEY变量,例如export NCBI_API_KEY=0123456789abcdef。
Note
该特性需要Nextflow版本19.04.0或更高版本。
watchPath
watchPath方法监视与指定模式匹配的一个或多个文件的文件夹。只要有满足指定条件的文件,它就通过watchPath方法返回的信道发出。要监视的文件条件可以使用*或?通配符,即通过指定全局路径匹配条件。
Channel
.watchPath( '/path/*.fa' )
.subscribe { println "Fasta file: $it" }
默认情况下,它只监视在指定文件夹中创建的新文件。还可以提供第二个参数,指定要监视的事件。支持的事件有:
Name | Description |
---|---|
create | 创建一个新文件(默认) |
modify | 文件被修改 |
delete | 文件被删除 |
可以使用逗号分隔的字符串指定多个事件,如下所示:
Channel
.watchPath( '/path/*.fa', 'create,modify' )
.subscribe { println "File created or modified: $it" }
Warning
watchPath工厂无休止地等待与指定模式和事件匹配的文件。因此,无论何时在脚本中使用它,生成的管道都不会结束。
See also: fromPath factory method.
empty
根据定义,empty工厂方法创建一个不发出任何值的通道。
See also: ifEmpty and close operators.
Binding values
因为在Nextflow中,信道是使用数据流变量或队列实现的。因此,发送消息相当于将一个值绑定到表示通信信道的对象上。
bind
信道对象提供了bind()方法,这是通过信道发送消息的基本操作。例如:
myChannel = Channel.create()
myChannel.bind( 'Hello world' )
operator <<
操作符<<只是bind方法的语法糖。因此,下面的例子产生了与前一个相同的结果:
myChannel = Channel.create()
myChannel << 'Hello world'
Observing events
subscribe
订阅方法允许您在每次源信道发出新值时执行用户定义的函数。
发出的值隐式地传递给指定的函数。例如:
// 定义一个发送三个值的信道
source = Channel.from ( 'alpha', 'beta', 'delta' )
// 向输出发出值的信道道subscribe 一个函数
source.subscribe { println "Got: $it" }
// 输出
Got: alpha
Got: beta
Got: delta
Note
形式上,用户定义的函数是一个Closure,它是由Groovy编程语言定义的,Nextflow脚本基于Groovy编程语言。
如果需要,可以显式定义闭包参数,使用it
以外的名称,并可选地指定期望的值类型,如下面的示例所示:
Channel
.from( 'alpha', 'beta', 'lambda' )
.subscribe { String str ->
println "Got: ${str}; len: ${str.size()}"
}
// results
Got: alpha; len: 5
Got: beta; len: 4
Got: lambda; len: 6
onNext, onComplete, and onError
subscribe方法可以接受以下事件处理程序中的一个或多个:
- onNext: 注册一个函数,该函数在通道发出值时被调用。这与使用上面示例中描述的带有plain闭包的subscribe是一样的。
- onComplete: 注册一个在通道发出最后一个值之后调用的函数。
- onError: 注册一个在处理onNext事件时引发异常时调用的函数。它不会进一步调用onNext或onComplete。onError方法将导致错误的Throwable作为参数。
Channel
.from( 1, 2, 3 )
.subscribe onNext: { println it }, onComplete: { println 'Done' }
// 输出
1
2
3
Done
网友评论