美文网首页GO与微服务
手撸golang GO与微服务 ChatServer之4 内存泄漏

手撸golang GO与微服务 ChatServer之4 内存泄漏

作者: 老罗话编程 | 来源:发表于2021-03-10 07:59 被阅读0次

    缘起

    最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    案例需求(聊天服务器)

    • 用户可以连接到服务器。
    • 用户可以设定自己的用户名。
    • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

    目标(Day 4)

    • 诊断并修复内存泄漏

    诊断

    • 在day 3的代码基础上, 使用go tool pprof查看heap日志
    $ go tool pprof ~/chat_server_mem.profile 
    File: chat_server.test
    Type: inuse_space
    Time: Mar 10, 2021 at 7:35am (CST)
    Entering interactive mode (type "help" for commands, "o" for options)
    (pprof) top
    Showing nodes accounting for 9495.99kB, 100% of 9495.99kB total
    Showing top 10 nodes out of 12
          flat  flat%   sum%        cum   cum%
     7287.48kB 76.74% 76.74%  7287.48kB 76.74%  time.startTimer
     1184.27kB 12.47% 89.21%  1184.27kB 12.47%  runtime/pprof.StartCPUProfile
      512.19kB  5.39% 94.61%   512.19kB  5.39%  runtime.malg
      512.05kB  5.39%   100%  7799.53kB 82.13%  learning/gooop/chat_server.(*tChatClient).beginWrite
             0     0%   100%  1184.27kB 12.47%  command-line-arguments.Test_ChatServer
             0     0%   100%   512.19kB  5.39%  runtime.mstart
             0     0%   100%   512.19kB  5.39%  runtime.newproc.func1
             0     0%   100%   512.19kB  5.39%  runtime.newproc1
             0     0%   100%   512.19kB  5.39%  runtime.systemstack
             0     0%   100%  1184.27kB 12.47%  testing.tRunner
    (pprof) 
    
    • 疑似有两个泄漏点, 一个是time.startTimer, 一个是(*tChatClient).beginWrite
    • 由于(*tChatClient).beginWrite才是业务代码, 且cum% > time.startTimer的cum%
    • 因此可以怀疑:
      • (*tChatClient).beginWrite是内存泄漏的根本点
      • 主要泄漏原因是调用了太多次time.startTimer

    复查代码

    • 复查tChatClient.beginWrite的代码, 导致不断分配内存的点可能有两个:
      • Logging.Logf, 不断追加日志.
      • 解决方法: 改造Logging, 限制最多日志条数(使用容量有限的队列)
      • for循环中不断调用time.After, 导致大量创建timer.
      • 解决方法: 不使用time.After, 而使用独立的routine和timer检测读超时
    // beginWrite is the ORIGINAL (leaky) write loop, kept here to match the
    // pprof diagnosis above. Two growth points per iteration:
    //   1. Logging.Logf keeps appending to an unbounded in-memory log store.
    //   2. time.After allocates a fresh 5-second timer on EVERY pass through
    //      the select; each timer stays alive until it fires, matching the
    //      time.startTimer growth seen in the heap profile.
    func (me *tChatClient) beginWrite() {
        Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
        writer := io.Writer(me.conn)
        for {
            select {
            case <- me.closeChan:
                // close requested: shut the socket, mark closed, notify observers
                Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
                _ = me.conn.Close()
                me.closeFlag = 2
                me.postConnClosed()
                return
    
            case msg := <- me.sendChan:
                // one message dequeued: decrement the pending counter
                atomic.AddInt32(&me.pendingSend, -1)
                _,e := writer.Write([]byte(msg.Encode()))
                if e != nil {
                    Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                    me.closeConn()
    
                } else {
                    me.sendLogs = append(me.sendLogs, msg)
                }
                break
    
            // LEAK: a brand-new timer is allocated on every loop iteration
            case <- time.After(time.Duration(5) * time.Second):
                me.postRecvTimeout()
                break
            }
        }
    }
    

    改造Logging

    主要是将日志数组改造为容量有上限的日志队列, 防止诊断日志的采集, 导致内存无限增长.

    package chat_server
    
    import (
        "fmt"
        "sync"
    )
    
    // ILoggingService collects formatted log lines in a bounded in-memory
    // store and echoes each line to stdout.
    type ILoggingService interface {
        // Logf formats like fmt.Sprintf, stores the line, and prints it.
        Logf(f string, args... interface{})
        // AllLogs returns a snapshot of the currently retained lines, oldest first.
        AllLogs() []string
    }
    
    // tLoggingService is a mutex-guarded bounded queue of log lines.
    // The backing slice is twice the capacity so writes can keep appending
    // past `capacity` entries before the live window is compacted back to
    // offset 0 (see ensureSpace) — amortized O(1) per write.
    type tLoggingService struct {
        mutex *sync.Mutex
    
        logs []string // backing storage, len == 2*capacity
        capacity int  // max number of retained lines
        rindex int    // read index: first live entry
        windex int    // write index: one past the last live entry
    
    }
    
    // gMaxLogs caps retained log lines so diagnostic logging cannot grow
    // memory without bound (the fix for leak point 1 above).
    var gMaxLogs = 10_000
    var gEmptyString = ""
    
    func newLoggingService() ILoggingService {
        return &tLoggingService{
            mutex: new(sync.Mutex),
    
            // 2x capacity leaves slack so compaction happens rarely
            logs: make([]string, gMaxLogs*2),
            capacity: gMaxLogs,
            rindex: 0,
            windex: 0,
        }
    }
    
    // size reports the number of live entries. Caller must hold mutex.
    func (me *tLoggingService) size() int {
        return me.windex - me.rindex
    }
    
    
    // Logf stores one formatted line, evicting the oldest lines when the
    // queue is full, then echoes the line to stdout.
    func (me *tLoggingService) Logf(f string, args... interface{}) {
        log := fmt.Sprintf(f, args...)
    
        me.mutex.Lock()
        me.ensureSpace()
        me.logs[me.windex] = log
        me.windex++
        me.mutex.Unlock()
    
        fmt.Println(log)
    }
    
    // ensureSpace evicts the oldest entries until there is room for one more
    // line, then compacts the live window back to offset 0 once rindex has
    // drifted past capacity. Caller must hold mutex.
    func (me *tLoggingService) ensureSpace() {
        for me.size() >= me.capacity {
            // dequeue head items; clear the slot so the string can be GC'd
            me.logs[me.rindex] = gEmptyString
            me.rindex++
        }
    
        if me.rindex >= me.capacity {
            // move data to offset 0; no overlap since rindex >= capacity > size
            for i,n := 0, me.size();i < n;i++ {
                me.logs[i], me.logs[i + me.rindex] = me.logs[i + me.rindex], gEmptyString
            }
    
            // reset read and write index
            me.windex, me.rindex = me.windex - me.rindex, 0
        }
    }
    
    // AllLogs returns a copy of only the live entries, oldest first.
    // FIX: the previous version returned the whole 2*capacity backing slice
    // (mostly empty slots) and read rindex/windex without holding the mutex.
    func (me *tLoggingService) AllLogs() []string {
        me.mutex.Lock()
        defer me.mutex.Unlock()
    
        out := make([]string, me.size())
        copy(out, me.logs[me.rindex:me.windex])
        return out
    }
    
    var Logging = newLoggingService()
    

    改造tChatClient

    • 去掉写循环中, time.After的调用
    • 使用专门的routine和读计数器, 检测读超时的状况
    func (me *tChatClient) open(){
        // Flip openFlag 0 -> 1 exactly once; repeated calls are no-ops.
        if atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
            go me.beginWrite()
            go me.beginRead()
    
            // watchdog goroutine detecting read timeouts
            // (replaces the per-iteration time.After in the old write loop)
            go me.beginWatchRecvTimeout()
        }
    }
    
    // beginWatchRecvTimeout polls every 5 seconds and fires postRecvTimeout
    // once three consecutive intervals pass without a read (beginRead resets
    // the counter on every successful read).
    // FIX: time.Tick returns a ticker that can never be stopped, so it kept
    // running after this goroutine exited — another small leak. Use an
    // explicit ticker and stop it on return.
    func (me *tChatClient) beginWatchRecvTimeout() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
    
        for range ticker.C {
            if me.isClosed() {
                break
            }
    
            // NOTE(review): timeoutCounter is incremented here and reset in
            // beginRead without synchronization — confirm this race is benign
            // or make the field atomic.
            me.timeoutCounter++
            if me.timeoutCounter >= 3 {
                me.postRecvTimeout()
            }
        }
    }
    
    // beginWrite drains sendChan onto the connection until closeChan fires.
    // Note: no per-iteration time.After here — the old timeout case allocated
    // a fresh timer on every pass and leaked memory; read timeouts are now
    // detected by the beginWatchRecvTimeout goroutine instead.
    func (me *tChatClient) beginWrite() {
        Logging.Logf("tChatClient.beginWrite, %v, serverFlag=%v", me.name, me.serverFlag)
        w := io.Writer(me.conn)
        for {
            select {
            case <- me.closeChan:
                // close requested: shut the socket, mark closed, notify observers
                Logging.Logf("tChatClient.beginWrite, <- closeChan, %v, serverFlag=%v", me.name, me.serverFlag)
                _ = me.conn.Close()
                me.closeFlag = 2
                me.postConnClosed()
                return
    
            case msg := <- me.sendChan:
                // one message dequeued: decrement the pending counter
                atomic.AddInt32(&me.pendingSend, -1)
                if _, e := w.Write([]byte(msg.Encode())); e != nil {
                    Logging.Logf("tChatClient.beginWrite, write error, %v, serverFlag=%v", me.name, me.serverFlag)
                    me.closeConn()
                } else {
                    me.sendLogs = append(me.sendLogs, msg)
                }
            }
        }
    }
    
    // beginRead consumes newline-delimited messages from the connection
    // until a read error occurs, dispatching each decoded message to the
    // registered handler.
    func (me *tChatClient) beginRead() {
        reader := bufio.NewReader(me.conn)
        for {
            line, err := reader.ReadString('\n')
            if err != nil {
                Logging.Logf("tChatClient.beginRead, read error, %v, serverFlag=%v", me.name, me.serverFlag)
                me.closeConn()
                return
            }
    
            // a successful read resets the watchdog's timeout counter
            me.timeoutCounter = 0
    
            if ok, msg := MsgDecoder.Decode(line); ok {
                if fn := me.recvHandler; fn != nil {
                    fn(me, msg)
                }
    
                me.recvLogs = append(me.recvLogs, msg)
            }
        }
    }
    

    复测

    • 重跑测试, 查pprof, 现在内存清爽多了, 已经看不到业务代码导致的泄漏点, 修复有效
    $ go tool pprof ~/chat_server_mem.profile 
    File: chat_server.test
    Type: inuse_space
    Time: Mar 10, 2021 at 7:55am (CST)
    Entering interactive mode (type "help" for commands, "o" for options)
    (pprof) top
    Showing nodes accounting for 2.66MB, 100% of 2.66MB total
          flat  flat%   sum%        cum   cum%
        1.50MB 56.47% 56.47%     1.50MB 56.47%  runtime.malg
        1.16MB 43.53%   100%     1.16MB 43.53%  runtime/pprof.StartCPUProfile
             0     0%   100%     1.16MB 43.53%  command-line-arguments.Test_ChatServer
             0     0%   100%     1.50MB 56.47%  runtime.mstart
             0     0%   100%     1.50MB 56.47%  runtime.newproc.func1
             0     0%   100%     1.50MB 56.47%  runtime.newproc1
             0     0%   100%     1.50MB 56.47%  runtime.systemstack
             0     0%   100%     1.16MB 43.53%  testing.tRunner
    (pprof) 
    
    

    (end)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 ChatServer之4 内存泄漏

        本文链接:https://www.haomeiwen.com/subject/qzwiqltx.html