Go提供了对多线程编程的原生支持,用Go编写多线程程序和用C编写单线程程序一样自然。下面我们在Go中实践一下经典的多线程设计模式之一,流水线模式(Pipeline)。目标是实现一个文件过滤器,将输入文件按照文件后缀和文件大小过滤后显示。
在开始工作之前,我们先学习一下Go中的数据队列(channel),channel是Go中引入的用于同步访问的数据类型。当多个线程(routine)要共享数据时,传统上我们需要加锁以保证数据访问是“原子”的,不出现“竞争”情形,也就是要将潜在的并发访问串行化。但是channel本身可以保证访问的串行性,不需要额外加锁。同时,channel的使用类似于管道(Pipe),通常从一端送入数据(生产者),从另一端拿到数据(消费者),这样,当生产者需要等待消费者(或者反之)时,channel可以实现阻塞等待。总而言之,channel在多数场合可以替代"加锁数据"和“同步事件”的概念,同时使得代码逻辑更加简单直观。
首先导入依赖的库
import (
"flag"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strings"
)
解析输入命令行参数的部分按下不表,以下是主线程中实现功能的部分,首先是调用source,将待分析的文件files分发到 channel1中,然后在filterSuffixes中启动一个线程(go routine),将后缀不匹配的文件去除,再在filterSize中启动一个routine,将大小不匹配的文件滤除,最后在主routine中将滤除剩下的文件信息显示。
channel1 := source(files) //input file names
channel2 := filterSuffixes(suffixes, channel1)
channel3 := filterSize(minSize, maxSize, channel2)
sink(channel3)
首先是source()的实现,顺便介绍一下channel的使用。source()将文件名一一送入channel中,完成后关闭channel,channel被关闭后,channel另一端的routine不再会为了等待更多数据而阻塞。
func source(files []string) <-chan string {
out := make(chan string, 1000) //make a channel
go func() {
//put files into channel
for _, filename := range files {
out <- filename
}
close(out) //close channel, and no more data would be sent
}()
return out //return channel
}
文件名channel作为输入传入filterSuffixes(),同时传入的还有后缀清单,filterSuffixes()用一个匿名函数作为入口启动routine,将文件名channel中的文件一一比较后缀,进行过滤。
// make the buffer the same size as for files to maximize throughput
func filterSuffixes(suffixes []string, in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
for filename := range in {
if len(suffixes) == 0 {
out <- filename
continue
}
//filter file by extension
ext := strings.ToLower(filepath.Ext(filename))
for _, suffix := range suffixes {
if ext == suffix {
out <- filename //save match file
break
}
}
}
close(out)
}()
return out
}
filterSize()和filterSuffixes()的思路相同,检查输入channel中文件的大小,将满足条件的放入输出channel。
// make the buffer the same size as for files to maximize throughput
func filterSize(minimum, maximum int64, in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
for filename := range in {
if minimum == -1 && maximum == -1 {
out <- filename // don't do a stat call it not needed
continue
}
finfo, err := os.Stat(filename)
if err != nil {
continue // ignore files we can't process
}
//filter file by the size
size := finfo.Size()
if (minimum == -1 || minimum > -1 && minimum <= size) &&
(maximum == -1 || maximum > -1 && maximum >= size) {
out <- filename
}
}
close(out)
}()
return out
}
最后在主routine中调用sink()输出文件名channel中的内容,由于channel已经关闭,在输出所有内容后,主routine返回,程序结束。sink()除了显示内容外,还有一个功能,就是让主routine等待其他routine完成。
func sink(in <-chan string) {
for filename := range in {
fmt.Println(filename)
}
}
最后简单验证一下代码。
KevinLiudeMacBook-Air:filter KevinLiu$ ls | ./filter *.go
para -1 -1 [] [filter.go]
filter.go
KevinLiudeMacBook-Air:filter KevinLiu$ ls
filter filter.go
这个例子简单地演示了流水线模式,在流水线模式下,source()负责分发工作--需要处理的文件名,filterSuffixes()是流水线上的第一道工序,filterSize()是第二道,流水线上每一道工序都并发执行,每一道工序都会通过channel等待上一道完成。最后,sink()会将结果输出,等待流水线上所有的工作完成。
本文的例子来源于《Programming in Go》Mark Summerfield,源代码请参考
[https://github.com/KevinACoder/gobook/blob/master/src/filter/filter.go]
网友评论