美文网首页
2020-06-30 一次极大的优化cpu和内存使用的记录

2020-06-30 一次极大的优化cpu和内存使用的记录

作者: 天地一小儒 | 来源:发表于2020-06-30 22:56 被阅读0次

    CPU 的优化

    未优化前的代码:

    func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
        defer lq.Close()
        goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
        for {
            select {
            case <-s.ctx.Done():
                s.logger.Debug("Schedule context done")
                return
            case <-ctx.Done():
                s.logger.Debug("Context done %s", goroutineFlag)
                return
            case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
                if ok {
                    task.Execute(ctx)
                    if err := lq.Push(task); err != nil {
                        s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
                        return
                    }
                }
            }
        }
    }
    

    调用代码:

    for i, lq := 0, newLoopQueue(tasks); i < concurrency; i++ {
                go s.handleLoopQueue(context.WithValue(ctx, common.ConcurrencyKey, ip+"-"+strconv.Itoa(i)), lq)
    }
    

    逻辑是,生成一个任务队列,然后根据并发数共同消费这个队列,每次任务执行完毕后,都会将任务重新放回队列中,这样循环使用。同时,每个任务执行完毕后,若失败,或空执行(即,任务无实际数据的处理),会延长其下次执行时间。每次执行时,若未到执行时间,则直接执行完毕。

    然而根据实测效果,这段代码几乎能跑满cpu,原因是cpu几乎都耗在select等待上,并且总能等到队列出来任务(因为总有其他的任务执行完了然后被放回去,或未到执行时间立即被放回去)。

    后来优化了半天没有好的思路,后来灵机一动,给外层for循环增加一个ticker,以增加任务间的等待时间,降低cpu空转率。

    优化后代码如下:

    func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
        defer lq.Close()
        goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
        for range time.Tick(time.Second) { // 每个任务执行间隔一秒钟,极大降低cpu空转率
            select {
            case <-s.ctx.Done():
                s.logger.Debug("Schedule context done")
                return
            case <-ctx.Done():
                s.logger.Debug("Context done %s", goroutineFlag)
                return
            case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
                if ok {
                    task.Execute(ctx)
                    if err := lq.Push(task); err != nil {
                        s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
                        return
                    }
                }
            }
        }
    }
    

    即将for {改成for range time.Tick(time.Second) {,部署后,比原先cpu使用率缩水10倍以上。

    内存的优化

    优化前的代码:

    var (
        _timedOut time.Duration = 5 * time.Second
    )
    
    func (lq *loopQueue) Push(task task.ScheduleTask) error {
        lq.RLock()
        defer lq.RUnlock()
        // 如果队列关闭,则停止入队
        if lq.closed == 1 {
            return nil
        }
        select {
        case lq.queue <- task:
            return nil
        case time.After(_timedOut):
            return errors.New("push timed out")
        }
    }
    

    在大并发的情况下,time.After非常消耗内存,因为每次都会创建一个chan。每秒6000个并发情况下,几秒钟就花费了几十M内存,几分钟内存上升至G级别。因此考虑采用内存池(sync.Pool)的方式,尽量将计时器回收利用,以节省内存空间。

    优化后的代码:

    func newLoopQueue(tasks map[string]task.ScheduleTask) *loopQueue {
        queue := make(chan task.ScheduleTask, len(tasks))
        for _, task := range tasks {
            queue <- task
        }
        pool := &sync.Pool{
            New: func() interface{} {
                return time.NewTimer(_timedOut)
            },
        }
        return &loopQueue{tasks: tasks, queue: queue, pool: pool}
    }
    
    func (lq *loopQueue) Push(task task.ScheduleTask) error {
        lq.RLock()
        defer lq.RUnlock()
        // 如果队列关闭,则停止入队
        if lq.closed == 1 {
            return nil
        }
        // 从池中取出计时器
        timer, ok := lq.pool.Get().(*time.Timer)
        if !ok {
            return errors.Errorf("Invalid pool type, want[*time.Timer], actul[%s]", reflect.TypeOf(timer))
        }
        // 使用前重置计时器的超时时间
        if !timer.Reset(_timedOut) {
            return errors.New("Reset timer")
        }
        select {
        case lq.queue <- task:
            // 计时器未被使用,放回池中重复利用
            lq.pool.Put(timer)
            return nil
        case <-timer.C:
            return errors.New("push timed out")
        }
    }
    

    注意:如果timer被使用(<-time.C)或者被手动停止后,time.Reset是会报错的。
    部署后,运行3个小时+查看,内存使用前30名已然看不到人影,实际占用内存不足1M可以忽略不计~

    相关文章

      网友评论

          本文标题:2020-06-30 一次极大的优化cpu和内存使用的记录

          本文链接:https://www.haomeiwen.com/subject/jybfqktx.html