美文网首页
06-Nextflow Channels

06-Nextflow Channels

作者: 亦是旅人呐 | 来源:发表于2021-09-12 18:49 被阅读0次

    Nextflow基于Dataflow编程模型,在该模型中进程通过信道进行通信。

    信道有两个主要属性:
    1、发送消息是一个异步操作,它可以立即完成,而不必等待接收进程。
    2、接收数据是一个阻塞操作,它停止接收过程,直到消息到达。


    信道类型

    Nextflow区分了两种不同的信道: 队列信道和值信道。

    队列信道

    队列信道是一个非阻塞的单向FIFO队列,它连接两个processes或operators。

    队列信道通常使用from, fromPath等factory方法创建,或者使用map, flatMap等信道操作符链接它。

    队列信道也由进程输出声明使用into子句创建。

    值信道

    根据定义,值信道又名单例信道绑定到单个值,并且可以无限次读取而不消耗其内容。

    Tip
    因此,一个值信道可以被多个进程用作输入。

    值信道是使用value工厂方法或通过返回单个值的运算符创建的,例如我们的 first、last、collect、count、min、max、reduce、sum。

    Note
    当输入在 from 子句中指定一个简单值时,值信道由进程隐式创建。 此外,还隐式地创建了一个价值渠道,作为其输入仅为价值渠道的流程的输出。

    例如:

    process foo {
      input:
      val x from 1
      output:
      file 'x.txt' into result
    
      """
      echo $x > x.txt
      """
    }
    

    上面代码段中的流程声明了一个输入,它隐式地是一个值信道。因此,结果输出也是一个值信道,可以被多个进程读取。
    See also: Understand how multiple input channels work.


    信道工厂

    信道可以由流程输出声明隐式创建,也可以使用以下信道工厂方法显式创建。

    现有工厂方法是:

    of

    of方法允许创建一个信道,发出指定为方法参数的任意值序列,例如:

    ch = Channel.of( 1, 3, 5, 7 )
    ch.view { "value: $it" }
    

    Tip
    相应扩展值的范围。

    Channel
        .of(1..23, 'X', 'Y')
        .view()
    

    输出

    1
    2
    3
    4
    :
    23
    X
    Y
    

    Note
    该特性需要Nextflow 19.10.0或更高版本。

    value

    值工厂方法用于创建值信道。可以指定一个可选的非空参数来将信道绑定到一个特定的值。例如:

    expl1 = Channel.value()
    expl2 = Channel.value( 'Hello there' )
    expl3 = Channel.value( [1,2,3,4,5] )
    

    示例中的第一行创建了一个' empty '变量。第二行创建一个信道并将一个字符串绑定到它。最后,最后一个创建了一个信道,并将一个列表对象绑定到它,该对象将作为唯一的发射。

    fromList

    fromList方法允许您创建一个信道,以元素列表的形式发出提供的值,例如:

    Channel
        .fromList( ['a', 'b', 'c', 'd'] )
        .view { "value: $it" }
    

    打印

    value: a
    value: b
    value: c
    value: d
    

    fromPath

    通过使用fromPath方法并将路径字符串指定为参数,可以创建一个发出一个或多个文件路径的信道。例如:

    myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )
    

    上面的行创建了一个信道,并为其绑定了一个引用指定文件的Path项。

    Note
    它不检查文件是否存在。

    当fromPath参数包含*或?通配符,它被解释为一个全局路径匹配器。例如:

    myFileChannel = Channel.fromPath( '/data/big/*.txt' )
    

    这个例子创建了一个信道,并发出与/data/big文件夹中以txt扩展名的文件一样多的Path项。

    Tip
    两个星号,即*,与类似,但跨越目录边界。此语法通常用于匹配完整路径。花括号指定子模式的集合。

    例如:

    files = Channel.fromPath( 'data/**.fa' )
    moreFiles = Channel.fromPath( 'data/**/*.fa' )
    pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )
    

    第一行返回一个信道,发出data文件夹中以.fa结尾的文件,并递归地发送到它的所有子文件夹中。而第二个只发出数据路径中任意子文件夹中具有相同后缀的文件。最后,最后一个示例发出两个文件:data/file_1。fq和数据/ file_2.fq。

    Note
    在Linux Bash中,*通配符不匹配隐藏文件(例如,文件名以a开头的文件。字符)。

    为了包含隐藏文件,您需要以句点字符开始模式或指定hidden: true选项。例如:

    expl1 = Channel.fromPath( '/path/.*' )
    expl2 = Channel.fromPath( '/path/.*.fa' )
    expl3 = Channel.fromPath( '/path/*', hidden: true )
    

    第一个示例返回指定路径下的所有隐藏文件。第二个函数返回所有以.fa后缀结尾的隐藏文件。最后一个示例返回该路径中的所有文件(隐藏的和非隐藏的)。

    默认情况下,glob模式只查找符合指定条件的常规文件路径,即它不会返回目录路径。

    您可以使用参数类型指定值file、dir或any来定义您想要的路径类型。例如:

    myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
    myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )
    

    第一个示例将返回所有以b后缀结尾的目录路径,而第二个示例将返回所有以a前缀开头的文件和目录。

    Name Description
    glob 当为true时,将字符*、?、[]和{}作为通配符,否则将它们作为普通字符处理(默认为true)
    type 返回的路径类型,file, dir或any(默认:file)
    hidden 当为true时,结果路径中包含隐藏文件(默认:false)
    maxDepth 要访问的目录级别的最大数目(默认:no limit)
    followLinks 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true)
    relative 当为true时,返回的路径相对于最顶部的常见目录(默认:false)
    checkIfExists 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false)

    Note
    可以使用list作为参数指定多个路径或glob模式:

    Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )
    

    (需要0.31.x或更高版本)

    fromFilePairs

    fromFilePairs方法创建一个信道,发出匹配用户提供的glob模式的文件对。匹配的文件作为元组发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:

    Channel
        .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
        .view()
    

    它将产生类似如下的输出:

    [SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
    [SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
    [SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
    [SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
    [SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
    [SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]
    

    Note
    glob模式必须至少包含一个星号通配符。

    另外,也可以实现自定义文件对分组策略,提供一个闭包,给定当前文件作为参数,返回分组键。例如:

    Channel
        .fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
        .view { ext, files -> "Files with the extension $ext are $files" }
    

    可选参数表:

    Name Description
    type 返回的路径类型,file, dir或any(默认:file)
    hidden 当为true时,结果路径中包含隐藏文件(默认:false)
    maxDepth 要访问的目录级别的最大数目(默认:no limit)
    followLinks 当为true时,它遵循目录树遍历期间的符号链接,否则它们将作为文件管理(默认:true)
    size 定义每个发出的项期望保存的文件数量(默认值:2)
    flat 当为true时,匹配的文件将作为发出的元组中的唯一元素生成(默认为false)
    checkIfExists 当为true时,抛出文件系统中不存在指定路径的异常(默认值:false)

    fromSRA

    fromSRA方法查询NCBI SRA数据库,并返回一个发送符合指定条件(即项目或登录号)的FASTQ文件的信道。例如:

    Channel
        .fromSRA('SRP043510')
        .view()
    

    返回值:

    [SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
    [SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
    [SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
    [SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
    [SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
    [SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
    (remaining omitted)
    

    可以使用列表对象指定多个登录id:

    ids = ['ERR908507', 'ERR908506', 'ERR908505']
    Channel
        .fromSRA(ids)
        .view()
    

    Note
    隐式管理的读对作为文件列表返回。

    Tip
    在幕后它使用了NCBI搜索API,因此fromSRA方法允许使用该API支持的任何查询词。

    可选参数表:

    Name Description
    apiKey NCBI用户API密钥
    cache 启用/禁用缓存API请求(默认:true)
    max 可以重试的最大条目数(默认:unlimited)

    要访问NCBI搜索服务,需要提供以下两种API密钥:

    • 使用apiKey可选参数,例如Channel.fromSRA(ids, apiKey:'0123456789abcdef')。
    • 在您的环境中导出NCBI_API_KEY变量,例如export NCBI_API_KEY=0123456789abcdef。

    Note
    该特性需要Nextflow版本19.04.0或更高版本。

    watchPath

    watchPath方法监视与指定模式匹配的一个或多个文件的文件夹。只要有满足指定条件的文件,它就通过watchPath方法返回的信道发出。要监视的文件条件可以使用*或?通配符,即通过指定全局路径匹配条件。

    Channel
       .watchPath( '/path/*.fa' )
       .subscribe { println "Fasta file: $it" }
    

    默认情况下,它只监视在指定文件夹中创建的新文件。还可以提供第二个参数,指定要监视的事件。支持的事件有:

    Name Description
    create 创建一个新文件(默认)
    modify 文件被修改
    delete 文件被删除

    可以使用逗号分隔的字符串指定多个事件,如下所示:

    Channel
       .watchPath( '/path/*.fa', 'create,modify' )
       .subscribe { println "File created or modified: $it" }
    

    Warning
    watchPath工厂无休止地等待与指定模式和事件匹配的文件。因此,无论何时在脚本中使用它,生成的管道都不会结束。

    See also: fromPath factory method.

    empty

    根据定义,empty工厂方法创建一个不发出任何值的通道。

    See also: ifEmpty and close operators.


    Binding values

    因为在Nextflow中,信道是使用数据流变量或队列实现的。因此,发送消息相当于将一个值绑定到表示通信信道的对象上。

    bind

    信道对象提供了bind()方法,这是通过信道发送消息的基本操作。例如:

    myChannel = Channel.create()
    myChannel.bind( 'Hello world' )
    

    operator <<

    操作符<<只是bind方法的语法糖。因此,下面的例子产生了与前一个相同的结果:

    myChannel = Channel.create()
    myChannel << 'Hello world'
    

    Observing events

    subscribe

    订阅方法允许您在每次源信道发出新值时执行用户定义的函数。

    发出的值隐式地传递给指定的函数。例如:

    // 定义一个发送三个值的信道
    source = Channel.from ( 'alpha', 'beta', 'delta' )
    
    // 向输出发出值的信道道subscribe 一个函数
    source.subscribe {  println "Got: $it"  }
    
    // 输出
    Got: alpha
    Got: beta
    Got: delta
    

    Note
    形式上,用户定义的函数是一个Closure,它是由Groovy编程语言定义的,Nextflow脚本基于Groovy编程语言。

    如果需要,可以显式定义闭包参数,使用it以外的名称,并可选地指定期望的值类型,如下面的示例所示:

    Channel
        .from( 'alpha', 'beta', 'lambda' )
        .subscribe { String str ->
            println "Got: ${str}; len: ${str.size()}"
         }
    
    // results
    Got: alpha; len: 5
    Got: beta; len: 4
    Got: lambda; len: 6
    

    onNext, onComplete, and onError

    subscribe方法可以接受以下事件处理程序中的一个或多个:

    • onNext: 注册一个函数,该函数在通道发出值时被调用。这与使用上面示例中描述的带有plain闭包的subscribe是一样的。
    • onComplete: 注册一个在通道发出最后一个值之后调用的函数。
    • onError: 注册一个在处理onNext事件时引发异常时调用的函数。它不会进一步调用onNext或onComplete。onError方法将导致错误的Throwable作为参数。
    Channel
        .from( 1, 2, 3 )
        .subscribe onNext: { println it }, onComplete: { println 'Done' }
    
    // 输出
    1
    2
    3
    Done
    

    相关文章

      网友评论

          本文标题:06-Nextflow Channels

          本文链接:https://www.haomeiwen.com/subject/gxupwltx.html