美文网首页
golang实现协程池

golang实现协程池

作者: 柔软的胖 | 来源:发表于2017-02-16 19:50 被阅读0次

    golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。

    Demo中有worker和job。worker是一个协程,在worker中完成一个job。Jobs是一个channel,使用Jobs记录job。当生成一个新任务,就发送到Jobs中。程序启动时,首先启动3个worker协程,每个协程都尝试从Jobs中接收job。如果Jobs中没有job,worker协程就等待。

    基本逻辑如下:

    1. Jobs管道存放job,Results管道存放结果。
    2. 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
    3. 向Jobs管道中发送3个数据。
    4. 关闭Jobs管道。
    5. worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
    6. 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。

    代码如下:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func worker(id int) {
        go func() {
            for {
                fmt.Println("Waiting for job...")
                select {
                // Receive from channel
                case j := <-Jobs :
                    fmt.Println("worker", id, "started  job", j)
                    time.Sleep(time.Second)
                    fmt.Println("worker", id, "finished job", j)
                    Results <- true
                }
            }
        }()
    }
    
    const channelLength = 3
    
    var (
        Jobs chan int
        Results chan bool
    )
    
    func main() {
        Jobs = make(chan int, channelLength)
        Results = make(chan bool, channelLength)
    
        // Start worker goroutines
        for i:= 0; i < channelLength; i++ {
            worker(i)
        }
    
        // Send to channel
        time.Sleep(time.Second)
        for j := 0; j < channelLength; j++ {
            Jobs <- j
        }
        close(Jobs)
    
        for len(Jobs) != 0 || len(Results) != channelLength  {
            time.Sleep(100 * time.Millisecond)
        }
        fmt.Println("Complete main")
    }
    

    运行结果如下:

    Waiting for job...
    Waiting for job...
    Waiting for job...
    worker 1 started  job 2
    worker 2 started  job 0
    worker 0 started  job 1
    worker 0 finished job 1
    Waiting for job...
    worker 0 started  job 0
    worker 2 finished job 0
    Waiting for job...
    worker 2 started  job 0
    worker 1 finished job 2
    Waiting for job...
    worker 1 started  job 0
    Complete main
    

    这个程序出现问题了,bug在哪里?

    开始的3次,协程运行都是正常。

    worker 1 started  job 2
    worker 2 started  job 0
    worker 0 started  job 1
    worker 0 finished job 1
    worker 2 finished job 0
    worker 1 finished job 2
    

    根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。

    for j := 0; j < channelLength; j++ {
            jobs <- j
        }
    close(jobs)
    

    实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。

    worker 0 started  job 0
    worker 2 started  job 0
    worker 1 started  job 0
    

    开始以为问题出在worker()中,j := <- job,只有当job中有返回,才会打印worker started。但是后面的job id都是0,说明没有向jobs管道中发送新数据。

    for {
                fmt.Println("Waiting for job...")
                select {
                case j := <-Jobs :
                    fmt.Println("worker", id, "started  job", j)
                    time.Sleep(time.Second)
                    fmt.Println("worker", id, "finished job", j)
                    Results <- true
                }
            }
    

    研究向Jobs管道发送数据的代码,突发奇想,把close(Jobs)注释掉,看看如何。

    for j := 0; j < channelLength; j++ {
            Jobs <- j
        }
    //close(Jobs)
    

    程序居然正常了。

    Waiting for job...
    Waiting for job...
    Waiting for job...
    worker 1 started  job 0
    worker 0 started  job 2
    worker 2 started  job 1
    worker 1 finished job 0
    worker 0 finished job 2
    Waiting for job...
    Waiting for job...
    worker 2 finished job 1
    Waiting for job...
    Complete main
    

    原来问题出在close()上,马上查注释。close()是在sender中调用,当管道中最后一个数据被接收以后,就关闭管道。此时,不能再向管道中发送数据。否则会报错panic: send on closed channel

    使用x, ok := <-c可以判断一个管道是否关闭,如果管道已经关闭,ok的值为false

    管道关闭以后,并且管道中的数据被接收完以后,居然还能从管道中接收到数据0。于是就造成了后续协程接收到job 0的问题。

    // The close built-in function closes a channel, which must be either
    // bidirectional or send-only. It should be executed only by the sender,
    // never the receiver, and has the effect of shutting down the channel after
    // the last sent value is received. After the last value has been received
    // from a closed channel c, any receive from c will succeed without
    // blocking, returning the zero value for the channel element. The form
    //  x, ok := <-c
    // will also set ok to false for a closed channel.
    func close(c chan<- Type)
    

    如果要使用close,应该怎么做

    管道不用时,close()管道是个好习惯。此时,应该怎么解决这个问题呢?首先要在协程中检查接收到的数据,j:=<-jobs,判断j是否为0。如果Jobs中存放的是非指针数据,不能分辨0是真正的0值,还是close以后接收到的0。因此需要在Jobs管道中存放指针。管道打开时,接收的都是非nil指针。close以后才返回0,也就是nil指针。

    修改程序。新生成一个机构体Job。

    type Job struct {
        JobId int
    }
    

    Jobs保存指向Job的指针。

    Jobs chan *Job
    func main() {
        Jobs = make(chan *Job, channelLength)
        ...
        for j := 0; j < channelLength; j++ {
            Jobs <- &Job{JobId:j}
        }
        close(Jobs)
        ...
    }
    

    在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。

    func worker(id int) {
        go func() {
            for {
                fmt.Println("Waiting for job...")
                select {
                // Receive from channel
                case j := <-Jobs :
                    if j == nil {
                        fmt.Println("Close the worker", id)
                        return
                    }
                    fmt.Println("worker", id, "started  job", j.JobId)
                    time.Sleep(time.Second)
                    fmt.Println("worker", id, "finished job", j.JobId)
                    Results <- true
                }
            }
        }()
    }
    

    运行结果达到预期。

    Waiting for job...
    Waiting for job...
    Waiting for job...
    worker 0 started  job 0
    worker 1 started  job 1
    worker 2 started  job 2
    worker 2 finished job 2
    worker 0 finished job 0
    Waiting for job...
    Waiting for job...
    Close the worker 2
    Close the worker 0
    worker 1 finished job 1
    Waiting for job...
    Close the worker 1
    Complete main
    

    附上最终的代码。

    package main
    
    import (
        "fmt"
        "time"
    )
    
    type Job struct {
        JobId int
    }
    
    func worker(id int) {
        go func() {
            for {
                fmt.Println("Waiting for job...")
                select {
                // Receive from channel
                case j := <-Jobs :
                    if j == nil {
                        fmt.Println("Close the worker", id)
                        return
                    }
                    fmt.Println("worker", id, "started  job", j.JobId)
                    time.Sleep(time.Second)
                    fmt.Println("worker", id, "finished job", j.JobId)
                    Results <- true
                }
            }
        }()
    }
    
    const channelLength = 3
    
    var (
        Jobs chan *Job
        Results chan bool
    )
    
    func main() {
        Jobs = make(chan *Job, channelLength)
        Results = make(chan bool, channelLength)
    
        // Start worker goroutines
        for i:= 0; i < channelLength; i++ {
            worker(i)
        }
    
        // Send to channel
        time.Sleep(time.Second)
        for j := 0; j < channelLength; j++ {
            Jobs <- &Job{JobId:j}
        }
        close(Jobs)
    
        for len(Jobs) != 0 || len(Results) != channelLength  {
            time.Sleep(100 * time.Millisecond)
        }
        fmt.Println("Complete main")
    }
    

    相关文章

      网友评论

          本文标题:golang实现协程池

          本文链接:https://www.haomeiwen.com/subject/vbvhwttx.html