美文网首页kafka
Kafka-消费者组

Kafka-消费者组

作者: 我可能是个假开发 | 来源:发表于2023-01-29 20:39 被阅读0次

一、消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

每个消费者都存在一个消费者组中,命令行执行时底层自动生成了groupId。

消费者组原理.png
  • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
  • 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

二、消费者组初始化流程

1.coordinator

辅助实现消费者组的初始化和分区的分配。

2.coordinator的选举

groupid的hashcode值 % 50

  • 50: _consumer_offsets主题的分区数量
  • groupid:用户自己设置的组id

例:groupid的hashcode值 = 1,1% 50 = 1,那么_consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的leader。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

3.消费者组初始化流程


消费者组初始化流程.png
  • 1.每个consumer发送joinGroup请求
  • 2.coordinator选出consumer leader
  • 3.coordinator把消费topic情况发送给leader消费者
  • 4.消费者leader制定消费方案(分区分配策略)
  • 5.消费者leader把消费方案发送给coordinator
  • 6.coordinator把消费方案下发给各个consumer
  • 7.消费者会和coordinator保持心跳(默认每3s发送一次心跳),一旦超时(session.timeout.ms=45s),该消费者会被移除,触发再平衡;消费处理消息的时间过长(max.poll.interval.ms=5min),触发再平衡;

三、消费者组消费流程

消费者组消费流程.png

1.创建消费者网络连接客户端(ConsumerNetworkClient),用来和kafka集群进行通信
2.消费者发送消费请求sendFetches到ConsumerNetworkClient
3.Fetch.min.bytes:每批次最小拉取大小(默认一字节);ConsumerNetworkClient默认从broker拉取一个字节数据。
4.Fetch.max.wait.ms:一批数据最小值未达到的超时时间(默认500ms);如果步骤3配置的每批次最小拉取2字节,生产者只发送了1字节,当达到超时时间后,也会拉取数据。
5.Fetch.max.bytes:每批次最大拉取数据大小;
6.ConsumerNetworkClient调用send发送拉取数据请求。
7.通过回调方法onSuccess将结果拉取回来completedFetches(queue)。
8.消费者从completedFetches队列中拉取数据,一次默认拉取500条
Max.poll.records:一次拉取数据返回的最大消息条数(默认500条)
9.将数据反序列化
10.经过拦截器
11.处理数据

四、代码

1.单个消费者-订阅主题

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumer {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅消费主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("topic1");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

2.单个消费者-订阅分区

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumerPartition {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 消费某个主题的某个分区数据
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("myTopic", 0));
        kafkaConsumer.assign(topicPartitions);

        //消费数据
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

3.消费者组

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumer {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅消费主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("topic1");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

创建2个消费者,都指定同一个组:myGroup;

  • 两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。
  • 重新发送消息到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据。

极客时间《Kafka 核心技术与实战》学习笔记Day15 - http://gk.link/a/11UOV

相关文章

  • Kafka-消费者组

    一、消费者组原理 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条...

  • Kafka-消费者概述

    一、消费方式 1.pull( 拉)模式 consumer采用从broker中主动拉取数据。 Kafka采用这种方式...

  • kafka consumer

    一、消费者和消费者组 P2P所有的消费者隶属同一消费者组 Pub/Sub所有的消费者隶属不同的消费者组 二、参数 ...

  • 消息队列之Kafka-消费者

    1、消费者组 消费者(Consumer)负责订阅 Kafka 中的主题( Topic),并且从订阅的主题上拉取消息...

  • Kafka消息单播与多播的概念介绍

    Kafka引入了消费者组概念,每个消费者都属于一个特定的消费者组,通过消费者组就可以实现消息的单播与多播。本文将详...

  • Kafka使用笔记(三、消费者详解)

    概念 消费者和消费组 kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会接收到...

  • kafka-第三章-消费者

    学习大纲 一、消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者...

  • Kafka 名词解析及关系梳理

    producer生产者:负责向客户端发送消息. Consumer Group:消费者组,消费者组中有多个消费者,一...

  • spark 学习笔记

    Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...

  • Kafka2.0消费者协调器源码

    消费组和消费者 消费组和消费者是一对多的关系。 同一个消费组的消费者可以消费多个分区,且是独占的。 消费者的分区分...

网友评论

    本文标题:Kafka-消费者组

    本文链接:https://www.haomeiwen.com/subject/uguphdtx.html