NextFlow用法4--Channel (1)

作者: 京古 | 来源:发表于2019-06-12 21:51 被阅读1次

    Nextflow基于Dataflow编程模型,其中进程通过通道(channel )进行通信。

    channel 有两个主要属性:

    (1)发送消息是一种异步操作,它立即完成,而不必等待接收过程。

    (2)接收数据是一种阻止操作,它会在消息到达之前停止接收过程。

    1.1 channel 类型

    Nextflow区分两种不同的channel :队列通道(queue channel) 和值通道(value channel)。

    1.1.1 队列 channel

    队列channel 是连接两个 processes or operators的非阻挡单向FIFO队列。

    队列channel 通常使用方法(如 from ,fromPath 等)来创建,或者结合使用channel 运算符(如 map , flatMap 等)。

    队列channel 也可以通过process 的输出声明使用into子句创建。

    注意:

    (1)相同的队列channel 不能2次和2次以上用作process输出,也不能2次和2次以上用作process输入。

    (2) 如果需要将一个process的输出channel 连接到多个process or operator,使用into运算符创建这个channel 的两个(或更多)副本,并使用每个副本channel连接单独的process。

    1.1.2 channel

    根据定义,值channel (也称为单channel )是单个变量值绑定的,并且可以无限次读取和引用。

    注意

    值channel 的内容可以多次用于process的input。

    使用 value factory method方法或返回单个值的运算符创建值channel ,例如 first ,last, collect,count,min,max,reduce,sum。

    注意:

    当在input的from 中指定简单值时,process 将隐式创建值channel 。此外,当一个process的inputs只是value channels时,也将隐式地为该channel创建一个output的value channel 。

    例如:

    process foo {
    input:
    val x from 1
    output:
    file 'x.txt' into result
    
    """  echo $x > x.txt  """}
    

    上面代码段中的process 声明了一个input,是一个value channel。因此,result中的output的channel也是一个value channel,可以被多个process读取。

    1.2 channel factory

    channel 可以由process 的output声明隐式创建,也可以使用以下channel 工厂方法显式创建。

    可用的方法是:

    · create

    · empty

    · from

    · fromPath

    · fromFilePairs

    · fromSRA

    · value

    · watchPath

    1.2.1 create

    使用create方法创建新channel ,如下所示:

    channelObj = Channel.create()
    

    1.2.2 from

    用 from方法创建channel,该channel可以输出from方法中指定的参数的内容,例如:

    ch = Channel.from( 1, 3, 5, 7 )
    ch.subscribe { println "value: $it" }
    

    此示例中的第一行创建了一个包含channel 对象的ch变量。此channel 输出的值被指定为from方法的参数。因此第二行将打印以下内容:

    value: 1
    value: 3
    value: 5
    value: 7
    

    以下示例显示如何从一系列数字或字符串创建channel :

    zeroToNine = Channel.from( 0..9 )
    strings = Channel.from( 'A'..'Z' )
    

    注意:Note that when the from argument is an object implementing the (Java) Collection interface, the resulting channel emits the collection entries as individual emissions.

    因此,以下两个声明生成的结果是一样的:

    Channel.from( 1, 3, 5, 7, 9 )
    Channel.from( [1, 3, 5, 7, 9] )
    

    但是,当提供不止一个参数时,它们总是作为单个内容进行释放。因此,以下示例创建的是一个释放3次内容的channel ,每次的内容是包含两个元素的列表:

    Channel.from( [1, 2], [5,6], [7,9] )
    

    1.2.3 value

    value 方法用于创建一个值channel。传入参数可以将channel与参数值绑定。例如:

    expl1 = Channel.value()
    expl2 = Channel.value( 'Hello there' )
    expl3 = Channel.value( [1,2,3,4,5] )
    

    示例中的第一行创建一个“空”变量。第二行创建一个channel 并将字符串绑定到channel 。最后一个创建一个channel 并将一个列表对象绑定到channel ,该list将作为一个整体进行释放。

    1.2.4 fromPath

    可以使用该fromPath方法创建channel,该channel可以发出一个或多个文件的路径,只需将路径字符串指定为参数。例如:

    myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )
    

    上面的行创建了一个channel ,并将一个文件的Path项绑定到该channel

    注意:它不检查文件是否存在,如果文件不存在可能会出错。

    每当fromPath参数包含一个*或?通配符时,它都被解释为glob路径匹配器。例如:

    myFileChannel = Channel.fromPath( '/data/big/*.txt' )
    

    此示例创建一个channel ,并释放 /data/big 路径下所有以 .txt 文件名结果的文件。
    注意:
    两个星号,即 ** 类似 * 但跨越目录边界。此语法通常用于匹配完整路径。圆括号指定子模式的集合。
    例如:

    files = Channel.fromPath( 'data/**.fa' )
    moreFiles = Channel.fromPath( 'data/**/*.fa' )
    pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )
    

    第一行返回一个channel ,该channel 发出data文件夹中以后缀.fa结尾的文件,并递归其所有子文件夹。而第二个只释放 /data/ 所有子文件夹中以 .fa 结果的文件。最后一个示例释放两个文件:data/file_1.fq和data/file_2.fq。
    注意:与在Linux Bash中一样,*通配符不匹配隐藏文件(即名称以.字符开头的文件)。

    要包含隐藏文件,您需要使用句点字符启动模式或指定选项。例如:hidden: true

    expl1 = Channel.fromPath( '/path/.*' )
    expl2 = Channel.fromPath( '/path/.*.fa' )
    expl3 = Channel.fromPath( '/path/*', hidden: true )
    

    第一个示例返回指定路径中的所有隐藏文件。第二个返回以.fa后缀结尾的所有隐藏文件。最后,最后一个示例返回该路径中的所有文件(隐藏和非隐藏)。

    默认情况下,glob模式仅查找与指定条件匹配的常规文件路径,即它不会返回目录路径。

    您可以使用type指定值的参数file,dir或者any定义所需的路径类型。例如:

    myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
    myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )
    

    第一个示例将返回以b为后缀结尾的所有目录路径,而第二个示例将返回以a前缀开头的任何文件和目录。
    Name 描述
    glob 当true时,可以把 *,?,[]和{}作为通配符,否则处理它们的正常字符(默认值:true)
    type 待返回路径的类型,file,dir或any(默认值:file)
    hidden 当true时可以查找隐藏文件(默认值:false)
    maxDepth 要访问的最大目录级别数(默认值:无限制)
    followLinks 当true它可以在目录树的遍历链接,否则会被作为文件处理(默认:true)
    relative 当true时,返回的路径是相对于最顶层的公共目录的相对路径(默认值:false)
    checkIfExists 当true时,如果指定路径不在文件中,会抛出异常(默认值:false)

    注意:可以使用列表作为参数指定多个路径的glob模式:

    Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )
    

    1.2.5 fromFilePairs

    该fromFilePairs方法创建的channel,可以释放与用户提供的glob模式匹配的文件对的元组对象。在该元组中,第一个元素是匹配文件对的共有名称,第二个元素是文件列表(按字典顺序排序)。例如:

    Channel
    .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
    .println()
    

    它将产生类似于以下的输出:

    [SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
    [SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
    [SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
    [SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
    [SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
    [SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]
    

    注意:glob模式必须至少包含一个星形通配符。

    1.2.6 fromSRA

    该fromSRA方法查询NCBI SRA数据库,并返回符合指定标准(即项目或登录号)的FASTQ文件的channel 。例如:

    Channel
    .fromSRA('SRP043510')
    .println()
    
    #结果:
    [SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
    [SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
    [SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
    [SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
    [SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
    [SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
    (remaining omitted)
    

    可以使用列表对象指定多个ID:

    ids = ['ERR908507', 'ERR908506', 'ERR908505']Channel
    .fromSRA(ids)
    .println()
    #结果:
    [ERR908507,
    [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz,
    ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
    [ERR908506,
    [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz, 
    ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
    [ERR908505,
    [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz,
    ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]
    

    可用参数表:
    Name : 描述
    apiKey : NCBI用户API密钥。
    cache : 启用/禁用缓存API请求(默认值:true)
    max : 可以重试的最大条目数(默认值:无限制)。

    要访问NCBI搜索服务,应提供NCBI API密钥

    (1)使用apiKey可选参数,例如。Channel.fromSRA(ids, apiKey:'0123456789abcdef')
    (2)在您的环境中导出变量NCBI_API_KEY,例如。export NCBI_API_KEY=0123456789abcdef

    注意:此功能需要Nextflow版本19.04.0或更高版本。

    1.2.7 watchPath

    该watchPath方法监视文件夹以查找与指定模式匹配的一个或多个文件。只要存在满足指定条件的文件,就会通过该watchPath 方法返回的channel 发出该文件。监视条件可以通过使用*或?通配符来指定,即通过指定glob路径匹配条件。

    例如:

    Channel
    .watchPath( '/path/*.fa' )
    .subscribe { println "Fasta file: $it" }
    

    默认情况下,它仅监视在指定文件夹中创建的新文件。此外,可以提供第二个参数来指定要监视的事件。支持的事件是:
    Name 描述
    create 创建了一个新文件(默认)
    modify 文件被修改
    delete 文件被删除

    可以使用逗号分隔的字符串指定多个这些事件,如下所示:

    Channel
    .watchPath( '/path/*.fa', 'create,modify' )
    .subscribe { println "File created or modified: $it" }
    

    警告:该watchPath方法无休止地执行匹配符合指定模式(一个或多个)的文件。因此,无论何时在脚本中使用它,生成的管道都将永远不会完成。

    1.2.8 empty

    根据empty定义,该方法创建一个空channel ,不发出任何值。

    相关文章

      网友评论

        本文标题:NextFlow用法4--Channel (1)

        本文链接:https://www.haomeiwen.com/subject/kevuxctx.html