CPU 的优化
未优化前的代码:
func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
defer lq.Close()
goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
for {
select {
case <-s.ctx.Done():
s.logger.Debug("Schedule context done")
return
case <-ctx.Done():
s.logger.Debug("Context done %s", goroutineFlag)
return
case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
if ok {
task.Execute(ctx)
if err := lq.Push(task); err != nil {
s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
return
}
}
}
}
}
调用代码:
for i, lq := 0, newLoopQueue(tasks); i < concurrency; i++ {
go s.handleLoopQueue(context.WithValue(ctx, common.ConcurrencyKey, ip+"-"+strconv.Itoa(i)), lq)
}
逻辑是,生成一个任务队列,然后根据并发数共同消费这个队列,每次任务执行完毕后,都会将任务重新放回队列中,这样循环使用。同时,每个任务执行完毕后,若失败,或空执行(即,任务无实际数据的处理),会延长其下次执行时间。每次执行时,若未到执行时间,则直接执行完毕。
然而根据实测效果,这段代码几乎能跑满cpu,原因是cpu几乎都耗在select等待上,并且总能等到队列出来任务(因为总有其他的任务执行完了然后被放回去,或未到执行时间立即被放回去)。
后来优化了半天没有好的思路,后来灵机一动,给外层for循环增加一个ticker,以增加任务间的等待时间,降低cpu空转率。
优化后代码如下:
func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
defer lq.Close()
goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
for range time.Tick(time.Second) { // 每个任务执行间隔一秒钟,极大降低cpu空转率
select {
case <-s.ctx.Done():
s.logger.Debug("Schedule context done")
return
case <-ctx.Done():
s.logger.Debug("Context done %s", goroutineFlag)
return
case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
if ok {
task.Execute(ctx)
if err := lq.Push(task); err != nil {
s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
return
}
}
}
}
}
即将for {
改成for range time.Tick(time.Second) {
,部署后,比原先cpu使用率缩水10倍以上。
内存的优化
优化前的代码:
var (
_timedOut time.Duration = 5 * time.Second
)
func (lq *loopQueue) Push(task task.ScheduleTask) error {
lq.RLock()
defer lq.RUnlock()
// 如果队列关闭,则停止入队
if lq.closed == 1 {
return nil
}
select {
case lq.queue <- task:
return nil
case time.After(_timedOut):
return errors.New("push timed out")
}
}
在大并发的情况下,time.After非常消耗内存,因为每次都会创建一个chan。每秒6000个并发情况下,几秒钟就花费了几十M内存,几分钟内存上升至G级别。因此考虑采用内存池(sync.Pool)的方式,尽量将计时器回收利用,以节省内存空间。
优化后的代码:
func newLoopQueue(tasks map[string]task.ScheduleTask) *loopQueue {
queue := make(chan task.ScheduleTask, len(tasks))
for _, task := range tasks {
queue <- task
}
pool := &sync.Pool{
New: func() interface{} {
return time.NewTimer(_timedOut)
},
}
return &loopQueue{tasks: tasks, queue: queue, pool: pool}
}
func (lq *loopQueue) Push(task task.ScheduleTask) error {
lq.RLock()
defer lq.RUnlock()
// 如果队列关闭,则停止入队
if lq.closed == 1 {
return nil
}
// 从池中取出计时器
timer, ok := lq.pool.Get().(*time.Timer)
if !ok {
return errors.Errorf("Invalid pool type, want[*time.Timer], actul[%s]", reflect.TypeOf(timer))
}
// 使用前重置计时器的超时时间
if !timer.Reset(_timedOut) {
return errors.New("Reset timer")
}
select {
case lq.queue <- task:
// 计时器未被使用,放回池中重复利用
lq.pool.Put(timer)
return nil
case <-timer.C:
return errors.New("push timed out")
}
}
注意:如果timer被使用(<-time.C)或者被手动停止后,time.Reset是会报错的。
部署后,运行3个小时+查看,内存使用前30名已然看不到人影,实际占用内存不足1M可以忽略不计~
网友评论