管道模式
一、类Unix串行管道:使用通道实现串行管道功能
我们在使用类Unix系统时常常用到管道命令,如"ls |grep 'path/to' ",它可以让数据在多个命令操作中串行处理。Go的通道也可以做到如此,利用通道通信的特性我们可以创建多个连续通道,让一个函数的输出作为另一个函数的输入,而另一个函数的输出也可以作为其他函数的输入。
Go标准库中的io.Pipe()可以创建类Unix风格管道,它适合纯粹的IO系统原语的管道操作。然而go语言原语中的通道也可以做到类似的操作,以下是Go并发编程的范式之一,可以普适到更多的应用场景。
// 管道过滤器范式
func Demo() {
a(b(c(source("source1", "source2", "source3"))))
}
func a(in <-chan string) {
for i := range in {
fmt.Println("a" + i)
}
}
func b(in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
defer close(out)
for i := range in {
out <- "b" + i
}
}()
return out
}
func c(in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
defer close(out)
for i := range in {
out <- "c" + i
}
}()
return out
}
// 管道输入源
func source(inputs ...string) <-chan string {
out := make(chan string, len(inputs))
go func() {
defer close(out)
for _, item := range inputs {
out <- item
fmt.Println("source input:", item)
}
}()
return out
}
运行输出:
=== RUN TestDemo21
source input: source1
source input: source2
source input: source3
abcsource1
abcsource2
abcsource3
--- PASS: TestDemo21 (0.00s)
PASS
二、构建管道最佳实践
以上为最简单的模拟管道,但仔细一看还是存在问题的,我们在《防止Goroutine泄露》中讨论过,管道每个阶段都有可能出现协程泄露的风险,我们可以引入done管道解决这个问题,下面看一个比较有实质性的例子:通过一个生成器产生一些值并进入管道运算,最后输出值。
// 生成器
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
// 乘法阶段
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
// 加法阶段
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}
// 防止协程泄露的done管道
done := make(chan interface{})
defer close(done)
// 生成一些值,并进行管道运算
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
// 输出管道运算结果
for v := range pipeline {
fmt.Println(v)
}
在这个管道模式中,我们看到两件事:
- 在管道的末尾,可以使用range语句来提取值;
- 在每个阶段可以安全地并发执行,因为输入和输出在并发上下文中是安全的。
看到关闭done通道是如何影响到管道的了么?这是通过管道每个阶段的两件事情实现的:
- 对传入的频道进行遍历。当输入通道关闭时,遍历操作将退出。
- 发送操作与done通道共享select语句。
无论流水线阶段处于等待数据通道的状态,还是处在等待发送通道关闭的状态,都会强制管道各阶段终止。这里有一个复发关系。在管道开始时,我们已经确定必须将传入的切片值转换为通道。在这个过程中有两点必须是可抢占的:
- 在生成器通道上创建值。
- 在其频道上发送离散值。
在管道开始和结束之间,代码总是在一个通道上遍历,并在包含done通道的select语句内的另一个通道上发送。如果某个阶段在传入通道检索到值时被阻塞,则该通道关闭时它将变为未阻塞状态。 如果某个阶段在发送值时被阻塞,则由于select语句而可抢占。因此,整个管道始终可以通过关闭done通道来抢占。
三、生成器模式:
在上面的generator中我们看到一个简单的生成器,咋一看还比较死板,我们可以利用通道构建一个可获取特定重复值的生成器。
以下函数会重复你传给它的值,直到你告诉它停止:
var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
以下函数会从其传入的valueStream中取出第一个元素然后退出:
var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
OK,让我们组合它们使用,看一个用例:
func Demo() {
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
}
在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。
除了生成特定的固定数量的值,我们还可以扩展一下,如果把repeat扩展成repeatFn,我们可以生成任何数据:
var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
继续看用例:
func Demo() {
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
}
网友评论