kafka 2021-11-23

Author: 9_SooHyun | Published 2021-11-23 21:11

    Kafka basics

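    • Broker: a single Kafka instance is a broker. A server usually runs one Kafka instance, so a server can be treated as one broker. A cluster consists of multiple brokers, and ZooKeeper manages the cluster metadata

    • Topic: a topic can be thought of as a message queue; one broker can host multiple topics

    • Partition: each topic can have multiple partitions, and the partitions of one topic can be spread across different brokers, which lets the server side scale well. Each partition is an ordered queue and is Kafka's unit of ordering. Every message in a partition is assigned a sequential id (the offset). Kafka only guarantees that messages are delivered to a consumer in the order of a single partition; it does not guarantee order across the partitions of a topic

      Consequently, global ordering within a topic requires a single partition. For per-key ordering, use a hash strategy that routes messages to a partition by key; note that adding partitions changes the key-to-partition mapping, so a key's messages may end up split across two partitions after a resize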

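    • Replica: each partition can have multiple replicas, placed on different machines for reliable storage. One replica is elected partition leader. Replicas exist for redundancy, and only for redundancy: all read and write requests are handled by the leader replica. A follower has a single job, pulling messages from the leader to keep its copy as close to the leader's as possible. With N replicas, up to N-1 of the brokers holding them can go down.

    • Message: each partition can store many messages

    • Offset: Kafka names each storage (segment) file after the offset of the first message it contains, which makes lookups easy. To find offset 2049, for example, locate the file whose name is the largest offset not exceeding it, here the one named 2048. The first file is named with an all-zero offset. (Early releases used a .kafka suffix, e.g. 00000000000.kafka; current versions use a zero-padded offset with a .log suffix.)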

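    • Producer: the message producer, i.e. a client that sends messages to a Kafka broker. When pushing a message, the producer specifies the topic and a partition selection strategy (typically round-robin, random, or hash), which determines the target partition

    • Consumer: the message consumer, i.e. a client that fetches messages from a Kafka broker

    • Consumer Group: consumer instances that are configured with the same group id form one consumer group; a group can subscribe to one or more topics:

      • Rebalance: Kafka redistributes partitions automatically based on the number of instances in the group and the number of partitions
      • No duplicate consumption within a group: a partition is assigned to exactly one instance of the group at a time, so two instances never consume the same partition concurrently
      • High availability: when a consumer instance dies, Kafka automatically reassigns its partitions to the remaining instances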
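
    The consumer group properties above can be illustrated with a small, self-contained sketch. It is not Kafka's or Sarama's assignor; `assignRoundRobin` is a hypothetical helper that only shows the invariant that every partition is owned by exactly one group member, and that the same partitions are redistributed when a member leaves.

```go
package main

import (
	"fmt"
	"sort"
)

// assignRoundRobin spreads partitions over group members so that every
// partition is owned by exactly one member. It is a simplified stand-in for
// a real assignor and handles a single topic only.
func assignRoundRobin(members []string, partitions []int32) map[string][]int32 {
	out := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return out
	}
	// Sort a copy so the result does not depend on the input order.
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	for _, m := range sorted {
		out[m] = nil
	}
	for i, p := range partitions {
		m := sorted[i%len(sorted)]
		out[m] = append(out[m], p)
	}
	return out
}

func main() {
	partitions := []int32{0, 1, 2, 3, 4, 5}
	fmt.Println(assignRoundRobin([]string{"c1", "c2", "c3"}, partitions))
	// One member leaves: the same six partitions are spread over the rest.
	fmt.Println(assignRoundRobin([]string{"c1", "c3"}, partitions))
}
```

    If there are more members than partitions, the extra members receive no partitions in this sketch, which matches the rule that a partition is consumed by only one instance per group.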

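    The key-hash strategy described under the partition entry can be sketched as follows. The choice of FNV-1a and the name `partitionFor` are assumptions made for illustration; real clients choose their own hash function. The sketch shows why a key keeps its order while the partition count is fixed, and why growing the topic can move a key to a different partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition index in [0, numPartitions).
// numPartitions must be positive. FNV-1a is an assumption of this sketch.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	keys := []string{"order-1", "order-2", "order-3", "order-4"}
	for _, k := range keys {
		// With a fixed partition count the same key always maps to the
		// same partition, which preserves per-key ordering.
		before := partitionFor(k, 3)
		// After growing the topic to 4 partitions the mapping may differ,
		// so newer messages for a key can land in another partition.
		after := partitionFor(k, 4)
		fmt.Printf("%s: 3 partitions -> %d, 4 partitions -> %d\n", k, before, after)
	}
}
```
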
    Using Kafka from Go with sarama

    Basic concepts

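    • Config. Config is the general configuration struct used to pass options to Sarama's constructors when creating the various instances

      For a consumer, set the fields under Config.Consumer and pass the Config to the consumer constructor

      For a producer, set the fields under Config.Producer

      For a consumer group, set the fields under Config.Consumer.Group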

    • Client. Client is a generic Kafka client. It is an interface describing a general-purpose client for connecting to Kafka. Producers and consumers are higher-level concepts that push and pull messages through a Client

      It is safe to share a client amongst many users (here, a group of producers or a group of consumers); however, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.

      In other words, several producers can share one Client to push messages and several consumers can share one Client to pull messages, but a dedicated Client per producer/consumer is recommended

    • To produce messages, use either the AsyncProducer or the SyncProducer

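      The producer type is chosen by the constructor (sarama.NewAsyncProducer or sarama.NewSyncProducer). sarama.Config.Producer.RequiredAcks sets the acknowledgement level the producer waits for; it does not select the producer type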

      The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases.

      The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. For implementation reasons, the SyncProducer also requires Producer.Return.Errors and Producer.Return.Successes to be set to true in its configuration.

      When using the SyncProducer, therefore, set Producer.Return.Errors = true and Producer.Return.Successes = true in sarama.Config. Producer.RequiredAcks = 1 (WaitForLocal, i.e. wait for the leader's acknowledgement) is a common choice

    • To consume messages, use Consumer or Consumer-Group.
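
    For the producer side described above, one way to picture why a synchronous send needs both the success and the error results returned is to model it as a thin wrapper over an asynchronous producer: enqueue one message, then block until that message's result comes back. The sketch below is a toy model under that assumption. `message`, `asyncProducer`, and `sendSync` are hypothetical names, not Sarama's types, and no broker is involved.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a producer message.
type message struct {
	value string
	done  chan error // receives the delivery result for this message
}

// asyncProducer accepts messages on a channel and "delivers" them in the
// background, reporting each result on the message's done channel.
type asyncProducer struct {
	input chan *message
}

func newAsyncProducer() *asyncProducer {
	p := &asyncProducer{input: make(chan *message)}
	go func() {
		for m := range p.input {
			if m.value == "" {
				m.done <- errors.New("empty message")
				continue
			}
			m.done <- nil // pretend the broker acknowledged the write
		}
	}()
	return p
}

// sendSync turns the asynchronous API into a blocking call by waiting for
// the result of the one message it just enqueued. Without both the success
// and the error path being reported, this wait could never finish.
func (p *asyncProducer) sendSync(value string) error {
	m := &message{value: value, done: make(chan error, 1)}
	p.input <- m
	return <-m.done
}

func main() {
	p := newAsyncProducer()
	fmt.Println(p.sendSync("hello"))
	fmt.Println(p.sendSync(""))
	close(p.input) // stops the background goroutine
}
```

    The blocking wait per message is also where the efficiency caveat comes from: a caller that sends sequentially pays one round trip per message.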

    // consumer.go
    // consumer implements the Consumer interface
    type consumer struct {
        conf            *Config
        // PartitionConsumer processes Kafka messages from a given topic and partition.
        children        map[string]map[int32]*partitionConsumer 
        brokerConsumers map[*Broker]*brokerConsumer
        client          Client
        lock            sync.Mutex
    }
    
    // consumer's Close(), Topics() and Partitions() simply delegate to the client's methods of the same name. For example:
    func (c *consumer) Topics() ([]string, error) {
        return c.client.Topics()
    }

    // sarama provides two functions that return a consumer instance:
    // NewConsumer and NewConsumerFromClient

    // consumer_group.go
    // The ConsumerGroup interface
    type ConsumerGroup interface {
        Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
        Errors() <-chan error
        Close() error
    }
    // Consume is the method a consumer group uses to consume messages.
    // Consume joins a cluster of consumers for a given list of topics and
    // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.

    // consumerGroup is the concrete implementation of the ConsumerGroup interface
    type consumerGroup struct {
        // a consumerGroup holds a client and a consumer so that it can reuse their methods
        client Client
        config   *Config
        consumer Consumer
        groupID  string
        memberID string
        errors   chan error
    
        lock      sync.Mutex
        closed    chan none  // type none struct{}
        closeOnce sync.Once
    
        userData []byte
    }
    
    // ConsumerGroupHandler instances are used to handle individual topic/partition claims.
    // It also provides hooks for your consumer group session life-cycle and allow you to
    // trigger logic before or after the consume loop(s).
    // In other words, you can supply your own Setup(ConsumerGroupSession) and Cleanup(ConsumerGroupSession).
    //
    // PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
    // ensure that all state is safely protected against race conditions.
    type ConsumerGroupHandler interface {
        // Setup is run at the beginning of a new session, before ConsumeClaim.
        // (Setup is called before the session starts consuming.)
        Setup(ConsumerGroupSession) error

        // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
        // but before the offsets are committed for the very last time.
        // (Cleanup is called when the session ends.)
        Cleanup(ConsumerGroupSession) error
    
        // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
        // Once the Messages() channel is closed, the Handler must finish its processing
        // loop and exit.
        ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
    }
    

    A consumer group consumes messages inside a consumerGroupSession

    type consumerGroupSession struct {
        parent       *consumerGroup
        memberID     string
        generationID int32
        handler      ConsumerGroupHandler // the user-supplied handler, i.e. the individual consumers
    
        claims  map[string][]int32
        offsets *offsetManager
        ctx     context.Context
        cancel  func()
    
        waitGroup       sync.WaitGroup
        releaseOnce     sync.Once
        hbDying, hbDead chan none
    }
    

    The call chain for a consumer group's consumption is as follows
    *consumerGroup.Consume()
    -> *consumerGroup.newSession()
    -> *consumerGroupSession.consume(topic, partition)
    -> ConsumerGroupHandler.ConsumeClaim(*consumerGroupSession, claim)
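
    The call chain above can be read as a session lifecycle: Setup once, then one ConsumeClaim goroutine per claimed partition, then Cleanup after all of them have returned. The following is a simplified, stdlib-only model of that ordering, not Sarama's implementation. The `handler` interface uses hypothetical reduced signatures, and heartbeats and offset commits are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// handler mirrors the three hooks of ConsumerGroupHandler with simplified,
// hypothetical signatures (no session or claim types).
type handler interface {
	Setup() error
	ConsumeClaim(topic string, partition int32) error
	Cleanup() error
}

// runSession models one session: Setup, then one goroutine per claimed
// partition, then Cleanup once every goroutine has returned.
func runSession(h handler, claims map[string][]int32) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for topic, partitions := range claims {
		for _, p := range partitions {
			wg.Add(1)
			go func(topic string, p int32) {
				defer wg.Done()
				_ = h.ConsumeClaim(topic, p) // error handling omitted in this sketch
			}(topic, p)
		}
	}
	wg.Wait()
	return h.Cleanup()
}

// recorder logs the order in which the hooks run.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) log(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Setup() error { r.log("setup"); return nil }

func (r *recorder) ConsumeClaim(topic string, p int32) error {
	r.log(fmt.Sprintf("consume %s/%d", topic, p))
	return nil
}

func (r *recorder) Cleanup() error { r.log("cleanup"); return nil }

func main() {
	r := &recorder{}
	if err := runSession(r, map[string][]int32{"test_topic": {0, 1, 2}}); err != nil {
		fmt.Println(err)
	}
	// "setup" is always first and "cleanup" always last; the order of the
	// three consume events in between is not deterministic.
	fmt.Println(r.events)
}
```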

    consumerGroup usage example
    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/Shopify/sarama"
    )

    const topic = "test_topic"

    // #### implementation of sarama.ConsumerGroupHandler ####
    // Consumer represents a Sarama consumergroup consumer
    type Consumer struct{}

    // Setup is run at the beginning of a new session, before ConsumeClaim
    func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
        fmt.Println("consumer Setup")
        return nil
    }

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
        fmt.Println("consumer Cleanup")
        return nil
    }

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // NOTE:
        // Do not move the code below to a goroutine.
        // The `ConsumeClaim` itself is called within a goroutine, see:
        // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
        for message := range claim.Messages() {
            if message.Topic == topic {
                // Process the message; Value is a []byte, so convert it for printing.
                fmt.Println(string(message.Value))
                // Mark the message as processed so that its offset can be committed.
                session.MarkMessage(message, "")
            }
        }
        return nil
    }
    // #### implementation of sarama.ConsumerGroupHandler ####

    func main() {
        // get a kafka config
        config := sarama.NewConfig()
        // create the consumer group
        brokers := []string{"127.0.0.1:9092"}
        groupName := "my_group"
        consumerG, err := sarama.NewConsumerGroup(brokers, groupName, config)
        if err != nil {
            log.Fatal(err)
        }

        // Iterate over consumer sessions.
        ctx := context.Background()
        topics := []string{topic}
        handler := &Consumer{}
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims.
            // Each call to Consume starts a new consumerGroupSession, so looping
            // makes it easy to pick up the result of a rebalance.
            if err := consumerG.Consume(ctx, topics, handler); err != nil {
                log.Fatal(err)
            }
        }
    }
    
