美文网首页
go 实现一个简易的线程池(二)

go 实现一个简易的线程池(二)

作者: 嘉磊 | 来源:发表于2019-05-14 15:05 被阅读0次

    针对(一)中所出现的问题,现在使用一个两级channel系统,一个用来存放任务队列,另一个用来控制任务队列上执行操作的"工人"数量([参考文章]:(http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/))。

    • 先将具体的业务操作抽取出来,里面定义一个具体的业务方法
    // MyService 业务接口
    type MyService struct {
    }
    
    // WriteInfo 写日志
    func (s *MyService) WriteInfo() {
        time.Sleep(1 * time.Second)
        t := time.Now()
        logFile, err := os.OpenFile("syslog.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
        defer logFile.Close()
        if err != nil {
            panic(err)
        }
        infoLog := log.New(logFile, "[INFO]", log.LstdFlags)
        infoLog.Print("time=" + strconv.FormatInt(t.UTC().UnixNano(), 10))
    }
    
    • 定义一个代表工作的结构体Job,和一个代表工人的结构体Worker
    // Job 表示要执行的作业
    type Job struct {
        MyService MyService
    }
    
    // Worker 执行作业的工人
    type Worker struct {
        WorkerPool chan chan Job
        JobChannel chan Job
    }
    // NewWorker 新建一个工人
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
        }
    }
    
    • 给工人定义一个Start方法,表示监听自己的工作任务,有活儿来了就开始工作
    func (w Worker) Start() {
        go func() {
            for {
                w.WorkerPool <- w.JobChannel
                select {
                case job := <-w.JobChannel:
                    //有工作任务时,开始执行业务接口的方法
                    job.MyService.WriteInfo()
                }
            }
        }()
    }
    
    • 初始化池
    var maxWorkers = 20
    
    // JobQueue 作业队列
    var JobQueue = make(chan Job, maxWorkers)
    
    // InitPool 给池中初始化一定量的工人,以及开启任务队列的监听
    func InitPool() {
        // 创建工作池
        pool := make(chan chan Job, maxWorkers)
        // 创建一定数量的工人(可以看做:创建了N个工人,每个工人能并发处理N件工作)
        for i := 0; i < maxWorkers; i++ {
            worker := NewWorker(pool)
            worker.Start()
        }
        //监听JobQueue上是否有新任务
        go func() {
            for {
                select {
                case job := <-JobQueue:
                    go func(job Job) {
                        // 获取可用的工人channel,若没有,则阻塞
                        jobChannel := <-pool
                        jobChannel <- job
                    }(job)
                }
            }
        }()
    }
    
    • 提供一个http接口,用于测试
    func init() {
        InitPool()
    }
    
    func main() {
        http.HandleFunc("/test/pool/", indexHandler)
        http.ListenAndServe(":9000", nil)
    }
    
    func indexHandler(w http.ResponseWriter, r *http.Request) {
        service := MyService{}
        //service.WriteInfo()
        work := Job{MyService: service}
        JobQueue <- work
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        result := "{\"msg\":\"SUCCESS\",\"code\":0}"
        fmt.Fprintln(w, result)
    }
    
    • 使用jmeter进行测试,和(一)中一样,100个样本,循环10次执行


      聚合报告.png
    图形结果.png

    可以看到处理请求的能力还是非常不错的。其实这种测试方法并不能和(一)的结果进行横向比对,毕竟(一)中只开启了20个协程,而在(二)中由于双队列的存在,处理请求的协程数量肯定是要比(一)中的多的多。
    但(二)的模式肯定是要优于(一)的,这是毋容置疑的。我只是为了给自己做个笔记,加深对go中channel的理解,毕竟,好记性不如烂笔头么 (ง •̀_•́)ง
    源码我上传到了github,地址:https://github.com/wleirock/studygo/tree/master/pool

    相关文章

      网友评论

          本文标题:go 实现一个简易的线程池(二)

          本文链接:https://www.haomeiwen.com/subject/biusaqtx.html