美文网首页Go探险GO相关
【转载】Golang处理大数据时使用高效的Pipeline(流水

【转载】Golang处理大数据时使用高效的Pipeline(流水

作者: 承诺一时的华丽 | 来源:发表于2019-08-06 15:14 被阅读0次

    并发是件技术活

    Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。

    想象这样的应用情景:

    1. 从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);
    2. 根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;
    3. 调用NLP服务(自然语言处理),处理每条评论;
    4. 将处理结果写入数据库C(ElasticSearch)。

    由于应用中遇到的各种问题,归纳出这些需求:
    需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。
    需求二:每个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽可能占用更多资源以提高ETL性能)。例如,步骤(1)-(4)分别设置并发数1、4、8、2。

    这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。

    你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。

    可重用的Pipeline模块

    为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。

    package main
    
    import "sync"
    
    func HasClosed(c <-chan struct{}) bool {
        select {
        case <-c: return true
        default: return false
        }
    }
    
    type SyncFlag interface{
        Wait()
        Chan() <-chan struct{}
        Done() bool
    }
    
    func NewSyncFlag() (done func(), flag SyncFlag) {
        f := &syncFlag{
            c : make(chan struct{}),
        }
        return f.done, f
    }
    
    type syncFlag struct {
        once sync.Once
        c chan struct{}
    }
    
    func (f *syncFlag) done() {
        f.once.Do(func(){
            close(f.c)
        })
    }
    
    func (f *syncFlag) Wait() {
        <-f.c
    }
    
    func (f *syncFlag) Chan() <-chan struct{} {
        return f.c
    }
    
    func (f *syncFlag) Done() bool {
        return HasClosed(f.c)
    }
    
    type pipelineThread struct {
        sigs []chan struct{}
        chanExit chan struct{}
        interrupt SyncFlag
        setInterrupt func()
        err error
    }
    
    func newPipelineThread(l int) *pipelineThread {
        p := &pipelineThread{
            sigs : make([]chan struct{}, l),
            chanExit : make(chan struct{}),
        }
        p.setInterrupt, p.interrupt = NewSyncFlag()
    
        for i := range p.sigs {
            p.sigs[i] = make(chan struct{})
        }
        return p
    }
    
    type Pipeline struct {
        mtx sync.Mutex
        workerChans []chan struct{}
        prevThd *pipelineThread
    }
    
    //创建流水线,参数个数是每个任务的子过程数,每个参数对应子过程的并发度。
    func NewPipeline(workers ...int) *Pipeline {
        if len(workers) < 1 { panic("NewPipeline need aleast one argument") }
    
        workersChan := make([]chan struct{}, len(workers))
        for i := range workersChan {
            workersChan[i] = make(chan struct{}, workers[i])
        }
    
        prevThd := newPipelineThread(len(workers))
        for _,sig := range prevThd.sigs {
            close(sig)
        }
        close(prevThd.chanExit)
    
        return &Pipeline{
            workerChans : workersChan,
            prevThd : prevThd,
        }
    }
    
    //往流水线推入一个任务。如果第一个步骤的并发数达到设定上限,这个函数会堵塞等待。
    //如果流水线中有其它任务失败(返回非nil),任务不被执行,函数返回false。
    func (p *Pipeline) Async(works ...func()error) bool {
        if len(works) != len(p.workerChans) {
            panic("Async: arguments number not matched to NewPipeline(...)")
        }
    
        p.mtx.Lock()
        if p.prevThd.interrupt.Done() {
            p.mtx.Unlock()
            return false
        }
        prevThd := p.prevThd
        thisThd := newPipelineThread(len(p.workerChans))
        p.prevThd = thisThd
        p.mtx.Unlock()
    
        lock := func(idx int) bool {
            select {
            case <-prevThd.interrupt.Chan(): return false
            case <-prevThd.sigs[idx]: //wait for signal
            }
            select {
            case <-prevThd.interrupt.Chan(): return false
            case p.workerChans[idx]<-struct{}{}: //get lock
            }
            return true
        }
        if !lock(0) {
            thisThd.setInterrupt()
            <-prevThd.chanExit
            thisThd.err = prevThd.err
            close(thisThd.chanExit)
            return false
        }
        go func() { //watch interrupt of previous thread
            select {
            case <-prevThd.interrupt.Chan():
                thisThd.setInterrupt()
            case <-thisThd.chanExit:
            }
        }()
        go func() {
            var err error
            for i,work := range works {
                close(thisThd.sigs[i]) //signal next thread
                if work != nil {
                    err = work()
                }
                if err != nil || (i+1 < len(works) && !lock(i+1)) {
                    thisThd.setInterrupt()
                    break
                }
                <-p.workerChans[i] //release lock
            }
    
            <-prevThd.chanExit
            if prevThd.interrupt.Done() {
                thisThd.setInterrupt()
            }
            if prevThd.err != nil {
                thisThd.err = prevThd.err
            } else {
                thisThd.err = err
            }
            close(thisThd.chanExit)
        }()
        return true
    }
    
    //等待流水线中所有任务执行完毕或失败,返回第一个错误,如果无错误则返回nil。
    func (p *Pipeline) Wait() error {
        p.mtx.Lock()
        lastThd := p.prevThd
        p.mtx.Unlock()
        <-lastThd.chanExit
        return lastThd.err
    }
    

    使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:

    package main
    
    import "log"
    
    func main() {
        //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
        checkpoint := loadCheckpoint()
    
        //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
        pipeline := NewPipeline(4, 8, 2, 1) 
        for {
            //(1)
            //加载100条数据,并修改变量checkpoint
            //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
            data, err := extractReviewsFromA(&checkpoint, 100) 
            if err != nil {
                log.Print(err)
                break
            }
    
            //这里有个Golang著名的坑。
            //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
            //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
            curCheckpoint := checkpoint
    
            ok := pipeline.Async(func() error {
                //(2)
                return joinUserFromB(data)
            }, func() error {
                //(3)
                return nlp(data)
            }, func() error {
                //(4)
                return loadDataToC(data)
            }, func() error {
                //(5)保存checkpoint
                log.Print("done:", curCheckpoint)
                return saveCheckpoint(curCheckpoint)
            })
            if !ok { break }
    
            if len(data) < 100 { break } //处理完毕
        }
        err := pipeline.Wait()
        if err != nil { log.Print(err) }
    }
    

    示意图:

    image.png

    每个方格表示一批数据,黄色表示正在执行所属工序,白色表示已经完成工序但堵塞等待中。

    Pipeline的工作方式:

    1. Pipeline分别控制每一个工序的并发数。

      • 如图:(4)的并发数已满,<14>(3)已经完成并堵塞等待(继续占有(3)的并发数),直到<12>(4)完成。
    2. 如果第一个工序的并发数已满,Async会堵塞,直到有线程第一个工序完成。

      • 如图:循环体内的<25>正在等待<21>(2)进入下一个工序。
    3. 每个线程的每个工序的调度,不早于上一个线程同一个工序的调度。

      • 如图:<22>(2)早于<21>(2)完成,<22>须堵塞等待,直到<21>(2)完成。
    4. 如果某个线程的某个工序处理失败(例如数据库故障),那之后的线程都会中止执行,下一次调用Async返回false,pipeline.Wait()返回第一个错误,整个流水线作业可控中断。

      • 例如:<12>(4)失败,那<13>、<14>……无论正在执行到哪一个工序,都不会进入下一个工序而中断。<11>不会受到影响,会一直执行完毕。Wait()等待全部完成或中止,返回<12>(4)的错误。
    5. 无法避免中断过程中有checkpoint后的数据写入。下次重启程序将重新写入、覆盖这些数据。

      • 例如:<12>(4)失败、<13>(4)执行成功(已写入数据),那<12>(5)和<13>(5)都不会被执行,checkpoint的最新状态是<11>写入的,下次重启程序将从<12>开始,<13>的数据会再次写入,所以写入应该按照记录ID作覆盖写入。

    Pipeline解决了这些问题:

    1. 控制每个工序的并发数;
    2. 控制整体并发数,不会因为in fly数据太多无限占用内存。
    3. 任何工序出现故障(数据库操作失败),整个流水线可控中断,不会漏处理任何一批记录,也不会导致太多的重新执行。你也可以随时Ctrl+C、微调代码、重启程序,所有事情都会继续有序执行。
    4. 任何工序发生堵塞(例如数据库缓慢),整个流水线都会慢下来等待,不会强行加塞。
    5. 你可以随意修改每个工序的并发数,直到找到最佳值。

    用channel在上下游间传递数据是件笨拙的事

    如果你刚开始学习Golang,你一定觉得channel这东西好棒。但当你理所当然地用一堆channel来串联一条流水线,就是把自己逼疯的开始。实际上Golang有更棒的东西,我不知道那叫什么,反正你可以在func开启一个goroutine的时候,里面调用外面的变量。

    package main
    
    import (
        "fmt"
        "time"
        "sync"
    )
    
    func main() {
        var wg sync.WaitGroup
        for i := 0 ; i < 10 ; i++ {
            my_var := i * 10
            wg.Add(1)
            go func() {
                defer wg.Done()
                time.Sleep(time.Second)
                fmt.Println(my_var)
            }()
        }
        wg.Wait()
    }
    

    程序会在启动1秒后不按顺序输出0、10、20、…… 90。Runtime创建了10个my_var,每个goroutine各有一个,所以每个goroutine输出不一样的值。

    看起来很简单的东西,实际上是Golang的独有特性,涉及到Go runtime的机制,其他语言不得不定义一个对象来解决类似的问题。当我从C++转Go开发时就惊讶:还有这种操作?

    上面的Pipeline模块利用了这个特性,它根本不需要任何channel来传递数据,使用者在一个在循环体内定义一个变量来储存一整批的数据,在异步的goroutine中读取、修改这些数据。在goroutine间用channel传递数据的思路转变为:每一批数据由一个goroutine处理,多个gouroutine竞争各个工序的并发数。

    来源:https://segmentfault.com/a/1190000014788594

    相关文章

      网友评论

        本文标题:【转载】Golang处理大数据时使用高效的Pipeline(流水

        本文链接:https://www.haomeiwen.com/subject/psxwdctx.html