美文网首页
记录一次kafka订阅异常err:kafka server: R

记录一次kafka订阅异常err:kafka server: R

作者: SailSea | 来源:发表于2021-08-14 19:40 被阅读0次

    问题描述:

    异常信息
    2021-08-11T06:58:41.739Z        error   xiot_kafka/client.go:152        Error from consumer:err:kafka server: Request was for a consumer group that is not coordinated by this broker.    
    2021-08-12T03:50:35.427Z        error   xiot_kafka/client.go:152        Error from consumer:err:kafka server: Request was for a consumer group that is not coordinated by this broker. 
    
    在法兰克福使用的ckafka在最近上述两个时间段总是报错,重启服务才能恢复,查阅相关文档好像是kafka实例重启导致。
    相关问题参考:
    https://www.orchome.com/9861
    https://github.com/Shopify/sarama/issues/1407
    
    地域: 法兰克福
    

    复现:

    使用docker-compose部署kafka

    version: '2'
    services:
      zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper
        volumes:
          - ./zookeeper/data:/data
        ports:
          - "2181:2181"
           
      kafka:
        container_name: kafka
        image: wurstmeister/kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: xxx.xxx.xxx.xxx
          KAFKA_MESSAGE_MAX_BYTES: 2000000
          KAFKA_CREATE_TOPICS: "test:1:1"
          KAFKA_ZOOKEEPER_CONNECT: xxx.xxx.xxx.xxx:2181
        volumes:
          - ./kafka:/kafka
          - /var/run/docker.sock:/var/run/docker.sock
     
      kafka-manager:
        container_name: kafka-manager
        image: sheepkiller/kafka-manager
        ports:
          - 9020:9000
        environment:
          ZK_HOSTS: xxx.xxx.xxx.xxx:2181
    

    部署
    docker-compose up -d

    使用https://github.com/Shopify/sarama 1.19客户端连接kafka

    package main
    
    import (
        "context"
        "fmt"
        "github.com/Shopify/sarama"
        "log"
        "os"
        "os/signal"
        "sync"
        "syscall"
        "time"
        "worth-cloud/pkg/loge"
    )
    
    
    
    func main() {
        server := []string{"xxx.xxx.xxx.xxx:9092"}
        groupID := "yourgroupid"
        topic := []string{"test"}
        config := sarama.NewConfig()
        //指定 Kafka 版本,选择和购买的 CKafka 相对应的版本,如果不指定,sarama 会使用最低支持的版本
        config.Version = sarama.V1_1_0_0
        //config.Net.SASL.Enable = true
        //config.Net.SASL.User = "xxxxxxxxxxxxxx"
        //config.Net.SASL.Password = "xxxxx"
        //producer
        proClient, err := sarama.NewClient(server, config)
        if err != nil {
            log.Fatalf("unable to create kafka client: %q", err)
        }
        defer proClient.Close()
        producer, err := sarama.NewAsyncProducerFromClient(proClient)
        if err != nil {
            log.Fatalln("failed to start Sarama producer:", err)
        }
        defer producer.Close()
        go func() {
            ticker := time.NewTicker(time.Second)
            for {
                select {
                case t := <-ticker.C:
                    //向一个topic生产消息
                    msg := &sarama.ProducerMessage{
                        Topic: topic[0],
                        Key:   sarama.StringEncoder(t.Second()),
                        Value: sarama.StringEncoder("Hello World!"),
                    }
                    producer.Input() <- msg
                }
            }
        }()
        //consumer group
        consumer := Consumer{
            ready: make(chan bool),
        }
        ctx, cancel := context.WithCancel(context.Background())
        client, err := sarama.NewConsumerGroup(server, groupID, config)
        if err != nil {
            log.Panicf("Error creating consumer group client: %v", err)
        }
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                //Consume 需要在一个无限循环中调用,当重平衡发生的时候,需要重新创建 consumer session 来获得新 ConsumeClaim
                if err := client.Consume(ctx, topic, &consumer); err != nil {
                    loge.Error(fmt.Sprintf("Error from consumer:%q err:%s", topic, err))
                    time.Sleep(500 * time.Millisecond)
                }
                //如果 context 设置为取消,则直接退出
                if ctx.Err() != nil {
                    return
                }
                consumer.ready = make(chan bool)
            }
        }()
        log.Println("Sarama consumer up and running!...")
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
        select {
        case <-ctx.Done():
            log.Println("terminating: context cancelled")
        case <-sigterm:
            log.Println("terminating: via signal")
        }
        cancel()
        wg.Wait()
        if err = client.Close(); err != nil {
            log.Panicf("Error closing client: %v", err)
        }
    }
    
    //Consumer 消费者结构体
    type Consumer struct {
        ready chan bool
    }
    
    //Setup 函数会在创建新的 consumer session 的时候调用,调用时期发生在 ConsumeClaim 调用前
    func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
        // Mark the consumer as ready
        close(consumer.ready)
        return nil
    }
    
    //Cleanup 函数会在所有的 ConsumeClaim 协程退出后被调用
    func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    // ConsumeClaim 是实际处理消息的函数
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // 注意:
        // 不要使用协程启动以下代码.
        // ConsumeClaim 会自己拉起协程,具体行为见源码:
        // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
        for message := range claim.Messages() {
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
            session.MarkMessage(message, "")
        }
        return nil
    }
    
    

    重启kafka得到短暂的不可用后报错

    Request was for a consumer group that is not coordinated by this broker. 
    

    查询https://github.com/Shopify/sarama最新版本1.29.1有处理该异常

    image.png

    客户端升级最新版本后尝试,已经能正常使用。

    相关文章

      网友评论

          本文标题:记录一次kafka订阅异常err:kafka server: R

          本文链接:https://www.haomeiwen.com/subject/gaeebltx.html