美文网首页
kafka消费者和消费者组

kafka消费者和消费者组

作者: wayyyy | 来源:发表于2022-01-17 01:28 被阅读0次

上文的环境,我们已经搭建了拥有3个节点的kafka集群,并创建了topic(分区数为4,副本因子为3(3个broker))

# docker ps
  CONTAINER ID        IMAGE                COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
  4056558ea9ab        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19094->9092/tcp                                kafka3
  fc3e6eb2fdea        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19093->9092/tcp                                kafka2
  dd1536016f31        wurstmeister/kafka   "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:19092->9092/tcp                                kafka1
  5e9c4f4911a0        zookeeper            "/docker-entrypoin..."   2 hours ago         Up 2 hours          2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper

# ./kafka-topics.sh --zookeeper 192.168.48.134:2181 --describe --topic test
  Topic: test TopicId: _A5YjE72QWeECBPEOFCquw PartitionCount: 4   ReplicationFactor: 3    Configs: 
    Topic: test   Partition: 0    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1
    Topic: test   Partition: 1    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2
    Topic: test   Partition: 2    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3
    Topic: test   Partition: 3    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
消费者

这里我们使用go语言,kafka 客户端使用 github.com/Shopify/sarama v1.29.1 版本

  • 生产者

    package main
    
    import (
      "fmt"
      "github.com/Shopify/sarama"
    )
    
    var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
    var topic = "test"
    
    func main() {
      // Configure a synchronous producer.
      cfg := sarama.NewConfig()
      cfg.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all ISR replicas to ack
      cfg.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
      cfg.Producer.Return.Successes = true                   // successful deliveries are reported on the Successes channel

      // Dial the Kafka cluster.
      producer, err := sarama.NewSyncProducer(kafkaAddresses, cfg)
      if err != nil {
        fmt.Println("producer closed, err:", err)
        return
      }
      defer producer.Close()

      // Compose a single test message for the target topic.
      message := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder("this is a test log"),
      }

      // Send synchronously; on success Kafka reports the partition and offset
      // the message landed on.
      partition, offset, err := producer.SendMessage(message)
      if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
      }
      fmt.Printf("pid:%v offset:%v\n", partition, offset)
    }
    
  • 消费者

    package main
    
    import (
      "fmt"
      "github.com/Shopify/sarama"
    )
    
    var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
    var topic = "test"
    
    func main() {
        // Default config; ClientID defaults to "sarama".
        config := sarama.NewConfig()

        fmt.Println("clientID: ", config.ClientID)

        // Connect a standalone (group-less) consumer to the cluster.
        consumer, err := sarama.NewConsumer(kafkaAddresses, config)
        if err != nil {
            fmt.Printf("fail to start consumer, err:%v\n", err)
            return
        }
        defer consumer.Close()

        // Fetch the IDs of all partitions of the topic.
        partitionList, err := consumer.Partitions(topic)
        if err != nil {
            fmt.Printf("fail to get list of partition:err%v\n", err)
            return
        }
        fmt.Println("partitionList:", partitionList)

        // Start one partition consumer per partition.
        // BUG FIX: iterate over the partition IDs themselves, not the slice
        // indices (`for partition := range partitionList` yields indices,
        // which only coincidentally match IDs when they are 0..n-1).
        for _, partition := range partitionList {
            pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
            if err != nil {
                fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
                return
            }
            defer pc.AsyncClose()

            // Drain each partition in its own goroutine. The partition
            // consumer is passed as a named argument so each goroutine
            // unambiguously owns its own value.
            go func(pc sarama.PartitionConsumer) {
                for msg := range pc.Messages() {
                    // %s renders key/value as text instead of raw byte numbers.
                    fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n",
                        msg.Partition, msg.Offset, msg.Key, msg.Value)
                }
            }(pc)
        }

        // Block forever without spinning (the original `for {}` busy-loops
        // and pins a CPU core).
        select {}
    }
    
消费者组
  • 生产者
    生产者代码不变

  • 消费者
    代码源自 https://github.com/Shopify/sarama/tree/v1.29.1/examples/consumergroup

    package main
    
    import (
      "context"
      "github.com/Shopify/sarama"
      "log"
      "os"
      "os/signal"
      "sync"
      "syscall"
    )
    
    var kafkaAddresses = []string{"192.168.48.134:19092", "192.168.48.134:19093", "192.168.48.134:19094"}
    var topic = []string{"test"}
    
    func main() {
      // Default config; for production you may also want to pin
      // config.Version to the broker version.
      config := sarama.NewConfig()

      consumer := Consumer{
          ready: make(chan bool),
      }

      ctx, cancel := context.WithCancel(context.Background())
      // "111" is the consumer-group ID: every process started with the same
      // ID shares the topic's partitions among its members.
      client, err := sarama.NewConsumerGroup(kafkaAddresses, "111", config)
      if err != nil {
          log.Panicf("Error creating consumer group client: %v", err)
      }

      wg := &sync.WaitGroup{}
      wg.Add(1)
      go func() {
          defer wg.Done()
          for {
              // `Consume` should be called inside an infinite loop: when a
              // server-side rebalance happens, the consumer session ends and
              // Consume returns, so it must be called again to rejoin and
              // pick up the new claims.
              if err := client.Consume(ctx, topic, &consumer); err != nil {
                  log.Panicf("Error from consumer: %v", err)
              }
              // check if context was cancelled, signaling that the consumer should stop
              if ctx.Err() != nil {
                  return
              }
              // Re-arm the ready channel for the next session (Setup closes it).
              consumer.ready = make(chan bool)
          }
      }()

      // FIX: wait until Setup has run (i.e. the group has joined and claims
      // are assigned) before announcing readiness, as in the upstream sarama
      // consumergroup example; otherwise the log line can lie.
      <-consumer.ready
      log.Println("Sarama consumer up and running!...")

      // Shut down cleanly on Ctrl-C / SIGTERM or when the context ends.
      sigterm := make(chan os.Signal, 1)
      signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
      select {
      case <-ctx.Done():
          log.Println("terminating: context cancelled")
      case <-sigterm:
          log.Println("terminating: via signal")
      }
      cancel()
      wg.Wait()
      if err = client.Close(); err != nil {
          log.Panicf("Error closing client: %v", err)
      }
    }
    
    // Consumer represents a Sarama consumer group consumer.
    // It implements the sarama.ConsumerGroupHandler interface
    // (Setup / Cleanup / ConsumeClaim).
    type Consumer struct {
      // ready is closed by Setup once a session begins; main re-makes it
      // between sessions so each rebalance gets a fresh signal channel.
      ready chan bool
    }
    
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
      // Mark the consumer as ready: closing the channel releases any
      // receiver blocked on it. main re-makes the channel before the next
      // session, so the same channel is never closed twice.
      close(consumer.ready)
      return nil
    }
    
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
    // Nothing to release here; it exists only to satisfy sarama.ConsumerGroupHandler.
    func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
      return nil
    }
    
    // ConsumeClaim consumes messages from a single claimed partition until
    // the claim's Messages channel is closed (i.e. the session ends).
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
      // NOTE: do not move this loop into a goroutine of its own —
      // `ConsumeClaim` itself is already invoked in a goroutine per claim, see:
      // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
      for msg := range claim.Messages() {
          value := string(msg.Value)
          log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", value, msg.Timestamp, msg.Topic)
          // Mark the message consumed so its offset will be committed.
          session.MarkMessage(msg, "")
      }

      return nil
    }
    

相关文章

  • Kafka消费者:读消息从Kafka

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用kafka consume...

  • Kafka消费者:读消息从Kafka

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用kafka consume...

  • Kafka消费者:读消息从Kafka

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用kafka consume...

  • kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用kafka consume...

  • Kafka使用笔记(三、消费者详解)

    概念 消费者和消费组 kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会接收到...

  • kafka-第三章-消费者

    学习大纲 一、消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者...

  • Kafka Consumer

    1.消费者和消费组 消费者( Consumer )负责订阅Kafka 中的主题( Topic ),并且从订阅的主题...

  • Kafka消费者

    1 消费者概念 1.1 消费者与消费者组 应用程序--->kafka--->应用程序 生产者 主题 消费者...

  • Kafka消息单播与多播的概念介绍

    Kafka引入了消费者组概念,每个消费者都属于一个特定的消费者组,通过消费者组就可以实现消息的单播与多播。本文将详...

  • 深入学习Kafka数据消费大致流程(如何创建并使用Kafka消费

    一、概念入门 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消...

网友评论

      本文标题:kafka消费者和消费者组

      本文链接:https://www.haomeiwen.com/subject/ddnqhrtx.html