美文网首页
Go-高并发之管道模型

Go-高并发之管道模型

作者: 洛杉矶银河 | 来源:发表于2020-05-14 01:11 被阅读0次

即多个函数同时从同一个channel里读取数据。直至channel被关闭
可以更好的利用多核
这里演示案例是从mysql一次读取1000条并写入csv文件
传统做法是读取和写入是串行,即,读出1000条,写入1000条,
管道模式可以实现一边读取,一边写入csv,二者可以同时进行
而且还可对管道模型进行改造,实现多个写入函数同时执行,
当然这里也可以写入redis等等

第一步

定义数据类型

1、入参(也就是管道连接点)
#数据类型可以自定义
     type InChan chan *BookList
2、定义结果集
     type Result struct{
         Page int
         Err error
     }
3、管道数据输出
       type OutChan chan *Result
4、这里读取是数据类型是书籍数据
type Book struct {
    BookId int `gorm:"column:book_id"`
    BookName string `gorm:"column:book_name"`
    BookPress string `gorm:"column:book_press"`
}
type BookList struct {
    Data []*Book
    Page int
}

第二步:定义管道命令类型

type DataCmd func() InChan
type DataPipeCmd  func(in InChan) OutChan

第三步:管道函数

//管道函数,cs 可多路复用,多个相同函数并行操作
func Pipe(c1 DataCmd,cs ...DataPipeCmd)OutChan{
    in:=c1()   //c1是个普通函数
    out:=make(OutChan)
    wg:=sync.WaitGroup{}
    //cs是多个相同的执行函数
    for _,c:=range cs{
        wg.Add(1)
        getChan:=c(in)
        //合并输出out
        go func(input OutChan) {
            defer  wg.Done()
            for v:=range input{
                out<-v
            }
        }(getChan)
    }

    go func() {
        defer close(out)
        wg.Wait()
    }()
    return out//有点类似grpc的流模式
}

普通函数 从数据库读取数据 一次1000条存入InChan

const sql = "select * from books order by book_id limit ? offset ? "

func ReadData() InChan  {
    page:= 1
    pagesize:= 1000
    in:= make(InChan)
    go func() {
        defer close(in)
        for{
            booklist:= &BookList{make([]*Book,0),page}
            db:= AppInit.GetDB().Raw(sql,pagesize,(page-1)*pagesize).Find(&booklist.Data)
            if db.Error!= nil || db.RowsAffected == 0{
                break
            }
            in<-booklist
            page++
        }
    }()
    return  in
}

执行函数 从InChan 读取数据写入csv

func WriteData(in InChan)OutChan{
    out :=make(OutChan)
    go func() {
        defer close(out)
        for d:=range in{
            out<- &Result{Page: d.Page,Err:SaveData(d)}
        }
    }()
    return out
}

写入csv

//写入csv
func SaveData(data *BookList)error{
    time.Sleep(time.Millisecond * 500)
    file:=fmt.Sprintf("./pipeline/csv/%d.csv", data.Page)
    csvFile,err:=os.OpenFile(file,os.O_RDWR|os.O_CREATE|os.O_TRUNC,0666)
    if err !=nil{
        return err
    }
    defer csvFile.Close()
    w:=csv.NewWriter(csvFile)

    header:=[]string{"book_id","book_name","book_press"}
    export:=[][]string{
        header,
    }
    for _,d:=range data.Data{
        cnt:=[]string{
            strconv.Itoa(d.BookId),
            d.BookName,
            d.BookPress,
        }
        export = append(export,cnt)
    }
    err = w.WriteAll(export)
    if err !=nil{
        return err
    }
    w.Flush()
    return nil
}

测试函数

func Test(){
    start:=time.Now().Unix()
    out:=Pipe(ReadData,WriteData,WriteData,WriteData,WriteData)
    for o:=range out{
        fmt.Printf("文件执行完成 %d ,错误:%v \n",o.Page,o.Err)
    }
    end:=time.Now().Unix()
    fmt.Printf("测试--用时:%d\r\n",end-start)
}

执行结果

执行结果.gif

相关文章

网友评论

      本文标题:Go-高并发之管道模型

      本文链接:https://www.haomeiwen.com/subject/paganhtx.html