流水线编程,场景适合的话能够让我们的代码结构更加清晰
还有更多使用方式,可以直接参考go-zero文档 https://github.com/tal-tech/zero-doc/blob/main/doc/fx.md
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/tal-tech/go-zero/core/fx"
)
func main() {
ch := make(chan int)
go inputStream(ch)
go outputStream(ch)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
<-c
}
func inputStream(ch chan int) {
count := 0
for {
ch <- count
time.Sleep(time.Millisecond * 500)
count++
}
}
func outputStream(ch chan int) {
fx.From(func(source chan<- interface{}) { //生产资源
for c := range ch {
source <- c
}
}).Walk(func(item interface{}, pipe chan<- interface{}) { //并发处理上一步生产的资源并写入pipe给下游继续加工(并发处理资源的work个数可以通过配置控制)
count := item.(int)
pipe <- count
}).Filter(func(item interface{}) bool { //串行的过滤资源
itemInt := item.(int)
if itemInt%2 == 0 {
return true
}
return false
}).ForEach(func(item interface{}) { //串行的收集结果资源
fmt.Println(item)
})
}
网友评论