背景:应用需要在某个事件完成后写表记录,以前这个事件有kafka消息,于是监听即可。
自己也不是特别了解,经过摸索可以正常使用,并添加了一些注释,如有不还正确请指正,上代码吧!
//导入头文件
"github.com/Shopify/sarama"
"github.com/astaxie/beego"
cluster "github.com/bsm/sarama-cluster"
var kafkaTag = "kafka message"
func StartKafka() {
// 配置
config := cluster.NewConfig()
// 设定是否需要返回错误信息
config.Consumer.Return.Errors = true
// 为成员分配主题分区的策略
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
// 这里 sarama.OffsetNewest 就从最新的开始消费,即该 consumer 启动之前产生的消息都无法被消费
// 如果改为 sarama.OffsetOldest 则会从最旧的消息开始消费,即每次重启 consumer 都会把该 topic 下的所有消息消费一次
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// 每 1 秒钟提交一次 offset
config.Consumer.Offsets.CommitInterval = 1 * time.Second
//Notifications 返回消费者期间发生的通知的通道
// 重新平衡。 如果您的配置是,通知只会通过此通道发出
config.Group.Return.Notifications = true
// 创建消费者
brokers := strings.Split(beego.AppConfig.String("kafka_url"), ",")
groupId := "kafka-groupId-01" // 消息者组,每个后台应用创建者独有,如果你使用其他后台代码中,可能会导致自己或其他人收不消息
topics := []string{"kafka-topics-01"} // 订阅主题
consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
if err != nil {
logs.Error(kafkaTag, "new consumer error: ", err.Error())
return
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// 接收错误
go func() {
for err := range consumer.Errors() {
logs.Error(kafkaTag, "consumer error:", err.Error())
}
}()
// 打印一些rebalance的信息
go func() {
for ntf := range consumer.Notifications() {
logs.Info(kafkaTag, "consumer notification:", ntf)
}
}()
// 循环从通道中获取消息
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
logs.Info(kafkaTag, "Topic = ", msg.Topic, "Partition = ", msg.Partition, "Offset = ", msg.Offset, "Key = ", string(msg.Key), "Value", string(msg.Value))
if msg.Topic == "kafka-topics-01" {
// 自己解析 使用json库解析msg.Value
// 然后写表
}
consumer.MarkOffset(msg, "") // 上报offset
} else {
logs.Error(kafkaTag, "监听服务失败")
}
case <-signals:
return
}
}
}
配置文件中
kafka_url = kafka地址1,kafka地址2,kafka地址3
如果不使用strings.Split进行分个割,启动会报错,报错如下:
kafka message new consumer error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
其实就是把多个地址当一个地址去启动了。
网友评论