美文网首页
channel使用案例--数据流操控

channel使用案例--数据流操控

作者: golang推广大使 | 来源:发表于2019-03-13 23:35 被阅读0次

    这部分将介绍一些使用channel进行数据流操控的案例。
    整体来说,一个数据流应用包含许多模块。不同的模块做不同的工作。每个模块有一个或多个worker,这些worker并发的做同样的工作。下面是实践中的模块工作清单:

    • 数据生成、搜集,加载
    • 数据服务或保存
    • 数据计算或分析
    • 数据验证和过滤
    • 数据聚会和切分
    • 数据组合和解耦
    • 数据去重或扩散
      一个worker可能从多个其他模块接受数据,并把数据作为输出发送给其他模块。换句话说,一个模块既可以是数据生产者,也可以是消费者。一个只给其他模块发送数据的模块称为producer-only模块。反之,是consumer-only模块。
      许多模块一起构成一个数据流系统。
      下面将展示一些数据流模块的实现。这些实现被用于说明目的,所以他们很简单,但是不高效。

    数据生成,搜集和加载

    有各种各样的producer-only模块。一个producer-only模块可以通过下面的方式生成数据:

    • 加载文件,读取数据库,爬web
    • 从软件系统或各种硬件中搜集度量数据
    • 通过生成随机数
    • 等等其他方式。
      在这里,我们使用随机数生成器作为示例。生成器函数返回一个结果但不带参数。
    import (
        "crypto/rand"
        "encoding/binary"
    )
    
    func RandomGenerator() <-chan uint64 {
        c := make(chan uint64)
        go func() {
            rnds := make([]byte, 8)
            for {
                _, err := rand.Read(rnds)
                if err != nil {
                    close(c)
                }
                c <- binary.BigEndian.Uint64(rnds)
            }
        }()
        return c
    }
    

    事实上,随机数生成器是一个多返回值的future/promise.
    一个数据生产商随时可以关闭输出的channel来结束数据生产。

    数据聚合

    数据聚合模块工作者将相同数据类型的若干数据流聚合到一个流中。
    假设数据类型为int64,以下函数将任意数量的数据流聚合为一个。

    func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
        output := make(chan uint64)
        for _, in := range inputs {
            in := in // this line is essential
            go func() {
                for {
                    output <- <-in // <=> output <- (<-in)
                }
            }()
        }
        return output
    }
    

    更好的实现应该考虑输入流是否已关闭。 (也适用于以下其他模块工作者实现。)

    func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
        output := make(chan uint64)
        var wg sync.WaitGroup
        for _, in := range inputs {
            wg.Add(1)
            in := in // this line is essential
            go func() {
                for {
                    x, ok := <-in
                    if ok {
                        output <- x
                    } else {
                        wg.Done()
                    }
                }
            }()
        }
        go func() {
            wg.Wait()
            close(output)
        }()
        return output
    }
    

    如果聚合数据流的数量非常小(两个或三个),我们可以使用select块来聚合这些数据流。

    // Assume the number of input stream is two.
    ...
        output := make(chan uint64)
        go func() {
            inA, inB := inputs[0], inputs[1]
            for {
                select {
                case v := <- inA: output <- v
                case v := <- inB: output <- v
                }
            }
        }
    ...
    

    数据分割

    数据分割模块工作者与数据聚合模块工作者相反。实施分工很容易,但在实践中,分工不是很有用,很少使用。

    func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
        for _, out := range outputs {
            out := out // this line is essential
            go func() {
                for {
                    out <- <-input // <=> out <- (<-input)
                }
            }()
        }
    }
    

    数据组合

    数据组合工作者将来自不同输入数据流的若干数据合并为一个数据。
    以下是组合工作者示例,其中来自一个流的两个uint64值和来自另一个流的一个uint64值组成一个新的uint64值。当然,这些流通道元件类型在实践中通常是不同的

    func Composor(inA <-chan uint64, inB <-chan uint64) <-chan uint64 {
        output := make(chan uint64)
        go func() {
            for {
                a1, b, a2 := <-inA, <-inB, <-inA
                output <- a1 ^ b & a2
            }
        }()
        return output
    }
    

    数据重复/扩散

    数据复制(扩散)可视为特殊数据分解。将复制一个数据,并将每个复制数据发送到不同的输出数据流。

    func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
        outA, outB := make(chan uint64), make(chan uint64)
        go func() {
            for {
                x := <-in
                outA <- x
                outB <- x
            }
        }()
        return outA, outB
    }
    

    数据计算/分析

    数据计算和分析模块的功能变化很大,每个都非常具体。通常,这种模块的工作者功能将每条输入数据转换成另一条输出数据。
    出于简单的演示目的,这里显示了一个工作器示例,它反转每个传输的uint64值的每个位.

    func Calculator(input <-chan uint64, output chan uint64) (<-chan uint64) {
        if output == nil {
            output = make(chan uint64)
        }
        go func() {
            for {
                x := <-input
                output <- ^x
            }
        }()
        return output
    }
    

    数据验证/过滤

    数据验证或过滤模块丢弃流中的一些传输数据。例如,以下工作函数会丢弃所有非素数。

    import "math/big"
    
    func Filter(input <-chan uint64, output chan uint64) <-chan uint64 {
        if output == nil {
            output = make(chan uint64)
        }
        go func() {
            bigInt := big.NewInt(0)
            for {
                x := <-input
                bigInt.SetUint64(x)
                if bigInt.ProbablyPrime(1) {
                    output <- x
                }
            }
        }()
        return output
    }
    

    数据服务/保存

    通常,数据服务或保存模块是数据流系统中的最后或最终输出模块。这里只提供一个简单的工作器,它打印从输入流接收的每个数据。

    import "fmt"
    
    func Printer(input <-chan uint64) {
        for {
            x, ok := <-input
            if ok {
                fmt.Println(x)
            } else {
                return
            }
        }
    }
    

    数据流系统组装

    现在,让我们使用上面的模块工作器函数来组装几个数据流系统。组装数据流系统只是为了创建一些不同模块的工作者,并为每个工作者指定输入流。
    数据流系统示例1(线性管道):

    package main
    
    ... // the worker functions declared above.
    
    func main() {
        Printer(
            Filter(
                Calculator(
                    RandomGenerator(),
                ),
            ),
        )
    }
    

    相关文章

      网友评论

          本文标题:channel使用案例--数据流操控

          本文链接:https://www.haomeiwen.com/subject/mboxmqtx.html