这部分将介绍一些使用channel进行数据流操控的案例。
整体来说,一个数据流应用包含许多模块。不同的模块做不同的工作。每个模块有一个或多个worker,这些worker并发的做同样的工作。下面是实践中的模块工作清单:
- 数据生成、搜集,加载
- 数据服务或保存
- 数据计算或分析
- 数据验证和过滤
- 数据聚会和切分
- 数据组合和解耦
- 数据去重或扩散
一个worker可能从多个其他模块接受数据,并把数据作为输出发送给其他模块。换句话说,一个模块既可以是数据生产者,也可以是消费者。一个只给其他模块发送数据的模块称为producer-only模块。反之,是consumer-only模块。
许多模块一起构成一个数据流系统。
下面将展示一些数据流模块的实现。这些实现被用于说明目的,所以他们很简单,但是不高效。
数据生成,搜集和加载
有各种各样的producer-only模块。一个producer-only模块可以通过下面的方式生成数据:
- 加载文件,读取数据库,爬web
- 从软件系统或各种硬件中搜集度量数据
- 通过生成随机数
- 等等其他方式。
在这里,我们使用随机数生成器作为示例。生成器函数返回一个结果但不带参数。
import (
"crypto/rand"
"encoding/binary"
)
func RandomGenerator() <-chan uint64 {
c := make(chan uint64)
go func() {
rnds := make([]byte, 8)
for {
_, err := rand.Read(rnds)
if err != nil {
close(c)
}
c <- binary.BigEndian.Uint64(rnds)
}
}()
return c
}
事实上,随机数生成器是一个多返回值的future/promise.
一个数据生产商随时可以关闭输出的channel来结束数据生产。
数据聚合
数据聚合模块工作者将相同数据类型的若干数据流聚合到一个流中。
假设数据类型为int64,以下函数将任意数量的数据流聚合为一个。
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
output := make(chan uint64)
for _, in := range inputs {
in := in // this line is essential
go func() {
for {
output <- <-in // <=> output <- (<-in)
}
}()
}
return output
}
更好的实现应该考虑输入流是否已关闭。 (也适用于以下其他模块工作者实现。)
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
output := make(chan uint64)
var wg sync.WaitGroup
for _, in := range inputs {
wg.Add(1)
in := in // this line is essential
go func() {
for {
x, ok := <-in
if ok {
output <- x
} else {
wg.Done()
}
}
}()
}
go func() {
wg.Wait()
close(output)
}()
return output
}
如果聚合数据流的数量非常小(两个或三个),我们可以使用select块来聚合这些数据流。
// Assume the number of input stream is two.
...
output := make(chan uint64)
go func() {
inA, inB := inputs[0], inputs[1]
for {
select {
case v := <- inA: output <- v
case v := <- inB: output <- v
}
}
}
...
数据分割
数据分割模块工作者与数据聚合模块工作者相反。实施分工很容易,但在实践中,分工不是很有用,很少使用。
func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
for _, out := range outputs {
out := out // this line is essential
go func() {
for {
out <- <-input // <=> out <- (<-input)
}
}()
}
}
数据组合
数据组合工作者将来自不同输入数据流的若干数据合并为一个数据。
以下是组合工作者示例,其中来自一个流的两个uint64值和来自另一个流的一个uint64值组成一个新的uint64值。当然,这些流通道元件类型在实践中通常是不同的
func Composor(inA <-chan uint64, inB <-chan uint64) <-chan uint64 {
output := make(chan uint64)
go func() {
for {
a1, b, a2 := <-inA, <-inB, <-inA
output <- a1 ^ b & a2
}
}()
return output
}
数据重复/扩散
数据复制(扩散)可视为特殊数据分解。将复制一个数据,并将每个复制数据发送到不同的输出数据流。
func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
outA, outB := make(chan uint64), make(chan uint64)
go func() {
for {
x := <-in
outA <- x
outB <- x
}
}()
return outA, outB
}
数据计算/分析
数据计算和分析模块的功能变化很大,每个都非常具体。通常,这种模块的工作者功能将每条输入数据转换成另一条输出数据。
出于简单的演示目的,这里显示了一个工作器示例,它反转每个传输的uint64值的每个位.
func Calculator(input <-chan uint64, output chan uint64) (<-chan uint64) {
if output == nil {
output = make(chan uint64)
}
go func() {
for {
x := <-input
output <- ^x
}
}()
return output
}
数据验证/过滤
数据验证或过滤模块丢弃流中的一些传输数据。例如,以下工作函数会丢弃所有非素数。
import "math/big"
func Filter(input <-chan uint64, output chan uint64) <-chan uint64 {
if output == nil {
output = make(chan uint64)
}
go func() {
bigInt := big.NewInt(0)
for {
x := <-input
bigInt.SetUint64(x)
if bigInt.ProbablyPrime(1) {
output <- x
}
}
}()
return output
}
数据服务/保存
通常,数据服务或保存模块是数据流系统中的最后或最终输出模块。这里只提供一个简单的工作器,它打印从输入流接收的每个数据。
import "fmt"
func Printer(input <-chan uint64) {
for {
x, ok := <-input
if ok {
fmt.Println(x)
} else {
return
}
}
}
数据流系统组装
现在,让我们使用上面的模块工作器函数来组装几个数据流系统。组装数据流系统只是为了创建一些不同模块的工作者,并为每个工作者指定输入流。
数据流系统示例1(线性管道):
package main
... // the worker functions declared above.
func main() {
Printer(
Filter(
Calculator(
RandomGenerator(),
),
),
)
}
网友评论