美文网首页Golang知识图谱
Golang高并发工作池

Golang高并发工作池

作者: 君致 | 来源:发表于2019-02-20 09:51 被阅读235次

    go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.
    直接上代码(JobPool.go)

    package utils
    
    import (
        "fmt"
    )
    
    // 定义任务接口,所有实现该接口的均实现工作池
    type Task interface {
        DoTask() error
    }
    
    // 定义工作结构体
    type Job struct {
        Task Task
    }
    
    // 定义全部的工作队列
    var JobQueue chan Job
    
    // 定义工作者
    type Worker struct {
        WorkerPool chan chan Job    // 工人对象池
        JobChannel  chan Job        // 管道里面拿Job
        quit chan bool
    }
    
    // 新建一个工作者
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool, // 工人对象池
            JobChannel: make(chan Job), //工人的任务
            quit:       make(chan bool),
        }
    }
    
    // 工作池启动主函数
    func(w *Worker)Start(){
        // 开一个新的协程
        go func() {
            for{
                // 注册任务到工作池
                w.WorkerPool <- w.JobChannel
                select {
                // 接收到任务
                case job := <- w.JobChannel:
                    // 执行任务
                    err := job.Task.DoTask()
                    if err != nil {
                        fmt.Println("任务执行失败")
                    }
                // 接收退出的任务, 停止任务
                case <- w.quit:
                    return
                }
            }
        }()
    }
    
    // 退出执行工作
    func (w *Worker) Stop(){
        go func(){
            w.quit <- true
        }()
    }
    
    // 定义任务发送者
    type Sender struct {
        maxWorkers int  // 最大工人数
        WorkerPool chan chan Job    // 注册工作通道
        quit chan bool  // 退出信号
    }
    
    // 注册新发送者
    func NewSender(maxWorkers int) *Sender{
        Pool := make(chan chan Job, maxWorkers)
        return &Sender{
            WorkerPool: Pool,       // 将工作者放到一个工作池中
            maxWorkers: maxWorkers, // 最大工作者数量
            quit: make(chan bool),
        }
    }
    
    // 工作分发器
    func(s *Sender)Run(){
        for i:=0; i<s.maxWorkers; i++{
            worker := NewWorker(s.WorkerPool)
            worker.Start()
        }
        // 监控任务发送
        go s.Send()
    }
    
    // 退出发放工作
    func (s *Sender) Quit(){
        go func(){
            s.quit <- true
        }()
    }
    
    func(s *Sender)Send(){
        for {
            select {
            // 接收到任务
            case job :=<- JobQueue:
                go func(job Job) {
                    jobChan := <- s.WorkerPool
                    jobChan <- job
                }(job)
            }
        }
    }
    
    // 初始化对象池
    func InitPool()  {
        maxWorkers := 4
        maxQueue := 20
        // 初始化一个任务发送者,指定工作者数量
        send := NewSender(maxWorkers)
            // 指定任务的队列长度
        JobQueue = make(chan Job,maxQueue) 
        // 一直运行任务发送
        send.Run()
    }
    

    使用方法

    package main
    
    import (
        "fmt"
        "os"
        "test/utils"    //引用: JobPool是放在test项目的utils包下
        "time"
    )
    
    type Test struct {
        num int
    }
    
    //  任务,实现JobPool的Task接口
    func(t *Test)DoTask() error{
        f, err := os.OpenFile("log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
        if err != nil {
            return err
        }
        defer f.Close()
        f.WriteString(fmt.Sprintf("这是任务:%d号,执行时间为:%s \n", t.num, fmt.Sprintf("%s", time.Now())))
        return nil
    }
    
    func main(){
        // 初始化对象池
        utils.InitPool()
        for i:=1;i<40 ;i++{
            // 注册任务到Job队列中
            task := &Test{i}
            utils.JobQueue <- utils.Job{
                Task:task,
            }
        }
        // time.Sleep(180 * time.Second)
        // 执行结束,关闭管道
        close(utils.JobQueue)
    }
    

    参考文章: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

    相关文章

      网友评论

        本文标题:Golang高并发工作池

        本文链接:https://www.haomeiwen.com/subject/kqscyqtx.html