美文网首页
流水线设计模式的Go实现

流水线设计模式的Go实现

作者: 山中散人的博客 | 来源:发表于2019-05-11 17:21 被阅读0次

    Go提供了对多线程编程的原生支持,用Go编写多线程程序和用C编写单线程程序一样自然。下面我们在Go中实践一下经典的多线程设计模式之一,流水线模式(Pipeline)。目标是实现一个文件过滤器,将输入文件按照文件后缀和文件大小过滤后显示。

    在开始工作之前,我们先学习一下Go中的数据队列(channel),channel是Go中引入的用于同步访问的数据类型。当多个线程(routine)要共享数据时,传统上我们需要加锁以保证数据访问是“原子”的,不出现“竞争”情形,也就是要将潜在的并发访问串行化。但是channel本身可以保证访问的串行性,不需要额外加锁。同时,channel的使用类似于管道(Pipe),通常从一端送入数据(生产者),从另一端拿到数据(消费者),这样,当生产者需要等待消费者(或者反之)时,channel可以实现阻塞等待。总而言之,channel在多数场合可以替代"加锁数据"和“同步事件”的概念,同时使得代码逻辑更加简单直观。

    首先导入依赖的库

    import (
        "flag"
        "fmt"
        "log"
        "os"
        "path/filepath"
        "runtime"
        "strings"
    )
    

    解析输入命令行参数的部分按下不表,以下是主线程中实现功能的部分,首先是调用source,将待分析的文件files分发到 channel1中,然后在filterSuffixes中启动一个线程(go routine),将后缀不匹配的文件去除,再在filterSize中启动一个routine,将大小不匹配的文件滤除,最后在主routine中将滤除剩下的文件信息显示。

            channel1 := source(files) //input file names
            channel2 := filterSuffixes(suffixes, channel1)
            channel3 := filterSize(minSize, maxSize, channel2)
            sink(channel3)
    

    首先是source()的实现,顺便介绍一下channel的使用。source()将文件名一一送入channel中,完成后关闭channel,channel被关闭后,channel另一端的routine不再会为了等待更多数据而阻塞。

    func source(files []string) <-chan string {
        out := make(chan string, 1000) //make a channel
        go func() {
            //put files into channel
            for _, filename := range files {
                out <- filename
            }
            close(out) //close channel, and no more data would be sent
        }()
        return out //return channel
    }
    

    文件名channel作为输入传入filterSuffixes(),同时传入的还有后缀清单,filterSuffixes()用一个匿名函数作为入口启动routine,将文件名channel中的文件一一比较后缀,进行过滤。

    // make the buffer the same size as for files to maximize throughput
    func filterSuffixes(suffixes []string, in <-chan string) <-chan string {
        out := make(chan string, cap(in))
        go func() {
            for filename := range in {
                if len(suffixes) == 0 {
                    out <- filename
                    continue
                }
    
                //filter file by extension
                ext := strings.ToLower(filepath.Ext(filename))
                for _, suffix := range suffixes {
                    if ext == suffix {
                        out <- filename //save match file
                        break
                    }
                }
            }
            close(out)
        }()
        return out
    }
    

    filterSize()和filterSuffixes()的思路相同,检查输入channel中文件的大小,将满足条件的放入输出channel。

    // make the buffer the same size as for files to maximize throughput
    func filterSize(minimum, maximum int64, in <-chan string) <-chan string {
        out := make(chan string, cap(in))
        go func() {
            for filename := range in {
                if minimum == -1 && maximum == -1 {
                    out <- filename // don't do a stat call it not needed
                    continue
                }
                finfo, err := os.Stat(filename)
                if err != nil {
                    continue // ignore files we can't process
                }
                //filter file by the size
                size := finfo.Size()
                if (minimum == -1 || minimum > -1 && minimum <= size) &&
                    (maximum == -1 || maximum > -1 && maximum >= size) {
                    out <- filename
                }
            }
            close(out)
        }()
        return out
    }
    

    最后在主routine中调用sink()输出文件名channel中的内容,由于channel已经关闭,在输出所有内容后,主routine返回,程序结束。sink()除了显示内容外,还有一个功能,就是让主routine等待其他routine完成。

    func sink(in <-chan string) {
        for filename := range in {
            fmt.Println(filename)
        }
    }
    

    最后简单验证一下代码。

    KevinLiudeMacBook-Air:filter KevinLiu$ ls | ./filter *.go
    para -1 -1 [] [filter.go]
    filter.go
    KevinLiudeMacBook-Air:filter KevinLiu$ ls
    filter      filter.go
    

    这个例子简单地演示了流水线模式,在流水线模式下,source()负责分发工作--需要处理的文件名,filterSuffixes()是流水线上的第一道工序,filterSize()是第二道,流水线上每一道工序都并发执行,每一道工序都会通过channel等待上一道完成。最后,sink()会将结果输出,等待流水线上所有的工作完成。

    本文的例子来源于《Programming in Go》Mark Summerfield,源代码请参考
    [https://github.com/KevinACoder/gobook/blob/master/src/filter/filter.go]

    相关文章

      网友评论

          本文标题:流水线设计模式的Go实现

          本文链接:https://www.haomeiwen.com/subject/eswxaqtx.html