Operators
Nextflow操作符是一些方法,允许您将信道相互连接,或转换应用用户提供的某些规则的信道发出的值。
操作符可分为七组:
- Filtering operators
- Transforming operators
- Splitting operators
- Combining operators
- Forking operators
- Maths operators
- Other operators
Note
操作符set和subscribe是最终操作符,因此,如果使用它们,它们必须是组合操作符链中的最后一个操作符。
Filtering operators
对于给定的信道,过滤操作符允许您仅选择符合给定规则的项目。
可用的过滤操作符有:
filter
过滤器操作符允许您仅获取由满足条件的信道发出的项,并丢弃所有其他项。过滤条件可以通过使用正则表达式、文字值、类型限定符(例如Java类)或任何布尔Predicate来指定。(Predicate意思是判定一个参数是否符合要求)
下面的示例展示了如何使用正则表达式过滤信道,该表达式只返回以a开头的字符串:
Channel
.from( 'a', 'b', 'aa', 'bc', 3, 4.5 )
.filter( ~/^a.*/ )
.view()
a
aa
以下示例显示如何通过指定类型限定符来过滤信道,Number以便仅返回数字:
Channel
.from( 'a', 'b', 'aa', 'bc', 3, 4.5 )
.filter( Number )
.view()
3
4.5
最后,可以使用任何布尔Predicate来定义过滤条件。Predicate由返回布尔值的闭包表示。例如,以下片段显示了如何过滤发出数字的信道,以便返回奇数:
Channel
.from( 1, 2, 3, 4, 5 )
.filter { it % 2 == 1 }
.view()
1
3
5
Note
在上面的例子中,过滤条件被包裹在大括号中,而不是圆括号中,因为它指定了一个闭包作为操作符的参数。这只是一种语言语法糖filter({ it % 2 == 1 })
unique
惟一操作符允许您从信道中删除重复项,并且只发出没有重复的单个项。
例如:
Channel
.from( 1,1,1,5,7,7,7,3,3 )
.unique()
.view()
1
5
7
3
您还可以指定一个可选闭包,该闭包自定义其区分唯一项的方式。例如:
Channel
.from(1,3,4,5)
.unique { it % 2 }
.view()
1
4
distinct
distinct操作符允许您从信道中删除连续的重复项,以便发出的每个项与前一个项不同。例如:
Channel
.from( 1,1,2,2,2,3,1,1,2,2,3 )
.distinct()
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
2
3
1
2
3
Done
您还可以指定一个可选闭包,该闭包自定义其区分不同项的方式。例如:
Channel
.from( 1,1,2,2,2,3,1,1,2,4,6 )
.distinct { it % 2 }
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
2
3
2
Done
first
first操作符创建一个信道,该信道返回源信道发出的第一个项,或最终返回与可选条件匹配的第一个项。可以使用正则表达式、Java类类型或任何布尔Predicate来指定条件。例如:
// 如果没有指定条件,则抛出第一项:1
Channel
.from( 1, 2, 3 )
.first()
.view()
// 发出第一个字符串值:'a'
Channel
.from( 1, 2, 'a', 'b', 3 )
.first( String )
.view()
// 发出与正则表达式匹配的第一项:'aa'
Channel
.from( 'a', 'aa', 'aaa' )
.first( ~/aa.*/ )
.view()
// 发出第一个Predicate计算为true的项:4
Channel
.from( 1,2,3,4,5 )
.first { it > 3 }
.view()
randomSample
randomSample操作符允许创建一个信道,从应用的信道中随机抽取指定数量的项目。例如:
Channel
.from( 1..100 )
.randomSample( 10 )
.view()
上面的代码片段将打印从1到100的10个数字。
该操作符支持第二个参数,允许您为随机数生成器设置初始种子。通过设置它,randomSample操作符将始终返回相同的伪随机序列。例如:
Channel
.from( 1..100 )
.randomSample( 10, 234 )
.view()
上面的示例将打印10个范围在1到100之间的随机数。每次运行脚本时,都会返回相同的序列。
take
take操作符允许您只过滤由信道发出的前n项。例如:
Channel
.from( 1,2,3,4,5,6 )
.take( 3 )
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
2
3
Done
Note
通过指定值-1,操作符接受所有值。
See also until.
last
last操作符创建一个信道,该信道只返回源信道发出的最后一个项。例如:
Channel
.from( 1,2,3,4,5,6 )
.last()
.view()
6
until
until操作符创建一个信道,该信道返回源信道发出的项,并在验证指定的条件时停止。例如:
Channel
.from( 3,2,1,5,1,5 )
.until{ it==5 }
.view()
3
2
1
See also take.
Transforming operators
转换操作符用于将信道发出的项转换为新值。
这些操作符是:
map
map操作符将您选择的函数应用于信道发出的每个项,并将这样获得的项作为新信道返回。所应用的函数被称为映射函数,并用一个闭包表示,如下面的示例所示:
Channel
.from( 1, 2, 3, 4, 5 )
.map { it * it }
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
4
9
16
25
Done
flatMap
flatMap操作符将您选择的函数应用于由信道发出的每个项,并将这样获得的项作为新信道返回。尽管映射函数返回一个项目列表,但这个列表是扁平的,以便每个项目都独立发出。
例如:
// create a channel of numbers
numbers = Channel.from( 1, 2, 3 )
// map each number to a tuple (array), which items are emitted separately
results = numbers.flatMap { n -> [ n*2, n*3 ] }
// print the final results
results.subscribe onNext: { println it }, onComplete: { println 'Done' }
2
3
4
6
6
9
Done
关联数组也以同样的方式处理,因此每个数组条目都作为单个键值项发出。例如:
Channel.from ( 1, 2, 3 )
.flatMap { it -> [ number: it, square: it*it ] }
.view { it.key + ': ' + it.value }
number: 1
square: 1
number: 2
square: 4
number: 3
square: 9
reduce
reduce运算符将您选择的函数应用于信道发出的每个项。每次调用这个函数时,它都接受两个参数:首先是第i个发出的项,其次是之前调用函数本身的结果。结果与第i+1项一起传递给下一个函数调用,直到处理完所有项。
最后,reduce操作符将函数最后一次调用的结果作为唯一输出。
例如:
// 累积求和
Channel
.from( 1, 2, 3, 4, 5 )
.reduce { a, b -> println "a: $a b: $b"; return a+b }
.view { "result = $it" }
它将打印以下输出:
a: 1 b: 2
a: 3 b: 3
a: 6 b: 4
a: 10 b: 5
result = 15
Note
在常见的使用场景中,第一个函数参数用作累加器,第二个参数表示要处理的第i项。
你也可以指定一个种子值来初始化累加器参数,如下所示:
myChannel.reduce( seedValue ) { a, b -> ... }
groupTuple
groupTuple操作符收集源信道发出的值的元组(或列表),源信道将共享相同键的元素分组在一起。最后,它为收集到的每个不同的键发出一个新的元组对象。
换句话说,该操作符将一个元组序列(K, V, W, ..)转换为一个新的信道,发出一个序列(K, list(V), list(W), ..)
例如:
Channel
.from( [1,'A'], [1,'B'], [2,'C'], [3, 'B'], [1,'C'], [2, 'A'], [3, 'D'] )
.groupTuple()
.view()
[1, [A, B, C]]
[2, [C, A]]
[3, [B, D]]
默认情况下,元组中的第一个条目被用作分组键。通过使用by参数并指定要用作键的条目的索引(索引是基于零的),可以选择不同的键。例如,按每个元组中的第二个值分组:
Channel
.from( [1,'A'], [1,'B'], [2,'C'], [3, 'B'], [1,'C'], [2, 'A'], [3, 'D'] )
.groupTuple(by: 1)
.view()
[[1, 2], A]
[[1, 3], B]
[[2, 1], C]
[[3], D]
可用的参数:
Field | Description |
---|---|
by | 要用作分组键的元素的索引(基于零)。一个由多个元素组成的键可以定义为一个索引列表,例如:[0,2] |
sort | 定义分组项的排序标准。请参阅下面的可用排序选项 |
size | 分组列表必须包含的项目数。当达到指定的大小时,将触发元组 |
remainder | 当错误的不完整元组(即小于大小的分组项)被丢弃时(默认)。当为真时,不完整元组作为结束触发。仅当指定size参数时有效 |
排序选项:
Sort | Description |
---|---|
false | 没有应用排序(默认) |
true | 将分组的项目按项目的自然顺序排序,例如,用数字表示数字,用字典排序表示字符串,等等。参见http://docs.oracle.com/javase/tutorial/collections/interfaces/order.html |
hash | 按与每个条目关联的散列号对分组项进行排序 |
deep | 与前面的类似,但是哈希值是在实际的条目内容上创建的,例如,当条目是一个文件时,哈希值是在实际的文件内容上创建的 |
custom | 用于对包含值列表的元组元素进行排序的自定义排序标准。它可以通过Closure或Comparator对象来指定 |
Tip
您应该始终使用size属性指定每个元组中期望的元素数量,以允许groupTuple操作符尽快将收集的值流化。然而,在一些用例中,每个元组根据分组键具有不同的大小。在本例中,使用内置函数groupKey,该函数允许您创建一个特殊的分组键对象,可以将给定键的组大小关联到该对象。
buffer
buffer操作符将源信道发出的项收集到子集中,并分别发出这些子集。
有很多方法可以调节buffer如何将源信道中的项收集到子集中:
- buffer( closingCondition ):开始将信道发出的项目收集到子集中,直到验证关闭条件。 之后,子集被发送到结果信道,新项目被收集到一个新的子集中。 重复该过程,直到发送源信道中的最后一个值。 closingCondition可以指定为正则表达式、Java 类、文字值或必须满足的布尔Predicate 。 例如:
Channel
.from( 1,2,3,1,2,3 )
.buffer { it == 2 }
.view()
// 这里表示取到值为2就输出一次
[1,2]
[3,1,2]
- buffer( openingCondition, closingCondition ):一旦其中一个条目验证了打开条件,就开始收集信道发出的条目,并继续收集,直到有一个条目验证了关闭条件。然后发出子集,并继续应用所描述的逻辑,直到发出最后一个信道项。这两个条件可以定义为正则表达式、文字值、Java类或需要满足的布尔谓词。例如:
Channel
.from( 1,2,3,4,5,1,2,3,4,5,1,2 )
.buffer( 2, 4 )
.view()
// 2开始,4结束输出一次
[2,3,4]
[2,3,4]
- buffer( size: n ):转换源信道,使其发出由n个元素组成的元组。不完整的元组将被丢弃。例如:
Channel
.from( 1,2,3,1,2,3,1 )
.buffer( size: 2 )
.view()
// emitted values
[1, 2]
[3, 1]
[2, 3]
如果您想在包含少于n个元素的元组中生成最后的元素,只需添加指定为true的参数remainder,例如:
Channel
.from( 1,2,3,1,2,3,1 )
.buffer( size: 2, remainder: true )
.view()
// emitted values
[1, 2]
[3, 1]
[2, 3]
[1]
- Buffer (size: n, skip: m):和前面的例子一样,它发出包含n个元素的元组,但是在开始收集下一个元组(包括第一个发射)的值之前会跳过m个值。例如:
Channel
.from( 1,2,3,4,5,1,2,3,4,5,1,2 )
.buffer( size:3, skip:2 )
.view()
// emitted values
[3, 4, 5]
[3, 4, 5]
如果您想在包含少于n个元素的元组中生成其余的项,只需添加指定为true的参数remainder,如前面的示例所示。
See also: collate operator.
collate
collate操作符转换信道的方式是将发出的值分组到包含n项的元组中。例如:
Channel
.from(1,2,3,1,2,3,1)
.collate( 3 )
.view()
[1, 2, 3]
[1, 2, 3]
[1]
如上例所示,最后一个元组可能不完整,例如 包含少于指定大小的元素。 如果要避免这种情况,请将 false 指定为第二个参数。 例如:
Channel
.from(1,2,3,1,2,3,1)
.collate( 3, false )
.view()
[1, 2, 3]
[1, 2, 3]
collate操作符的第二个版本允许在大小之后指定在元组中收集元素的步骤。例如:
Channel
.from(1,2,3,4)
.collate( 3, 1 )
.view()
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
与前面一样,如果不想发出没有完成元组的最后一项,请指定false作为第三个参数。
See also: buffer operator.
collect
collect操作符收集信道向List发送的所有项,并将结果对象作为唯一发送返回。例如:
Channel
.from( 1, 2, 3, 4 )
.collect()
.view()
// outputs
[1,2,3,4]
在将每个项添加到结果列表之前,可以指定一个可选闭包来转换每个项。例如:
Channel
.from( 'hello', 'ciao', 'bonjour' )
.collect { it.length() }
.view()
// outputs
[5,4,7]
See also: toList and toSortedList operator.
flatten
flatten操作符转换信道的方式是,将Collection或Array类型的每一项都扁平化,以便最终信道分别发出每一项。例如:
Channel
.from( [1,[2,3]], 4, [5,[6]] )
.flatten()
.view()
1
2
3
4
5
6
See also: flatMap operator.
toList
toList操作符收集信道向List对象发出的所有项,并将结果集合作为单个项发出。例如:
Channel
.from( 1, 2, 3, 4 )
.toList()
.subscribe onNext: { println it }, onComplete: { println 'Done' }
[1,2,3,4]
Done
See also: collect operator.
toSortedList
toSortedList操作符收集由一个对List对象进行排序的信道发出的所有项,并将结果集合作为单个项发出。例如:
Channel
.from( 3, 2, 1, 4 )
.toSortedList()
.subscribe onNext: { println it }, onComplete: { println 'Done' }
[1,2,3,4]
Done
您还可以将一个比较器闭包作为参数传递给toSortedList操作符来定制排序条件。例如,按元组的第二个元素降序排序:
Channel
.from( ["homer", 5], ["bart", 2], ["lisa", 10], ["marge", 3], ["maggie", 7])
.toSortedList( { a, b -> b[1] <=> a[1] } )
.view()
[[lisa, 10], [maggie, 7], [homer, 5], [marge, 3], [bart, 2]]
See also: collect operator.
transpose
转置操作符对信道进行转换,使发出的项是每个项中所有元组元素的转置结果。例如:
Channel.from([
['a', ['p', 'q'], ['u','v'] ],
['b', ['s', 't'], ['x','y'] ]
])
.transpose()
.view()
上述片段打印:
[a, p, u]
[a, q, v]
[b, s, x]
[b, t, y]
可用的参数:
Field | Description |
---|---|
by | 要转置的元素的下标(以零为基)。多个元素可以定义为索引列表,例如:[0,2] |
remainder | 当不完整元组被丢弃时(默认)。当为真时,触发的不完整元组中包含一个空来代替缺失的元素 |
Splitting operators
这些操作符用于将信道发出的项分割成可由下游操作符或流程处理的块。
可用的拆分操作符有:
splitCsv
splitCsv操作符允许您解析使用CSV格式格式化的信道发出的文本项,并将它们拆分为记录或将它们分组为具有指定长度的记录列表。
在最简单的情况下,只需将splitCsv操作符应用到发出CSV格式文本文件或文本条目的信道。例如:
Channel
.from( 'alpha,beta,gamma\n10,20,30\n70,80,90' )
.splitCsv()
.view { row -> "${row[0]} - ${row[1]} - ${row[2]}" }
上面的示例显示了CSV文本被解析并分割成单行。值可以通过行对象中的列索引访问。
当CSV以定义列名的标题行开始时,您可以指定参数header: true,它允许您通过名称引用每个值,如下面的示例所示:
Channel
.from( 'alpha,beta,gamma\n10,20,30\n70,80,90' )
.splitCsv(header: true)
.view { row -> "${row.alpha} - ${row.beta} - ${row.gamma}" }
它将打印
10 - 20 - 30
70 - 80 - 90
或者,你可以通过在头参数中指定字符串列表a来提供自定义头名称,如下所示:
Channel
.from( 'alpha,beta,gamma\n10,20,30\n70,80,90' )
.splitCsv(header: ['col1', 'col2', 'col3'], skip: 1 )
.view { row -> "${row.col1} - ${row.col2} - ${row.col3}" }
可用参数:
Field | Description |
---|---|
by | 每个块中的行数 |
sep | 用于分隔值的字符(默认值:,) |
quote | 值可以用单引号或双引号字符括起来 |
header | 当为true时,第一行用作列名。或者,它可以用于提供列名称列表 |
charset | 使用指定的字符集(如UTF-8)解析内容 |
strip | 从值中删除前导和尾随空格(默认:false) |
skip | 解析CSV内容时,文件开始忽略后的行数 |
limit | 将每个文件的检索记录数量限制为指定的值 |
decompress | 当为true时,在处理之前使用GZIP格式解压内容(注意:以.gz扩展名结尾的文件将自动解压) |
elem | 当该操作符应用于发出列表/元组对象的信道时,要分割的元素的索引(默认值:第一个文件对象或第一个元素) |
splitFasta
splitFasta操作符允许拆分由信道发出的条目,这些条目使用FASTA格式进行格式化。它返回一个信道,该信道为接收到的FASTA内容中的每个序列发出文本项。
splitFasta操作符生成的每个文本块中的序列数可以通过使用by参数来设置。下面的例子展示了如何读取FASTA文件,并将其分成块,每个块包含10个序列:
Channel
.fromPath('misc/sample.fa')
.splitFasta( by: 10 )
.view()
Warning
默认情况下,块保存在内存中。当分割大文件时指定参数file: true以将块保存到文件中,以避免引发OutOfMemoryException。有关详细信息,请参阅下面的可用参数表。
splitFasta操作符的第二个版本允许您将FASTA内容分割为记录对象,而不是文本块。记录对象包含一组允许您轻松访问和操作FASTA序列信息的字段。
为了将FASTA内容分割为记录对象,只需使用record参数指定所需字段的映射,如下面的示例所示:
Channel
.fromPath('misc/sample.fa')
.splitFasta( record: [id: true, seqString: true ])
.filter { record -> record.id =~ /^ENST0.*/ }
.view { record -> record.seqString }
Note
在本例中,文件misc/sample。fa被分割成包含id和seqString字段(即序列id和序列数据)的记录。下面的过滤操作符只保留ID以ENST0前缀开头的序列,最后使用订阅操作符打印序列内容。
可用参数:
Field | Description |
---|---|
by | 每个块中的行数 |
size | 定义预期块(如)的内存单元大小。1. mb |
limit | 将每个文件的检索序列数量限制为指定的值 |
record | 将FASTA文件中的每个条目解析为记录对象(可接受的值请参见下表) |
charset | 使用指定的字符集(如UTF-8)解析内容 |
compress | 当为true时,生成的文件块被GZIP压缩。块文件名会自动添加.gz后缀。 |
decompress | 当为true时,在处理之前使用GZIP格式解压内容(注意:以.gz扩展名结尾的文件将自动解压) |
file | 当为true时,将每个分割保存到一个文件。使用字符串而不是真值来创建带有特定名称的分割文件(分割索引号将自动添加)。最后,将此属性设置为现有目录,以便将分割文件保存到指定的文件夹中 |
elem | 当该操作符应用于发出列表/元组对象的信道时,要分割的元素的索引(默认值:第一个文件对象或第一个元素) |
使用record参数时,有以下字段:
Field | Description |
---|---|
id | FASTA序列标识符,即>符号后面直到第一个空白或换行符的单词 |
header | FASTA序列中没有>字符的第一行 |
desc | FASTA标题中ID值后面的文本 |
text | 包含头信息的完整FASTA序列 |
seqString | 序列数据为单行字符串,即不包含换行符 |
sequence | 序列数据作为多行字符串(总是以换行符结尾) |
width | 当使用序列字段时,定义单行的长度,然后序列数据在新行上继续 |
splitFastq
splitFastq 运算符允许您拆分信道发出的条目,这些条目使用 FASTQ 格式进行格式化。 它返回一个信道,该信道为接收到的项目中的每个序列发出一个文本块。
splitFastq 运算符生成的每个文本块中的序列数由参数 by 定义。 以下示例向您展示了如何读取 FASTQ 文件并将其拆分为包含 10 个序列的块:
Channel
.fromPath('misc/sample.fastq')
.splitFastq( by: 10 )
.view()
Warning
默认情况下,块保存在内存中。当分割大文件时指定参数file: true以将块保存到文件中,以避免引发OutOfMemoryException。有关详细信息,请参阅下面的可用参数表。
splitFastq操作符的第二个版本允许您将FASTQ格式的内容分割为记录对象,而不是文本块。记录对象包含一组字段,您可以轻松地访问和操作FASTQ序列数据。
为了将FASTQ序列分割成记录对象,只需使用record参数指定所需字段的映射,或者直接指定record: true,如下所示:
Channel
.fromPath('misc/sample.fastq')
.splitFastq( record: true )
.view { record -> record.readHeader }
最后,splitFastq操作符能够分割成对读的FASTQ文件。它必须应用于一个信道,该信道发出包含至少两个元素的元组,这些元素是要分割的文件。例如:
Channel
.fromFilePairs('/my/data/SRR*_{1,2}.fastq', flat:true)
.splitFastq(by: 100_000, pe:true, file:true)
.view()
Note
fromFilePairs要求使用flat:true选项将文件对作为生成的元组中的独立元素。
可用参数:
Field | Description |
---|---|
by | 定义每个块中读取的数量(默认值:1) |
pe | 当为true时拆分对端读取文件,因此源信道发出的项必须是元组,其中至少有两个元素是要拆分的读对文件 |
size | 定义预期块(如)的内存单元大小。1. mb |
limit | 将每个文件的检索序列数量限制为指定的值 |
record | 将FASTA文件中的每个条目解析为记录对象(可接受的值请参见下表) |
charset | 使用指定的字符集(如UTF-8)解析内容 |
compress | 当为true时,生成的文件块被GZIP压缩。块文件名会自动添加.gz后缀。 |
decompress | 当为true时,在处理之前使用GZIP格式解压内容(注意:以.gz扩展名结尾的文件将自动解压) |
file | 当为true时,将每个分割保存到一个文件。使用字符串而不是真值来创建带有特定名称的分割文件(分割索引号将自动添加)。最后,将此属性设置为现有目录,以便将分割文件保存到指定的文件夹中 |
elem | 当该操作符应用于发出列表/元组对象的信道时,要分割的元素的索引(默认值:第一个文件对象或第一个元素) |
使用record参数时,以下字段可用:
Field | Description |
---|---|
readHeader | 序列头(没有@前缀) |
readString | 原始序列数据 |
qualityHeader | 基本质量标头(可能为空) |
qualityString | 序列的质量值 |
splitText
该splitText运算符允许您将源信道发出的多行字符串或文本文件项拆分为包含n行的块,这些块将由结果信道发出。
例如:
Channel
.fromPath('/some/path/*.txt')
.splitText()
.view()
它用 suffix 分割文件的内容.txt,并逐行打印。
默认情况下,splitText运算符将每个项目拆分为一行的块。您可以使用参数 定义每个块中的行数by,如以下示例所示:
Channel
.fromPath('/some/path/*.txt')
.splitText( by: 10 )
.subscribe {
print it;
print "--- end of the chunk ---\n"
}
可以指定一个可选的闭包来转换操作符生成的文本块。以下示例显示了如何将文本文件拆分为 10 行的块并将它们转换为大写字母:
Channel
.fromPath('/some/path/*.txt')
.splitText( by: 10 ) { it.toUpperCase() }
.view()
Note
运算符返回的文本块splitText总是以newline字符结尾。
可用参数:
Field | Description |
---|---|
by | 定义每个块中读取的数量(默认值:1) |
limit | 将每个文件的检索序列数量限制为指定的值 |
charset | 使用指定的字符集(如UTF-8)解析内容 |
compress | 当为true时,生成的文件块被GZIP压缩。块文件名会自动添加.gz后缀。 |
decompress | 当为true时,在处理之前使用GZIP格式解压内容(注意:以.gz扩展名结尾的文件将自动解压) |
file | 当为true时,将每个分割保存到一个文件。使用字符串而不是真值来创建带有特定名称的分割文件(分割索引号将自动添加)。最后,将此属性设置为现有目录,以便将分割文件保存到指定的文件夹中 |
elem | 当该操作符应用于发出列表/元组对象的信道时,要分割的元素的索引(默认值:第一个文件对象或第一个元素) |
keepHeader | 将第一行作为头文件进行解析,并将其附加到每个发出的块 |
网友评论