美文网首页
go kafka 的使用(修正消费未消费的数据)

go kafka 的使用(修正消费未消费的数据)

作者: MorningandSun | 来源:发表于2019-07-26 17:50 被阅读0次

    首先去下载sarama

    1.生产者

    package main
    
    import (
        "fmt"
    
        "github.com/Shopify/sarama"
    )
    
    func produce(value, msgType string) bool {
        config := sarama.NewConfig()
        // 等待服务器所有副本都保存成功后的响应
        config.Producer.RequiredAcks = sarama.WaitForAll
        // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        // 是否等待成功和失败后的响应
        config.Producer.Return.Successes = true
    
        // 使用给定代理地址和配置创建一个同步生产者
        producer, err := sarama.NewSyncProducer([]string{"192.168.0.121:9092"}, config)
        if err != nil {
            panic(err)
        }
    
        defer producer.Close()
    
        //构建发送的消息,
        msg := &sarama.ProducerMessage{
            Topic:     "test",                      //包含了消息的主题
            Partition: int32(10),                   //
            Key:       sarama.StringEncoder("key"), //
        }
    
        for {
            // _, err := fmt.Scanf("%s", &value)
            // if err != nil {
            //  a := err.Error
            //  fmt.Println(a)
            // }
            // fmt.Scanf("%s", &msgType)
            fmt.Println("msgType = ", msgType, ",value = ", value)
            msg.Topic = msgType
            //将字符串转换为字节数组
            msg.Value = sarama.ByteEncoder(value)
            //fmt.Println(value)
            //SendMessage:该方法是生产者生产给定的消息
            //生产成功的时候返回该消息的分区和所在的偏移量
            //生产失败的时候返回error
            partition, offset, err := producer.SendMessage(msg)
    
            if err != nil {
                fmt.Println("Send message Fail")
                return false
            }
            fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
            return true
        }
    }
    

    2.消费者

    package main
    
    import (
        "encoding/json"
        "fmt"
        "sync"
    
        "github.com/Shopify/sarama"
    )
    
    var (
        wg sync.WaitGroup
    )
    
    func Consumer(topics []string, ip []string, grouid string) {
        defer wg.Done()
        config := cluster.NewConfig()
        config.Consumer.Return.Errors = true
        config.Group.Return.Notifications = true
        config.Consumer.Offsets.Initial = sarama.OffsetNewest
    
        // init consumer
        consumer, err := cluster.NewConsumer(ip, grouid, topics, config)
        if err != nil {
            log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", grouid, err)
            return
        }
        defer consumer.Close()
    
        // trap SIGINT to trigger a shutdown
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
    
        // consume errors
        go func() {
            for err := range consumer.Errors() {
                log.Printf("%s:Error: %s\n", grouid, err.Error())
            }
        }()
    
        // consume notifications
        go func() {
            for ntf := range consumer.Notifications() {
                log.Printf("%s:Rebalanced: %+v \n", grouid, ntf)
            }
        }()
    
        // consume messages, watch signals
        var successes int
    Loop:
        for {
            select {
            case msg, ok := <-consumer.Messages():
                if ok {
    fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", grouid, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") //这里是给消费过的offset 打上标记 下次启动从这里进行消费
                }
            case <-signals:
                break Loop
            }
    }
    

    3.以上经过测试可以使用 来源 (稍微有一点点改动)https://studygolang.com/articles/17912

    相关文章

      网友评论

          本文标题:go kafka 的使用(修正消费未消费的数据)

          本文链接:https://www.haomeiwen.com/subject/phwdrctx.html