![](https://img.haomeiwen.com/i11438314/b7a09e6e8c7401d3.png)
即多个函数同时从同一个channel里读取数据。直至channel被关闭
可以更好的利用多核
这里演示案例是从mysql一次读取1000条并写入csv文件
传统做法是读取和写入是串行,即,读出1000条,写入1000条,
管道模式可以实现一边读取,一边写入csv,二者可以同时进行
而且还可对管道模型进行改造,实现多个写入函数同时执行,
当然这里也可以写入redis等等
第一步
定义数据类型
1、入参(也就是管道连接点)
#数据类型可以自定义
type InChan chan *BookList
2、定义结果集
type Result struct{
Page int
Err error
}
3、管道数据输出
type OutChan chan *Result
4、这里读取是数据类型是书籍数据
type Book struct {
BookId int `gorm:"column:book_id"`
BookName string `gorm:"column:book_name"`
BookPress string `gorm:"column:book_press"`
}
type BookList struct {
Data []*Book
Page int
}
第二步:定义管道命令类型
type DataCmd func() InChan
type DataPipeCmd func(in InChan) OutChan
第三步:管道函数
//管道函数,cs 可多路复用,多个相同函数并行操作
func Pipe(c1 DataCmd,cs ...DataPipeCmd)OutChan{
in:=c1() //c1是个普通函数
out:=make(OutChan)
wg:=sync.WaitGroup{}
//cs是多个相同的执行函数
for _,c:=range cs{
wg.Add(1)
getChan:=c(in)
//合并输出out
go func(input OutChan) {
defer wg.Done()
for v:=range input{
out<-v
}
}(getChan)
}
go func() {
defer close(out)
wg.Wait()
}()
return out//有点类似grpc的流模式
}
普通函数 从数据库读取数据 一次1000条存入InChan
const sql = "select * from books order by book_id limit ? offset ? "
func ReadData() InChan {
page:= 1
pagesize:= 1000
in:= make(InChan)
go func() {
defer close(in)
for{
booklist:= &BookList{make([]*Book,0),page}
db:= AppInit.GetDB().Raw(sql,pagesize,(page-1)*pagesize).Find(&booklist.Data)
if db.Error!= nil || db.RowsAffected == 0{
break
}
in<-booklist
page++
}
}()
return in
}
执行函数 从InChan 读取数据写入csv
func WriteData(in InChan)OutChan{
out :=make(OutChan)
go func() {
defer close(out)
for d:=range in{
out<- &Result{Page: d.Page,Err:SaveData(d)}
}
}()
return out
}
写入csv
//写入csv
func SaveData(data *BookList)error{
time.Sleep(time.Millisecond * 500)
file:=fmt.Sprintf("./pipeline/csv/%d.csv", data.Page)
csvFile,err:=os.OpenFile(file,os.O_RDWR|os.O_CREATE|os.O_TRUNC,0666)
if err !=nil{
return err
}
defer csvFile.Close()
w:=csv.NewWriter(csvFile)
header:=[]string{"book_id","book_name","book_press"}
export:=[][]string{
header,
}
for _,d:=range data.Data{
cnt:=[]string{
strconv.Itoa(d.BookId),
d.BookName,
d.BookPress,
}
export = append(export,cnt)
}
err = w.WriteAll(export)
if err !=nil{
return err
}
w.Flush()
return nil
}
测试函数
func Test(){
start:=time.Now().Unix()
out:=Pipe(ReadData,WriteData,WriteData,WriteData,WriteData)
for o:=range out{
fmt.Printf("文件执行完成 %d ,错误:%v \n",o.Page,o.Err)
}
end:=time.Now().Unix()
fmt.Printf("测试--用时:%d\r\n",end-start)
}
执行结果
![](https://img.haomeiwen.com/i11438314/ac6d13313accb24a.gif)
网友评论