go 流处理器思路

作者: Best博客 | 来源:发表于2021-01-28 15:32 被阅读0次

    流水线编程,场景适合的话能够让我们的代码结构更加清晰

    还有更多使用方式,可以直接参考go-zero文档 https://github.com/tal-tech/zero-doc/blob/main/doc/fx.md

    
    package main
    
    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/tal-tech/go-zero/core/fx"
    )
    
    func main() {
        ch := make(chan int)
    
        go inputStream(ch)
        go outputStream(ch)
    
        c := make(chan os.Signal, 1)
        signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
        <-c
    }
    
    func inputStream(ch chan int) {
        count := 0
        for {
            ch <- count
            time.Sleep(time.Millisecond * 500)
            count++
        }
    }
    
    func outputStream(ch chan int) {
        fx.From(func(source chan<- interface{}) {  //生产资源
            for c := range ch {
                source <- c
            }
        }).Walk(func(item interface{}, pipe chan<- interface{}) {  //并发处理上一步生产的资源并写入pipe给下游继续加工(并发处理资源的work个数可以通过配置控制)
            count := item.(int)
            pipe <- count
        }).Filter(func(item interface{}) bool {  //串行的过滤资源
            itemInt := item.(int)
            if itemInt%2 == 0 {
                return true
            }
            return false
        }).ForEach(func(item interface{}) {  //串行的收集结果资源
            fmt.Println(item)
        })
    }
    
    

    相关文章

      网友评论

        本文标题:go 流处理器思路

        本文链接:https://www.haomeiwen.com/subject/dwtxtltx.html