美文网首页
golang kafka 消费者

golang kafka 消费者

作者: 東玖零 | 来源:发表于2023-07-25 16:09 被阅读0次

    背景:应用需要在某个事件完成后写表记录,以前这个事件有kafka消息,于是监听即可。

    自己也不是特别了解,经过摸索可以正常使用,并添加了一些注释,如有不还正确请指正,上代码吧!

    //导入头文件
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego"
    cluster "github.com/bsm/sarama-cluster"
    
    var kafkaTag = "kafka message"
    
    func StartKafka() {
        // 配置
        config := cluster.NewConfig()
        // 设定是否需要返回错误信息
        config.Consumer.Return.Errors = true
        // 为成员分配主题分区的策略
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
        // 这里 sarama.OffsetNewest 就从最新的开始消费,即该 consumer 启动之前产生的消息都无法被消费
        // 如果改为 sarama.OffsetOldest 则会从最旧的消息开始消费,即每次重启 consumer 都会把该 topic 下的所有消息消费一次
        config.Consumer.Offsets.Initial = sarama.OffsetOldest
        // 每 1 秒钟提交一次 offset
        config.Consumer.Offsets.CommitInterval = 1 * time.Second
        //Notifications 返回消费者期间发生的通知的通道
        // 重新平衡。 如果您的配置是,通知只会通过此通道发出
        config.Group.Return.Notifications = true
    
        // 创建消费者
        brokers := strings.Split(beego.AppConfig.String("kafka_url"), ",")
        groupId := "kafka-groupId-01" // 消息者组,每个后台应用创建者独有,如果你使用其他后台代码中,可能会导致自己或其他人收不消息
        topics := []string{"kafka-topics-01"} // 订阅主题
        consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
        if err != nil {
            logs.Error(kafkaTag, "new consumer error: ", err.Error())
            return
        }
        defer consumer.Close()
    
        // trap SIGINT to trigger a shutdown.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
    
        // 接收错误
        go func() {
            for err := range consumer.Errors() {
                logs.Error(kafkaTag, "consumer error:", err.Error())
            }
        }()
    
        // 打印一些rebalance的信息
        go func() {
            for ntf := range consumer.Notifications() {
                logs.Info(kafkaTag, "consumer notification:", ntf)
            }
        }()
    
        // 循环从通道中获取消息
        for {
            select {
            case msg, ok := <-consumer.Messages():
                if ok {
                    logs.Info(kafkaTag, "Topic = ", msg.Topic, "Partition = ", msg.Partition, "Offset = ", msg.Offset, "Key = ", string(msg.Key), "Value", string(msg.Value))
                    if msg.Topic == "kafka-topics-01" {
                        // 自己解析 使用json库解析msg.Value
                        // 然后写表
                    }
                    consumer.MarkOffset(msg, "") // 上报offset
                } else {
                    logs.Error(kafkaTag, "监听服务失败")
                }
            case <-signals:
                return
            }
        }
    }
    

    配置文件中
    kafka_url = kafka地址1,kafka地址2,kafka地址3

    如果不使用strings.Split进行分个割,启动会报错,报错如下:

    kafka message new consumer error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    

    其实就是把多个地址当一个地址去启动了。

    相关文章

      网友评论

          本文标题:golang kafka 消费者

          本文链接:https://www.haomeiwen.com/subject/gwrhpdtx.html