美文网首页Kafka
【kafka学习笔记】Go接入kafka

【kafka学习笔记】Go接入kafka

作者: 快乐的提千万 | 来源:发表于2022-02-21 14:39 被阅读0次

    需要借助的库

    github.com/Shopify/sarama // kafka主要的库
    github.com/bsm/sarama-cluster // kafka消费组
    

    生产者

    package producer
    
    import (
        "fmt"
        "github.com/HappyTeemo7569/teemoKit/tlog"
        "github.com/Shopify/sarama"
        "kafkaDemo/define"
    )
    
    // ProducerId is the ID assigned to the next Producer that gets initialised;
    // InitProducer copies it into the Producer and then increments it.
    var ProducerId = 1
    
    // Producer bundles a sarama synchronous producer with the topic it publishes
    // to and the counters used to label the test messages it sends.
    type Producer struct {
        Producer   sarama.SyncProducer
        Topic      string // topic that messages are sent to
        ProducerID int    // ID of this producer
        MessageId  int    // sequence number embedded in the next message
    }
    
    // InitProducer creates a synchronous sarama producer connected to
    // define.SERVER_LIST and fills in the receiver: the client, the topic
    // (define.TOPIC), the next free ProducerId and a message counter starting
    // at 1. The package-level ProducerId is incremented afterwards.
    //
    // If the producer cannot be created the error is logged and the receiver is
    // left untouched, so p.Producer keeps whatever value it had before the call
    // (nil for a fresh Producer).
    func (p *Producer) InitProducer() {
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and its followers to acknowledge
        config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
        config.Producer.Return.Successes = true                   // delivered messages are reported on the success channel

        // Connect to Kafka.
        client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
        if err != nil {
            // Nothing was opened yet, so report a creation failure rather
            // than a close.
            tlog.Error("create producer failed, err:", err)
            return
        }

        p.Producer = client
        p.Topic = define.TOPIC
        p.ProducerID = ProducerId
        p.MessageId = 1

        ProducerId++
    }
    
    // SendMessage publishes one test message to p.Topic. The payload contains
    // the producer ID and the current message counter; the counter is advanced
    // only after a successful send.
    //
    // Failures are logged through tlog and the method returns without sending.
    // This includes the case where p.Producer is nil because InitProducer
    // failed or was never called.
    func (p *Producer) SendMessage() {
        if p.Producer == nil {
            // Calling a method on the nil interface would panic.
            tlog.Error("send msg failed, err:", "producer is not initialized")
            return
        }

        // Build the message.
        txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
            p.ProducerID, p.MessageId)
        msg := &sarama.ProducerMessage{
            Topic: p.Topic,
            Value: sarama.StringEncoder(txt),
        }

        // Send it; the first two results are logged as pid and offset.
        partition, offset, err := p.Producer.SendMessage(msg)
        if err != nil {
            tlog.Error("send msg failed, err:", err)
            return
        }
        tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
            p.ProducerID, partition, offset, txt))

        p.MessageId++
    }
    
    // Close shuts down the underlying sarama producer. It is a no-op when the
    // producer was never initialised, and a failure reported by the underlying
    // Close call is logged instead of being silently dropped.
    func (p *Producer) Close() {
        if p.Producer == nil {
            return
        }
        if err := p.Producer.Close(); err != nil {
            tlog.Error("close producer failed, err:", err)
        }
    }
    
    

    消费者

    package consumer
    
    import (
        "github.com/HappyTeemo7569/teemoKit/tlog"
        "github.com/Shopify/sarama"
        "kafkaDemo/define"
    )
    
    // Consumer bundles a sarama consumer with the topic it reads from and an ID
    // used to label its log lines.
    type Consumer struct {
        Consumer   sarama.Consumer
        Topic      string
        ConsumerId int // ID of this consumer
    }
    
    // InitConsumer creates a sarama consumer for define.SERVER_LIST (passing a
    // nil config) and stores it on the receiver together with define.TOPIC and
    // the current value of the package-level ConsumerId, which is then
    // incremented. The error from sarama.NewConsumer is returned unchanged; on
    // failure the receiver is not modified.
    func (c *Consumer) InitConsumer() error {
        brokers := []string{define.SERVER_LIST}

        client, err := sarama.NewConsumer(brokers, nil)
        if err != nil {
            return err
        }

        c.ConsumerId = ConsumerId
        ConsumerId++

        c.Topic = define.TOPIC
        c.Consumer = client
        return nil
    }
    
    // GetMessage starts consuming a single partition of c.Topic.
    //
    // offset selects where consumption starts; passing -1 requests the newest
    // offset (sarama.OffsetNewest). If the partition consumer cannot be started
    // the error is logged and nothing else happens. Otherwise a goroutine is
    // launched that logs every message received from the partition; the method
    // itself returns immediately.
    func (c *Consumer) GetMessage(partitionId int32, offset int64) {
        start := offset
        if start == -1 {
            start = sarama.OffsetNewest
        }

        partConsumer, err := c.Consumer.ConsumePartition(c.Topic, partitionId, start)
        if err != nil {
            // Seen, for example, when that topic/partition is already being consumed.
            tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
            return
        }

        // Consume the partition asynchronously.
        go func() {
            for m := range partConsumer.Messages() {
                tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, m.Partition, m.Offset, m.Key, string(m.Value))
            }
        }()
    }
    
    // GetMessageToAll starts a consumer on every partition of c.Topic, each
    // beginning at the given offset (see GetMessage for the meaning of -1).
    // If the partition list cannot be fetched the error is logged and no
    // consumer is started.
    func (c *Consumer) GetMessageToAll(offset int64) {
        partitionList, err := c.Consumer.Partitions(c.Topic) // all partition IDs of the topic
        if err != nil {
            tlog.Error("fail to get list of partition:err%v", err)
            return
        }
        tlog.Info("所有partition:", partitionList)

        // Range over the values: the slice holds partition IDs, and the slice
        // index is not guaranteed to equal the ID.
        for _, partition := range partitionList {
            c.GetMessage(partition, offset)
        }
    }
    
    
    

    主函数

    // main launches the demo producer and consumer in their own goroutines and
    // then keeps the process alive indefinitely.
    func main() {
        tlog.Info("开始")

        go producer.Put()
        go consumer.Get()

        // Sleep in a loop forever so the background goroutines keep running.
        const nap = 60 * time.Hour
        for {
            time.Sleep(nap)
        }
    }
    
    
    // Put creates and initialises a Producer, then starts a background goroutine
    // that sends one test message per second for as long as the process runs.
    //
    // If initialisation fails (InitProducer logs the reason and leaves Producer
    // nil), no goroutine is started, because the send loop would otherwise
    // operate on a nil producer.
    func Put() {
        p := new(Producer)
        p.InitProducer()
        if p.Producer == nil {
            return
        }

        go func() {
            for {
                p.SendMessage()
                time.Sleep(1 * time.Second)
            }
        }()
    }
    
    // Get creates and initialises a Consumer and starts consuming every
    // partition of its topic from offset 0. An initialisation failure is logged
    // and ends the function without starting any consumer.
    func Get() {
        var startOffset int64 // zero: begin at offset 0

        c := &Consumer{}
        if err := c.InitConsumer(); err != nil {
            tlog.Error("fail to init consumer, err:%v", err)
            return
        }

        c.GetMessageToAll(startOffset)
    }
    
    
    

    具体源码可以查看:
    https://github.com/HappyTeemo7569/kafkaDemo

    相关文章

      网友评论

        本文标题:【kafka学习笔记】Go接入kafka

        本文链接:https://www.haomeiwen.com/subject/dkhilrtx.html