问题描述:
异常信息
2021-08-11T06:58:41.739Z error xiot_kafka/client.go:152 Error from consumer:err:kafka server: Request was for a consumer group that is not coordinated by this broker.
2021-08-12T03:50:35.427Z error xiot_kafka/client.go:152 Error from consumer:err:kafka server: Request was for a consumer group that is not coordinated by this broker.
部署在法兰克福地域的 CKafka 在上述两个时间点均出现该报错,只有重启服务才能恢复;查阅相关文档后,初步判断是 Kafka 实例重启导致的。
相关问题参考:
https://www.orchome.com/9861
https://github.com/Shopify/sarama/issues/1407
地域: 法兰克福
复现:
使用docker-compose部署kafka
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    volumes:
      - ./zookeeper/data:/data
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: xxx.xxx.xxx.xxx
      KAFKA_MESSAGE_MAX_BYTES: 2000000
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: xxx.xxx.xxx.xxx:2181
    volumes:
      - ./kafka:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
  kafka-manager:
    container_name: kafka-manager
    image: sheepkiller/kafka-manager
    ports:
      - 9020:9000
    environment:
      ZK_HOSTS: xxx.xxx.xxx.xxx:2181
部署
docker-compose up -d
使用https://github.com/Shopify/sarama 1.19客户端连接kafka
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"

	"worth-cloud/pkg/loge"
)
// main is a reproduction program for the "not coordinated by this broker"
// consumer-group error. It publishes one message per second to the test topic
// through an async producer and consumes the same topic through a sarama
// consumer group, until the context is cancelled or SIGINT/SIGTERM arrives.
func main() {
	server := []string{"xxx.xxx.xxx.xxx:9092"}
	groupID := "yourgroupid"
	topic := []string{"test"}

	config := sarama.NewConfig()
	// Pin the Kafka protocol version to the one matching the purchased CKafka
	// instance; when unset, sarama falls back to the lowest version it supports.
	config.Version = sarama.V1_1_0_0
	//config.Net.SASL.Enable = true
	//config.Net.SASL.User = "xxxxxxxxxxxxxx"
	//config.Net.SASL.Password = "xxxxx"

	// ctx bounds the lifetime of both background goroutines; wg lets main wait
	// for them before any client is closed.
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	// producer
	proClient, err := sarama.NewClient(server, config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	defer proClient.Close()
	producer, err := sarama.NewAsyncProducerFromClient(proClient)
	if err != nil {
		log.Fatalln("failed to start Sarama producer:", err)
	}
	defer producer.Close()
	// Registered after the Close defers so it runs before them: the goroutines
	// must have stopped before the producer is closed, otherwise the ticker
	// goroutine could send on a closed Input channel.
	defer func() {
		cancel()
		wg.Wait()
	}()

	// Drain producer errors. With the default config (Return.Errors = true)
	// the async producer can deadlock if nobody reads this channel.
	go func() {
		for perr := range producer.Errors() {
			log.Println("failed to produce message:", perr)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				// Produce one message to the topic.
				msg := &sarama.ProducerMessage{
					Topic: topic[0],
					// strconv.Itoa yields the decimal second ("0".."59"); a direct
					// int-to-string conversion would yield a single code point.
					Key:   sarama.StringEncoder(strconv.Itoa(t.Second())),
					Value: sarama.StringEncoder("Hello World!"),
				}
				select {
				case producer.Input() <- msg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	// consumer group
	consumer := Consumer{
		ready: make(chan bool),
	}
	client, err := sarama.NewConsumerGroup(server, groupID, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume must be called in an infinite loop: when a rebalance
			// happens the consumer session has to be recreated to obtain the
			// new claims.
			if err := client.Consume(ctx, topic, &consumer); err != nil {
				loge.Error(fmt.Sprintf("Error from consumer:%q err:%s", topic, err))
				time.Sleep(500 * time.Millisecond)
			}
			// Exit once the context has been cancelled.
			if ctx.Err() != nil {
				return
			}
			// Setup closes ready on every session, so give the next session a
			// fresh channel.
			consumer.ready = make(chan bool)
		}
	}()

	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}

	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
// Consumer is the consumer-group handler used by this program.
type Consumer struct {
	// ready is closed by Setup to mark the consumer as ready.
	ready chan bool
}
// Setup runs when a new consumer session is created, before ConsumeClaim is
// called. It closes the ready channel to mark the consumer as ready and
// always returns nil. The session argument is unused.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}
// Cleanup runs after all ConsumeClaim goroutines have exited. There is
// nothing to release here, so it ignores the session and reports success.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim is where messages are actually processed. It logs every message
// of the claim and marks it as consumed on the session.
//
// It returns nil when the claim's message channel is closed or when the
// session context is cancelled. Returning on session cancellation lets the
// handler exit promptly on a rebalance or shutdown instead of waiting for the
// message channel to close.
//
// NOTE: do not run this loop in a separate goroutine. ConsumeClaim is already
// invoked in its own goroutine by sarama; see
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Message channel closed: the claim has ended.
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}
重启 kafka 造成短暂不可用之后,客户端报错:
Request was for a consumer group that is not coordinated by this broker.
查阅 https://github.com/Shopify/sarama 后发现,最新版本 1.29.1 已经处理了该异常。

客户端升级最新版本后尝试,已经能正常使用。
网友评论