美文网首页
kafka消费者和消费者组

kafka消费者和消费者组

作者: wayyyy | 来源:发表于2022-01-17 01:28 被阅读0次

    在上文搭建的环境中,我们已经拥有一个 3 节点的 kafka 集群,并创建了 topic(分区数为 4,副本因子为 3,对应 3 个 broker)

    # docker ps
      CONTAINER ID        IMAGE                COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
      4056558ea9ab        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19094->9092/tcp                                kafka3
      fc3e6eb2fdea        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19093->9092/tcp                                kafka2
      dd1536016f31        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19092->9092/tcp                                kafka1
      5e9c4f4911a0        zookeeper            "/docker-entrypoin..."   2 hours ago         Up 2 hours          2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
    
    # ./kafka-topics.sh --zookeeper 192.168.48.134:2181 --describe --topic test
      Topic: test TopicId: _A5YjE72QWeECBPEOFCquw PartitionCount: 4   ReplicationFactor: 3    Configs: 
        Topic: test   Partition: 0    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1
        Topic: test   Partition: 1    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2
        Topic: test   Partition: 2    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3
        Topic: test   Partition: 3    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
    
    消费者

    这里我们使用go语言,kafka 客户端使用 github.com/Shopify/sarama v1.29.1 版本

    • 生产者

      package main
      
      import (
        "fmt"
        "github.com/Shopify/sarama"
      )
      
      var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
      var topic = "test"
      
      func main() {
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
        config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
        config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
      
        // 连接kafka
        client, err := sarama.NewSyncProducer(kafkaAddresses, config)
        if err != nil {
            fmt.Println("producer closed, err:", err)
            return
        }
        defer client.Close()
      
        // 构造一个消息
        msg := &sarama.ProducerMessage{}
        msg.Topic = topic
        msg.Value = sarama.StringEncoder("this is a test log")
      
        // 发送消息
        pid, offset, err := client.SendMessage(msg)
        if err != nil {
            fmt.Println("send msg failed, err:", err)
            return
        }
        fmt.Printf("pid:%v offset:%v\n", pid, offset)
      }
      
    • 消费者

      package main
      
      import (
        "fmt"
        "github.com/Shopify/sarama"
      )
      
      var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
      var topic = "test"
      
      func main() {
          config := sarama.NewConfig()
      
          fmt.Println("clientID: ", config.ClientID)
      
          consumer, err := sarama.NewConsumer(kafkaAddresses, config)
          if err != nil {
              fmt.Printf("fail to start consumer, err:%v\n", err)
              return
          }
          partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
          if err != nil {
              fmt.Printf("fail to get list of partition:err%v\n", err)
              return
          }
          fmt.Println("partitionList:", partitionList)
      
          // 遍历所有的分区
          for partition := range partitionList {
              // 针对每个分区创建一个对应的分区消费者
              pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
              if err != nil {
                  fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
                  return
              }
              defer pc.AsyncClose()
              // 异步从每个分区消费信息
              go func(sarama.PartitionConsumer) {
                  for msg := range pc.Messages() {
                      fmt.Println(fmt.Sprintf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value))
                  }
                }(pc)
          }
      
          for {}
      }
      
    消费者组
    • 生产者
      生产者代码不变

    • 消费者
      代码源自 https://github.com/Shopify/sarama/tree/v1.29.1/examples/consumergroup

      package main
      
      import (
        "context"
        "github.com/Shopify/sarama"
        "log"
        "os"
        "os/signal"
        "sync"
        "syscall"
      )
      
      var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
      var topic = []string{"test"}
      
      func main() {
        config := sarama.NewConfig()
      
        consumer := Consumer{
            ready: make(chan bool),
        }
      
        ctx, cancel := context.WithCancel(context.Background())
        client, err := sarama.NewConsumerGroup(kafkaAddresses, "111", config)
        if err != nil {
            log.Panicf("Error creating consumer group client: %v", err)
        }
      
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                // `Consume` should be called inside an infinite loop, when a
                // server-side rebalance happens, the consumer session will need to be
                // recreated to get the new claims
                if err := client.Consume(ctx, topic, &consumer); err != nil {
                    log.Panicf("Error from consumer: %v", err)
                }
                // check if context was cancelled, signaling that the consumer should stop
                if ctx.Err() != nil {
                    return
                }
                consumer.ready = make(chan bool)
            }
        }()
      
        log.Println("Sarama consumer up and running!...")
      
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
        select {
        case <-ctx.Done():
            log.Println("terminating: context cancelled")
        case <-sigterm:
            log.Println("terminating: via signal")
        }
        cancel()
        wg.Wait()
        if err = client.Close(); err != nil {
            log.Panicf("Error closing client: %v", err)
        }
      }
      
      // Consumer represents a Sarama consumer group consumer
      type Consumer struct {
        ready chan bool // closed by Setup once a group session is established; re-made after each rebalance
      }
      
      // Setup is run at the beginning of a new session, before ConsumeClaim
      func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
        // Mark the consumer as ready; closing the channel broadcasts to
        // every goroutine waiting on it.
        close(consumer.ready)
        return nil
      }
      
      // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
      func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
        // No per-session state to release.
        return nil
      }
      
      // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
      // It logs each message and marks it consumed so the offset is committed.
      // FIX: removed the stray run of spaces in the parameter list (gofmt).
      func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // NOTE:
        // Do not move the code below to a goroutine.
        // The `ConsumeClaim` itself is called within a goroutine, see:
        // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
        for message := range claim.Messages() {
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
            session.MarkMessage(message, "")
        }

        return nil
      }
      

    相关文章

      网友评论

          本文标题:kafka消费者和消费者组

          本文链接:https://www.haomeiwen.com/subject/ddnqhrtx.html