Troubleshooting high memory usage in a golang sarama kafka service


Author: 冥王星的卧底 | Published 2022-02-09 15:23

    Environment:

    golang 1.15, sarama 1.19, kafka v2.5.0; a three-node kafka cluster with 3 partitions per topic

    Symptom: the Go microservice's memory usage exceeded 1 GB, and the logs contained large numbers of kafka-related errors. Checking the kafka cluster showed that one of the kafka broker containers had died.
    Question: why did the client throw so many errors when only one broker in the kafka cluster was down?

    Inspecting memory usage with pprof

    Grab the heap profile mem-1.memprof from the beego admin page
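    If a service doesn't expose beego's admin endpoints, the standard net/http/pprof package serves the same heap profile; a minimal sketch (the localhost:6060 address is just an illustration):

    package main

    import (
        "log"
        "net/http"
        _ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
    )

    func main() {
        // the heap profile can then be saved with:
        //   curl -o mem-1.memprof http://localhost:6060/debug/pprof/heap
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }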

    go tool pprof mem-1.memprof
    web // use the web command to view where memory is held
    
    [pprof web view: call graph of the heap profile]

    The call stack is withRecover > backgroundMetadataUpdater > refreshMetadata > RefreshMetadata > tryRefreshMetadata > ...
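    If the graph is hard to read, pprof's interactive top and list commands (the latter requires the binary's source to be available) attribute the same memory to specific functions:

    (pprof) top
    (pprof) list tryRefreshMetadata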

    Locating the problem

    • Searching the sarama source shows that backgroundMetadataUpdater is started only from sarama's NewClient.
    • When the business code creates a Consumer, it ultimately calls sarama's NewClient.
    • In the business code, consumer creation retries forever at a 10s interval on error, as the loop below shows.
    • backgroundMetadataUpdater creates a ticker whose interval is Metadata.RefreshFrequency, 10 minutes by default, so each backgroundMetadataUpdater goroutine sits blocked on that ticker for 10 minutes at a time. With a new client attempted every 10 seconds and each leaked goroutine surviving at least one 10-minute tick (indefinitely, if the client is never closed), leaked clients pile up quickly, which plausibly accounts for the memory growth.
        // in the business code, if creating the consumer fails, retry every 10s forever
        for {
            consumer, err = cluster.NewConsumer(k.addr, group, topics, k.ccfg)
            if err != nil {
                logs.Error("new kafka consumer is error:", err.Error())
                time.Sleep(10 * time.Second)
                continue
            }
            logs.Info("new kafka consumer is success!")
            break
        }
    

    sarama-cluster: NewConsumer calls sarama's NewClient

    func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
        client, err := NewClient(addrs, config)
        if err != nil {
            return nil, err
        }
        ...
    }
    
    func (client *client) backgroundMetadataUpdater() {
        defer close(client.closed)
    
        if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
            return
        }
    
        ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
        defer ticker.Stop()
    
        for {
            select {
            case <-ticker.C:
                if err := client.refreshMetadata(); err != nil {
                    Logger.Println("Client background metadata update:", err)
                }
            case <-client.closer:
                return
            }
        }
    }
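    Putting this together: every successful NewClient starts a backgroundMetadataUpdater goroutine that exits only when the client is closed, so any client leaked by a failed consumer-creation attempt keeps running (and keeps allocating during metadata refreshes). One way to avoid the leak, sketched here under the assumption that the bsm/sarama-cluster API (cluster.NewClient, cluster.NewConsumerFromClient) is in use, is to manage the client explicitly and close it whenever consumer setup fails:

    // sketch (assumption: bsm/sarama-cluster API); creating the client
    // explicitly lets us close it, which also stops backgroundMetadataUpdater,
    // when consumer creation fails
    for {
        client, err := cluster.NewClient(k.addr, k.ccfg)
        if err != nil {
            logs.Error("new kafka client is error:", err.Error())
            time.Sleep(10 * time.Second)
            continue
        }
        consumer, err = cluster.NewConsumerFromClient(client, group, topics)
        if err != nil {
            _ = client.Close() // release the client and its background goroutine
            logs.Error("new kafka consumer is error:", err.Error())
            time.Sleep(10 * time.Second)
            continue
        }
        logs.Info("new kafka consumer is success!")
        break
    }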
    

    Why did NewClient fail when only one of the kafka cluster's brokers was down?
    Inspecting the topic inside the kafka container showed that Replicas and Isr each listed only a single broker. Per the official kafka configuration notes, auto-created topics only get 3 replicas if default.replication.factor is configured accordingly.
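    For reference, the topic can be inspected with kafka's own CLI (the broker address and topic name here are placeholders). With a replication factor of 1, each partition lists a single broker under Replicas and Isr, so the partition becomes unavailable the moment that broker dies:

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
    # output along these lines:
    # Topic: my-topic  PartitionCount: 3  ReplicationFactor: 1
    #   Topic: my-topic  Partition: 0  Leader: 1  Replicas: 1  Isr: 1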


    Review the following settings in the Advanced kafka-broker category, and modify as needed:
    auto.create.topics.enable
    Enable automatic creation of topics on the server. If this property is set to true, then attempts to produce, consume, or fetch metadata for a nonexistent topic automatically create the topic with the default replication factor and number of partitions. The default is enabled.
    default.replication.factor
    Specifies default replication factors for automatically created topics. For high availability production systems, you should set this value to at least 3.
    num.partitions
    Specifies the default number of log partitions per topic, for automatically created topics. The default value is 1. Change this setting based on the requirements related to your topic and partition design.
