美文网首页Kafka
高性能分布式消息系统 —— Kafka

高性能分布式消息系统 —— Kafka

作者: 小胡_鸭 | 来源:发表于2022-05-10 17:20 被阅读0次

    一、背景和简介

    1、什么是 kafaka

      对于一个初创公司,系统架构非常简单,面向客户有个数据源系统(Source System),数据会被汇聚到目标系统(Target System),只需要有一个整合环境即可。

      随着业务的发展和流量的增加,数据源系统和目标系统都会有多个,一个数据源系统可能会发送数据给目标系统,同时目标系统可能也会接收来自多个源系统的数据,需要的整合环境会膨胀,且新的系统可能使用了跟旧系统不一样的技术,这又增加了整合环境的成本,且不同系统的流量特点等级不一样,但是整合开发的成本都是一样的,从目前来看架构发展面临以下的瓶颈:

    • 1)如果有4个数据源系统,6个目标数据系统,则需要24个整合系统;

    • 2)每个整合环境可能需要用到不同的技术,例如使用不同的协议:tcp/http/ftp/jdbc,不同的数据格式:csv/json/xml;

    • 3)每个数据系统每天都会产生增量的数据

    帕累托法则,又叫二八法则,是罗马尼亚管理学家约瑟夫·朱兰提出的一条管理学原理,即:20%的人口掌握了80%的社会财富。在软件系统开发领域,同样存在二八法则,即百分之80的功能只需要付出百分之20的开发成本,而另外百分之20的功能需要付出百分之80的成本,所以如果数据源系统和目标数据系统之间如果没有中间件或中间系统,即使把所有的整合系统都开发出来,也需要付出非常高的代价。

      为了解决上面的架构问题,本质上是数据源系统和目标数据系统之间的耦合,kafka 就是为了解决这种系统耦合。

    2、为什么使用 kafka

    项目背景

      kafka 是 LinkedIn 公司基于内部应用系统解耦背景而开发的中间件,并贡献给了开源社区,目前由 Confluent 公司专门负责维护,所以也无需担心使用出了 bug 没人维护的问题。

    系统架构

      一般生产中的 kafka 是一个集群,kafka 会将一个 topic 中的数据分成多个分布在不同的节点上,并且通过副本机制,实现了分布式、带有容错结构的系统架构。

    数据吞吐量

      kafka 的集群是可以水平扩展的,按照 LinkedIn 公司内部使用的情况,可以扩展到上千台服务器,每秒千万级别的消息吞吐,标杆在这里了,实际上一般公司达不到这个量级,不过生产部署上百台服务器还是很常见的,处理每秒百万级的数据吞吐无压力。

    业务延迟

      kafka 是一个高性能的实时消息系统,平均消息延迟仅在 10ms 左右(恐怖如斯~)一般我们潜意识把消息队列当成一个缓存堆积数据的地方,然后目标数据系统再慢慢从队列中消费数据处理,实际上是当成非实时离线计算,但是kafka如此出色的性能,完成可以支撑将其当成实时消息系统来使用。

    3、使用 kafka 的场景

    (1)日志收集

      生产中的日志可以是 txt、bin 或者其他格式,kafka 可以收集各种服务的 log,再以统一接口服务的方式开放给各个系统使用(比如日志中心)。

    (2)消息系统

      解耦生产者和消费者,或者说数据源系统和目标数据系统,减少整合成本,从某种程度上看,可以看成是一个数据总线;如果消费者消费消息的速度跟不上生产者,还可以起到缓存消息的作用,避免消费者处理不过来。

    (3)用户活动跟踪

      kafka 经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动。

    (4)运营指标

      kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的几种反馈,比如报警。

    (5)流式处理

      流式处理,实际上就是实时处理,比如下载一部电影一边下一边看就是流式处理,hadoop、spark 都有对应的 streaming 组件,同时 kafka 自己也提供了 streaming。

    (6)事件源

      kafka 汇聚了来自多个数据源系统的数据,并由目标数据系统消费,对目标数据系统来说它就是数据源。

    4、实际使用性能

      LinkedIn 公司对 kafka 的日常使用情况如下:

      首先 1100+ 台服务器,除了国内几大巨头,一般公司很少需要达到这个级别,所以把心放到肚子里,贵司就这么几十台机器,顶上天了一两百台,完全是小case;31000+ Topic,350000+ 分区,相当于平均每个 Topic 有 10 个以上的分区,只有集群中有大量机器(几台机器就别折腾了),分区才有意义,一般同个 topic 的不同分区会在不同的机器上,这样 topic 的负载可以均衡到分区所在的不同节点,机器数量较少情况下,分区数可以等同于机器数,但对大集群不适用,要根据实际情况来做调整;6万亿条消息数据/天,我滴个乖乖,真是恐怖,对小公司来说一天也未必有上千万;流入流出数据是上百TB的级别,流出数据量是流入的几倍,因为流入的数据可能被多个目标数据系统锁消费,所以数据会被复制,这点有点类似 Flume 的复制 channel 选择器。

      如果我们自己使用三天两头崩,故障,如果是流量涨得太快赶紧向老板申请预算加机器,不然得好好反思琢磨是不是使用哪里出了问题需要调优。


    二、概念

    1、Topic(主题)

      如果把 kafka 看成是一个数据库的话,topic 就相当于数据库中的表,但是这个表没有任何的数据约束条件。

    【用途】topic 可以用来作为不同数据的分类,相同业务功能的数据会汇聚在同一个 topic 主题中。

    【组成】每个 topic 都是由一个或多个分区存储组成,分区(partition)是组成 topic 主题的最小存储单位。

    【使用】kafka 的消息流转模式采用发布订阅模型,topic 就是衔接发布订阅的中间数据接口,消息发送给 topic,又叫主题发布(Topic Publish)。

    2、Partition(分区)

      为了让 kafka 系统可以运行在分布式系统环境中,设计者在设计 topic 的时候采用分区(partition)的方式让每一个 topic 可以被物理地分割成一个或多个分区(partition),从而实现 kafka 消息的跨服务器存储。

      一般情况下,不同的分区会分布在不同的服务器上,每个分区对应一个分区文件,默认情况下 topic 接收到的生产者发布的消息会轮询放到不同的分区,这样可以避免单个分区(机器)负载过大。kafka 也支持自定义消息 key,并且保证相同的 key 的消息放到同一个分区,但是这样可能导致数据倾斜。

    3、Offset(偏移量)

      每个分区内部的数据都是排序状态,并且进入分区都消息都会分配到一个分区内自增的唯一id,这个编号又被称为偏移量(offset)

      每个分区内的偏移量都是有序且互相独立的,但是默认情况下生产者发布的多条消息无法被有序消费,因为可能会被放到不同分区,为了保证消费的有序,生产者可以给消息定义相同的 key,这样就保证消息保存到同个分区中。

      kafka 的数据默认值保存7天,过期则删除,生产可根据实际情况调整,不必担心自增序号用完怎么办,这是杞人忧天的事情。

      对于追求短时延和吞吐量的数据处理组件来说,一般都是要求数据只读,为了保证数据安全这些组件一般会有数据副本机制,如果支持数据的修改,就需要做好数据副本的同步、读取不同副本数据的一致性、不同副本所在机器之间的通讯等,处理复杂度大增,消耗集群资源,还会影响数据的吞吐量得不偿失。

      在实际应用中,车辆GPS跟踪系统是一个典型的场景。

    假如有一个货车车队,每台货车会定期发送GPS数据给kafka系统,kafka 中有个名叫 trucks_gps 的 topic,用来存放所有车辆的 gps 信息。每台货车每隔 20s 发送一笔 gps 数据,每条消息包括车辆 id 编号和 gps 经纬度坐标,在 kafka 系统中创建 10 个分区来承载此业务(提供10台服务器)。

      对于 kafka 来说,每辆车定时发送的 GPS 数据就是数据源,采集到数据后,位置仪表盘应用可以消费 topic 中的信息来渲染地图,监控平台可以看到所有车辆的统计数据。

    4、brokers(服务器)

      kafka 中的服务器称为 broker,集群由多台 broker 组成,也叫 brokers。每个 brokers 都有唯一的 id,topic 的不同分区一般是分布在不同的 broker 上,达到负载均衡的效果,提高集群处理海量数据的处理速度。

      kafka 具有 Bootstrap 功能,即客户端无需知道整个集群所有机器信息,只需要连接其中一台即可,它会变成向导介绍集群所有成员给接入程序,从而保证客户端可以连接整个集群。

      kafka 集群是主从架构,依赖 zk 协调选举出一个 broker 作为主节点,所以集群节点至少要有三台且机器数量最好是奇数避免脑裂。

    5、replication(复制)

      为了保证数据安全,每个主题分区需要有多个副本,这个参数称为复制因子(replication factor),通常在 2-3 之间。复制因子如果是 2 还是会出现一定的数据风险,高安全模式下 3 是首选,如果有 broker 出现故障,额外的 broker 服务器会接替相关 topic 的数据。

      如下,Topic-A 有两个副本,并且设置副本因子为2。

      如果102服务器故障,101、103服务器依然能提供完整的数据服务,任何一台服务器故障都不影响整体服务。

      每个分区都会有一个Leader,因为分区之间涉及到数据复制,生产者发布的数据写入leader分区,再同步到follower分区。

      任何一个时间点,在 kafka 集群中,针对某个分区,只会有一台 broker ,这台服务器被称为分区leader,这里要注意区分集群leader和分区leader的区别,两者是不同的概念。对任一分区来说,都会有一个 Leader 和若干个 Follower,也称为 ISR(in-sync-relica)(在 HDFS 中同样有 ISR类似的概念)。

      推荐的 kafka 复制因子是3,在实战中,使用3的好处如下:

    • 允许整个 Kafka 集群有一台服务器发生故障
    • 同时允许另一台服务器进入维护模式,例如例行安全升级或维护



    三、安装配置

    1、Linux 环境

      安装 Java

    # 解压安装
    sudo mkdir /usr/java
    sudo tar zxvf jdk-8u161-linux-x64.tar.gz  -C /usr/java/
    sudo ln -s /usr/java/jdk1.8.0_161/ /usr/java/default
    
    # 配置环境变量
    sudo nano /etc/profile 
    export JAVA_HOME=/usr/java/default
    export CLASSPATH=.:${JAVA_HOME}/lib
    export PATH=${JAVA_HOME}/bin:$PATH
    
    # 激活环境变量
    source /etc/profile
    
    # 服务器上有多个java环境。此句是设置默认java环境的位置
    sudo update-alternatives --install /usr/bin/java java /usr/java/default/bin/java 300
    sudo update-alternatives --install /usr/bin/javac javac /usr/java/default/bin/javac 300
    sudo update-alternatives --config java      
    sudo update-alternatives --config javac 
    

      安装 kafka

    # 解压安装
    tar -zxvf kafka_2.12-2.3.0.tgz -C /app
    mv /app/kafka_2.12-2.3.0 /app/kafka
    
    # 配置环境变量
    sudo nano /etc/profile 
    export KAFKA_HOME=/app/kafka
    export PATH=${KAFKA_HOME}/bin:$PATH
    
    # 激活环境变量
    source /etc/profile
    
    # 建立kafka目录
    mkdir /app/kafka/data
    mkdir /app/kafka/data/zookeeper
    mkdir /app/kafka/data/kafka
    

      配置 kafka 自带 zookeeper

    vi /app/kafka/config/zookeeper.properties
    dataDir=/app/kafka/data/zookeeper
    

      配置 kafka 参数文件

    vi /app/kafka/config/server.properties
    log.dirs=/app/kafka/data/kafka
    

      启动 zookeeper、kafka 服务

    # 启动 zk
    cd /app/kafka
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # 启动 kafka
    bin/kafka-server-start.sh config/server.properties &
    

      查看进程

      后台启动

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    



    2、Windows 环境

      安装 JDK,配置环境变量,不赘述。

      安装 kafka,配置环境变量

      在安装目录下创建目录 data/zookeeper、data/kafka

      修改配置文件

    # zookeeper.properties
    dataDir=D:/soft/kafka_2.12-2.3.0/data/zookeeper
    
    # server.properties
    log.dirs=D:/soft/kafka_2.12-2.3.0/data/kafka
    

      启动服务

      查看进程




    3、Confluent 自动安装

      Confluent 是平台化的工具,封装了 kafka,让我们可以更方便地安装、使用和监控 kafka,类似于 CDH 对于 hadoop。

      Confluent 是由 LinkedIn 开发出 apache kafka 的团队成员新成立的公司,其产品也是围绕 kafka 做的,基础架构如下:

      Confluent 对于 kafka 核心的功能是不收费的,针对 kafka 服务外围做的监控控制中心和其他功能是收费的,整合包中包含了很多组件:

    • Confluent Platform:包括更多的工具和服务,使构建和管理数据流平台更加容易。

    • Confluent Control Center(闭源):管理和监控 kafka 最全面的 GUI 驱动系统。

    • Confluent Kafka Connectors(开源):连接SQL数据库、Hadoop、Hive。

    • Confluent Kafka Clients(开源):C/C++、Java 等编程语言客户端。

    • Confluent Kafka REST Proxy(开源):允许一些系统通过 HTTP 和 Kafka 之间发送和接收消息。

    • Confluent Schema Registry(开源):帮助确定每一个应用读写数据到 kafka 中时,使用正确的schema。

    • Confluent KSQL(开源):KSQL 降低了流处理世界的入口,提供了一个简单而完全交互的 SQL 接口,用于处理 kafka 中的数据,对于数据开发分析人员来说,不再需要编写代码。

    Confluent 官网下载地址:https://www.confluent.io/

      不选择云托管,下载软件包

      填写邮箱下载

      点击 Quick Start,查看手册(https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html

      安装配置

    # 解压
    tar -zxvf confluent-5.4.0-2.12.tar.gz -C /app
    mv /app/confluent-5.4.0 /app/confluent
    
    # 配置环境变量
    vi /etc/profile
    export CONFLUENT_HOME=/app/confluent
    export PATH=${CONFLUENT_HOME}/bin:$PATH
    
    # 激活配置
    source /etc/profile
    
    # 测试配置是否生效
    confluent --help
    
    # 使用Confluent Hub客户端安装Kafka Connect Datagen源连接器。 该连接器仅用于演示目的而生成模拟数据,不适合生产。
    $CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
    
    # 解压confluent_latest_linux_amd64.tar.gz
    tar -zxvf confluent_latest_linux_amd64.tar
    cd confluent/
    # 启动confluent
    # 这里启动可能会报错,再执行一次启动即可
    ./confluent local start
    
    # 关闭confluent
    ./confluent local stop
    

      可以看到相关的进程

      打开web控制台,访问端口是9021





    四、生产者和消费者

    1、生产者

      生产者是数据源,会向 Topic 写入数据,由于 Topic 会分为多个分区,所以实际是写入某个分区,默认情况下具体写入哪个分区是轮询的,保证负载均衡和避免数据倾斜。

      为了保证数据安全,分区会在不同的 broker 上有副本(ISR),副本数量由复制因子(replication factor)决定。其中客户端直接写入数据的分区为 Leader,并负责将数据同步到 ISR,当分区 Leader 故障时,zk 就会从 ISR 中选举出一个新的 Leader,生产者自动连接到新的 Leader 写入消息数据。

      生产者写入 Topic 时,kafka 有一个 acks 数据回执参数,它有不同的值对应不同的情况:

    • acks = 0:kafka 写入时可能会故障,而生产者不等待确认消息,这可能导致数据的丢失,但可以大幅提升性能,对于一些要求不是很高允许丢失一点数据的场景,比如日志,可以使用这种模式。
    • acks = 1:消息写入 Topic 的分区 Leader 即返回确认消息,这时数据可能还没同步到 ISR,如果同步之前分区 Leader 故障,也有可能导致数据丢失,这个模式兼顾数据安全和性能的。
    • acks = all:等待分区 Leader 和 ISR 都写入数据了才返回确认消息,如果 Leader 故障,则认为消息写入失败,需要生产者重新发布消息,这种模式下安全性最高,但是会增加数据延迟,降低吞吐量。


    2、消息 key

      默认情况下,生产者向 Topic 写入数据时是轮询写入不同分区的,如果要保证消费者消费消息的顺序跟生产者发布消息的顺序是一样的,就要让数据写入同个分区,针对这个需求,kafka 提供了 Message Key (消息键)的特性。

      生产者在发送数据时可以选择发送一个特定的 key 作为额外的一个数据项,这个 key 可以是字符串或数字。

    【应用场景】

      在车辆跟踪系统的案例中,所有车辆每隔20s发送GPS监控数据到kafka的Topic中,为了保证按顺序接收车辆的监控信息,每辆车发送数据时都需要有一个唯一的消息key,假设每辆车都会一个唯一表示的 truckId,则这个id就适合作为消息key。

    3、消费者

    (1)消费者

      消费者(API)根据 Topic 名字从 kafka 读取数据,Topic 有多个分区,所以消费者需要知道 Topic 都有哪些分区并且这些分区都在哪些 broker 上,最后才从 broker 中读取数据。

      如果读取数据的 broker 故障了,kafka 会自动进行故障转移,从分区 ISR 中选举新的分区 Leader,然后消费者从新的分区 Leader 所在 broker 上读取数据。因为消费者消费消息时,分区的偏移量会移动并且作为元数据保存在 kafka 集群中,所以当它从新的分区 Leader 上读取数据时不会重复消费数据,而是从上一次记录的偏移量的位置开始继续读取。

    (2)消费组

      在消费者侧,为了提升处理效率,可能会有多个消费者,如果想要这多个消费者不重复消费同一条数据,就要使用消费者组(Consumer Group)

      消费者组中的每个消费者只会读取 Topic 的其中一个分区。如果组中消费者数量少于分区数,则有部分消费者要处理多个分区;如果组中消费者的数量大于分区数,则有部分消费者会进行空闲待机状态;最理想的情况是,组中消费者数量等于分区数,这样处理效率可以达到最大化。

    (3)偏移量

      任何一个消费者连上 kafka 后,都会被赋予一个特定的 Offset 偏移量,用于区分到底哪个消费者读到了哪些数据。所有消费者的 offset 提交数据都会被提交到一个特殊的主题 Topic 中,这个 Topic 主题的名字叫做 __consumer_offsets。

      在一个消费组中的消费者收到了相应的数据后,相关程序会提交当前数据处理的偏移量给 kafka 服务器,这样保证相关数据被记录到。由于存在提交机制,所以如果有一台服务器或消费者客户机故障,没提交的数据在下一次系统恢复后继续重新获取数据,不会造成数据的丢失。

    (4)消息传递语义

      消费者的消息传递语义(Delivery semantics)指消费者收到数据和提交偏移量的时机遵循的模式,一共有3种模式:

    • at most once(最多一次):消费者收到数据后马上提交偏移量,如果采用这种模式,会导致数据可能在客户端程序处理时发生意外,而此时 kafka 认为数据已取走,使得客户端程序重启恢复后,kafka 中的数据丢失。

      • 比如消费者应用收到消息后要记录到DB中,如果提交了偏移量,但是记录DB前程序故障,实际上就没记录到DB中,再次恢复程序时应该是再次消费刚才没记录成功的消息,但由于偏移量已经提交了,读取 kafka 消息时只会从原来的数据之后的偏移量开始读取,造成数据丢失的效果。
    • at lease once(最少一次):客户端处理完后才提交偏移量,但是如果客户端程序在处理完之后提交之前崩溃或者没有正确提交,会导致程序重启后同一笔数据会二次处理(即发生重复数据读取问题),所以客户端程序要做好自己的数据去重问题,比如保证程序是幂等的。

      处理比较重要的数据,倾向于使用这种模式:

      【优点】避免数据丢失。

      【缺点】可能重复处理数据,程序需要做到幂等处理。

    • exactly once(精确传递一次):消息会被且只会被处理一次。这种模式只会出现在 kafka 和 kafka 的数据交互过程中,例如使用 Kafka Streaming API 进行数据处理时。对于 kafka 和非 kafka 系统间的数据传递,只能选择前两种模式,且 at lease once 是首选模式。

    4、Broker 发现与 ZK

    (1)Broker 发现

      在一个 Kafka 系统中,每一台 Kafka 的服务器被称为 Broker 服务器,当客户端程序连上 Kafka 的时候,需要指定相应的服务器信息,这个服务器被称作 "Bootstrap Server"

      客户端系统连接 Kafka 的系统时,只需要连接其中一台服务器即可找到 Kafka 集群中的所有机器,这个过程是 kafka 程序API自动实现的。

      集群中的每台 Broker 都可以看成是 Bootstrap Server,每一台 Broker 服务器都认识集群中的所有其他 Broker、Topic 和 Partitions(meta元数据)。

    (2)ZK 的作用

      zookeeper 在 kafka 中的作用列举如下:

    ① 管理所有的 Broker 服务器,了解集群中都有哪些服务器(保持一个服务器列表);

    ② Topic 分区 Leader 选举;

    ③ 利用 watch 监视机制维持一个消息通知功能,例如 Broker 的新增、宕机、重启,Topic 的新增、删除等;

      kafka 依赖 zk 来管理集群元数据,如果 zk 故障,会导致 kafka 不可用。

      kafka 可以看成是 zk 的客户端应用,与 zk 的连接是随机产生的,不需要一一对应,zk 会维持一个 broker 连接的客户端会话,并且即使连接的 zk server 故障,只要在会话超时时间内将连接成功故障转移到另一个 zk server 上,就不影响 kafka 的使用。

      在 zk 内部,一般使用奇数台服务器来组成集群,这样是为了避免选举时发生脑裂,很多分布式系统都依赖其选举功能。zookeeper 在 kafka < 0.10 版本时还负责存储 _consumer_offset 数据,因为这个时期的 kafka 集群数量顶多一两百台,将消费者偏移量数据直接保存在 zk 中很方便,但随着 kafka 的演化,新版本中消费者偏移量是直接保存在 kafka 服务器上,因为 zk 不适合大数据量的操作。

      【专栏】关于 zookeeper 的详细介绍可参考:zookeeper


    五、Kafka Cli

    1、Topic 相关

    # 创建一个主题,需要指定zk、主题名、分区数、复制因子
    kafka-topics.sh --zookeeper localhost:2181 \
    --create --topic first_topic --partitions 2 --replication-factor 1
    
    # 查看kafka中所有主题列表
    kafka-topics.sh --zookeeper localhost:2181 --list
    
    # 查看某个主题的详情
    kafka-topics.sh --zookeeper localhost:2181 --topic first_topic --describe
    
    # 删除分区
    kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --delete
    
    # 修改主题(只能修改分区数,不能修改复制因子)
    kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --alter --partitions 5 
    

      演示

      每个分区对应 kafka 数据目录下的一个文件夹,当删除主题时,默认情况下对应的文件夹并没有马上被删除,而是被重命名标识为要被删除的数据,如果要删除主题时马上删除对应的磁盘文件目录,则需要设置参数 delete.topic.enable 为true。

      如果被删除的主题被重新创建,则会马上删除之前被标识为要删除的文件目录,并创建新的主题分区文件目录。

      修改主题时,不允许修改复制因子




    2、生产者相关

    # 发送数据到指定主题
    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic
    
    # 指定 acks 参数
    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all
    

      从日志文件中可以看到对应的消息数据

      kafka 的 server.properties 配置文件中,有个参数表示默认要创建多少个分区

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    

      但是在显示创建主题是应该必须要指定分区数,该参数只有在生产者发送数据给不存在的主题,触发主题自动创建时才生效。

      现在修改上述配置参数为4,并重启 kafka 服务,往不存在的主题发送数据,可以看到生产者第一次发送数据时因主题不存在会有一条报警日志,然后自动创建主题分区文件目录,查看数据目录可以看到自动创建了4个分区目录。




    3、消费者相关

    # 从指定主题消费数据
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic 
    

      默认情况下,消费者只会从消费者启动后生产者新发送到 topic 的数据开始消费,如上图中,消费者2是在生产者发送完前4条数据后才启动的,则只会消费到第五第六条数据,如果想从头开始消费,可以指定 --from-beginning 参数。

      在实际生产中,应用服务器可能会因版本更新短暂停机,如何保证应用容器更新版本后再次连接上 kafka 消费数据时从上一次开始消费起?答案是使用 --group 参数,即使用消费组。

      这里可能还会存在一点问题,消费组并没有从头开始消费,假如需求是消费 topic 中的所有数据,则应该配合 --from-beginning 使用。




    4、消费组和 offset 相关

      将 first_topic 删除并重新创建,分区数为 2

    kafka-topics.sh --zookeeper 127.0.0.1:2181 \
    > --topic first_topic \
    > --partitions 2 \
    > --replication-factor 1 \
    > --create
    

      启动一个生产者发送数据,启动一个消费者消费数据,消费者组为 new_group,此时主题分区数大于消费者数,因此 topic 的数据会全部被消费者读取。

      启动新的消费者,消费组相同,这时 topic 的数据一个分区对应一个消费者,消费组内负载均衡

      再次启动一个 new_group 组的消费者,此时消费者数量大于分区数,这时会有一个消费者处理空闲状态

      当正在工作的消费者故障时,空闲的消费者就接手实现故障转移



      消费者组相关命令

    # 查看当前都有哪些消费者组连接 kafka
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
    
    # 查看消费者组连接情况
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group new_group
    
    # 重置偏移量(到起点)
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
    --topic first_topic --group new_group \
    --reset-offsets --to-earliest --execute
    
    #修改偏移量 +2
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --topic first_topic --group new_group \
    -reset-offsets --shift-by 2 --execute
    
    #修改偏移量 -2
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --topic first_topic --group new_group \
    --reset-offsets --shift-by -2 --execute
    

      查看消费者组连接情况时,可以看到每个消费者对应哪个分区,以及对该分区数据消费的偏移量,可以将偏移量重置,则消费者组将从重置后的位置重新读取数据。

      关掉所有消费者,将偏移量往前移两位,再重新启动一个消费者,可以看到该消费者会读取后面四条数据,并在分区内逐个有序读取。





    六、kafka tool

      kafka tool 是一个图形化工具,可以通过图形工具进行 kafka 数据的调试,让使用者更好地了解整个 kafka 系统的运作模式,包括生产者、消费者、偏移量等概念。

      下载安装打开,自动提示连接配置

      可以方便地查看集群里所有的 brokers、topics、consumers

      使用工具给 first_topic 添加数据,31 是字符1的ASCII码,所以给分区0添加 "313131313131" 就是写入数据 "111111"。

      查看 new_group 消费者组的偏移量,可以看到分区1当前偏移量落后了1条数据

      启动一个 consumer,马上读取回显数据

      再看 new_group 分区0的偏移量,Lag 更新为了 0





    七、Kafka API

    1、生产者 API


      在生产者模型中,生产者发送的记录,必须指定主题和值,然后序列化为二进制数据,再经过分区器决定要发送到主题的哪个分区;分区key和指定分区器是可选的,一旦指定了分区key,相同key的数据会进入同一分区,而分区器决定了具体是哪个分区,如果不显式指定分区器,则默认是使用轮训的策略。

      数据在生产者端,并不会马上发出去,而是以主题分区为粒度,堆积一些批次,再发送,如果数据量比较少,则会在缓存一定时间阈值后发送,批处理可以分摊单条消息发送的开销,并提高数据吞吐量。

    (1)发送模式

    同步发送

      同步就是一个任务的完成需要依赖另一个任务,只有等待被依赖的任务完成后,依赖的任务才能算完成。这是一种可靠的任务序列,要么都成功,要么都失败,两个任务的状态可以保持一致。

      最基础的样例代码如下

    package api.producer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class ProducerDemo {
        
        private static final String BOOTSTRAP_SERVERS = "192.168.72.131:9092";
        private static final String TOPIC = "example01";
        
        private static Producer<String, String> createProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "0");
            return new KafkaProducer<String, String>(props);
        }
        
        // 同步发送,不等待 
        static void runProducer() {
            final Producer<String, String> producer = createProducer();
            //final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Hello world!");
            final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "111");  // 313131 
            producer.send(record);
            producer.flush();
            producer.close();
            System.out.println("Send OK!!!!");
        }
        
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            System.out.println("Hello world!!!!");
            runProducer();
        }
    }
    

    【说明】

      首先要创建一个 Producer 对象,并通过 Properties 对象设置参数属性,必备的几个参数有连接的 kafka 服务器 bootstrap.servers 、消息key-value 的序列化器、acks 参数,然后创建 ProducerRecord 对象,并通过 Producer 发送出去,send() 方法有很多重载方法,这里使用最简单的发送后不管,适用于生产者只发送不关心 kafka 是否已接收到的场景。

      如果发送完要同步等待发送结果,并且可以从发送结果中了解数据被发送到哪个分区、在分区中的偏移量是多少,则可以参考以下代码:

    // 同步等待结果
    static void runProducer2() throws InterruptedException, ExecutionException {
        final Producer<String, String> producer = createProducer();     
        final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "333");  // 333333
        
        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());            
        } finally {
            producer.flush();
            producer.close();
        }
    }   
    

      执行结果

    Hello world!!!!
    Send OK!!!!
    record (values = 333 ) partition=0  offset=-1
    

      还需要设置 ProducerConfig.ACKS_CONFIG 参数值为1或all,否则不会返回分区偏移量。

    异步发送

      异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

      异步发送指,生产者发送数据的线程发送完就不等返回记录元数据,而是在回调函数中处理,回调函数对象可作为参数传递给 send() 方法,当 kafka 返回消息回执时,就会触发调用回调对象方法。

      代码如下:

    package api.producer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class ProducerAsyncDemo {
    
        private static final String BOOTSTRAP_SERVERS = "192.168.72.131:9092";
        private static final String TOPIC = "example01";
        
        private static Producer<String, String> createProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            return new KafkaProducer<String, String>(props);
        }
        
        static void runProducer() throws InterruptedException, ExecutionException {
            final Producer<String, String> producer = createProducer();     
            final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "444");  // 343434
            
            try {
                for (int i = 0; i < 100; i++) {
                    producer.send(record, new DemoCallback("444"));
                }
                System.out.println("Finish work!!!!");      
            } finally {
                producer.flush();
                producer.close();
            }
        }       
        
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            System.out.println("Hello world!!!!");
            runProducer();
        }
        
    }
    
    // 异步回调
    class DemoCallback implements Callback {
    
        private String message;
        
        public DemoCallback(String message) {
            this.message = message;
        }
        
        // 
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", message, metadata.partition(), metadata.offset());           
        }
    }
    

      从执行结果可以看到,发送线程打印的日志先于回调对象的调用。

    (2)序列化

      发送给 kafka 的数据如果不是简单地字符串,需要定义数据格式,这个过程被称为数据自定义序列化。对于简单的数据类型,kafka 自带了序列化器,而对于应用自定义的数据对象,就需要自定义序列化器了。

      常见的自定义序列化格式有 JSON、Avro、Thrift、ProtoBuf 等,通常序列化器和反序列化器是成对出现的,前者负责将数据对象转化为二进制数据流,后者则读取二进制流并恢复为数据对象以便在应用系统中使用。

    Apache Avro

      Apache Avro 就是一个高效的数据序列化系统,设计用于支持大批量数据交换的应用。

    【特点】

      支持二进制序列化方式,可以方便、快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。

    【模式】

      Avro 依赖模式(Schema)来实现数据结构定义。比如 Java 的 POJO(只定义属性和 getter/setter 的类)就是一种模式,定义了每个实例的结构,可以包含哪些属性。可以根据类来产生任意多个实例对象,对实例序列化操作时必须需要知道它的基本结构,也就需要参考类的信息。

    【编码方式】

      Avro 支持两种序列化编码方式:二进制编码和 JSON 编码。使用二进制编码会高效序列化,序列化后得到的结果会比较小;而 JSON 一般用于调试系统或是基于 WEB 的应用。

      定义 kafka 序列化器需要实现 org.apache.kafka.common.serialization.Serializer 接口,现在定义一个 Customer 的 Java 类,代码如下:

    package api.producer.serialization;
    
    import java.util.Date;
    
    public class Customer {
        
        private String name;
        private String role;
        private String city;
        private String Street;
        private Date in_date;
        private Integer Age;
        
        public Customer(String name, String role, String city, String street, Date in_date, Integer age) {
            super();
            this.name = name;
            this.role = role;
            this.city = city;
            Street = street;
            this.in_date = in_date;
            Age = age;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getRole() {
            return role;
        }
    
        public void setRole(String role) {
            this.role = role;
        }
    
        public String getCity() {
            return city;
        }
    
        public void setCity(String city) {
            this.city = city;
        }
    
        public String getStreet() {
            return Street;
        }
    
        public void setStreet(String street) {
            Street = street;
        }
    
        public Date getIn_date() {
            return in_date;
        }
    
        public void setIn_date(Date in_date) {
            this.in_date = in_date;
        }
    
        public Integer getAge() {
            return Age;
        }
    
        public void setAge(Integer age) {
            Age = age;
        }
    }
    

      定义一个序列化器

    package api.producer.serialization;
    
    import org.apache.kafka.common.serialization.Serializer;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    // Customer 对象的序列化器
    public class CusSerilizer implements Serializer<Customer> {
    
        // 序列化方法将POJO转化为字节数组
        @Override
        public byte[] serialize(String topic, Customer data) {
            byte[] retVal = null;
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                retVal = objectMapper.writeValueAsString(data).getBytes();
                System.out.println("Using CusSerilizer!!!");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return retVal;
        }
    
    }
    

      使用序列化器需要在创建 Producer 对象时指定属性 value.serializer ,测试代码如下:

    package api.producer.serialization;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import com.github.javafaker.Faker;
    
    
    // 发送Customer的序列化给kafka的示例程序
    public class SendCustSerDemo {
        
        private static final String BOOTSTRAP_SERVERS = "192.168.72.131:9092";
        private static final String TOPIC = "customer01";
        
        private static Producer<String, Customer> createProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CusSerilizer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            return new KafkaProducer<String, Customer>(props);
        }
        
        static void runProducer(Customer userdata) throws InterruptedException, ExecutionException {
            final Producer<String, Customer> producer = createProducer();       
            final ProducerRecord<String, Customer> record = new ProducerRecord<String, Customer>(TOPIC, userdata);  // 333333
            
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Send OK!!!!");
                System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());    
                System.out.println();
            } finally {
                producer.flush();
                producer.close();
            }
        }       
        
        public static void main(String[] args) throws ParseException, InterruptedException, ExecutionException {
            System.out.println("Hello world!!!!");
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
            Date date = sdf.parse("2020-05-10");
            Customer customer = new Customer("david", "manager", "new york", "ccc", date, 35);
            runProducer(customer);
            System.out.println("Send OK");
        }
        
    }
    
    

      从打印日志可以看出,发送的消息时一个对象,经过序列化之后,分配的分区是0,在分区中的偏移量是3

      查看 kafka tool,可以看到对应的数据

    (3)自定义分区

    消息键(message key)

      kafka 通过 message key 来保证,相同 key 的数据进入同一分区,从而保证了有序性。

    分区器(partitioner)

      即使不显式指定分区器,kafka 也会有默认分区器来决定消息要进入哪个分区。

    • 对于不带消息键的情况 :kafka 针对同一个 producer 第一次发送数据时会生成一个随机数,然后随机数对主题分区数取余即为发送分区,后面的数据发送在此随机数的基础上累加,并继续取余计算分区,达到轮询的效果。
        // 轮询均匀发布到所有分区
        private static void runProducer() throws InterruptedException, ExecutionException {
            Producer<String, String> producer = createProducer();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "aaaa");
            
            try {
                // 不使用分区器的情况下,默认消息会均衡地发到各个分区
                for (int i = 0; i < 12; i++) {
                    RecordMetadata metadata = producer.send(record).get();
                    System.out.println("Send OK!!!!");
                    System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());    
                    System.out.println();
                }
            } finally {
                producer.flush();
                producer.close();
            }
            
        }
    


    • 对于带消息键的情况:kafka 先将消息键转化为字节数组,然后针对字节数组使用 mermer2 哈希算法得到一个整数值,再将整数对分区数取余得到发送分区,所以相同消息键的数据总会被发到同一分区。
        // 带 message key 的消息只会发到同个分区
        private static void runProducer2() throws InterruptedException, ExecutionException {
            Producer<String, String> producer = createProducer();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "gps", "bbbb");
            
            try {
                // 不使用分区器的情况下,默认消息会均衡地发到各个分区
                for (int i = 0; i < 12; i++) {
                    RecordMetadata metadata = producer.send(record).get();
                    System.out.println("Send OK!!!!");
                    System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());    
                    System.out.println();
                }
            } finally {
                producer.flush();
                producer.close();
            }   
        }
    

    源码如下(DefaultPartitioner):

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    

      现在自定义一个分区器,需求如下:

    1)消息不带键时抛出异常
    2)消息为 "China" 时直接指定分区2

      实现代码如下:

    package api.producer.partition;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    
    public class CustomerPartitioner2 implements Partitioner {
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
        
        @Override
        public void configure(Map<String, ?> configs) {}
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            System.out.println("In CustomerPartitioner Class");
            
            int p;
            
            if ((keyBytes == null) || (!(key instanceof String)))
                throw new InvalidRecordException("All messages must have key");
            
            if ("China".equals(String.valueOf(key))) {
                p = 2;
                System.out.println("Using special partition");
            } else {
                p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions) ;
            }
            
            System.out.println("Key = " + (String) key + " Partition = " + p);
            return p;     
        }
        
        @Override
        public void close() {}
    }
    

      测试代码:

    private static void runProducer3() throws InterruptedException, ExecutionException, ParseException {
        Producer<String, Customer> producer = createProducerWithP();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
        Date date = sdf.parse("2020-05-10");
        ProducerRecord<String, Customer> record = new ProducerRecord<String, Customer>(TOPIC, "gps", new Customer("david", "manager", "new york", "ccc", date, 35));
        ProducerRecord<String, Customer> record2 = new ProducerRecord<String, Customer>(TOPIC, "gps", new Customer("tony", "hr", "new york", "ccc", date, 30));
        ProducerRecord<String, Customer> record3 = new ProducerRecord<String, Customer>(TOPIC, "China", new Customer("gasbi", "boss", "new york", "ccc", date, 40));
        ProducerRecord<String, Customer> record4 = new ProducerRecord<String, Customer>(TOPIC, "China", new Customer("susam", "waiter", "new york", "ccc", date, 20));
        ProducerRecord<String, Customer> record5 = new ProducerRecord<String, Customer>(TOPIC, new Customer("susam", "waiter", "new york", "ccc", date, 20));
        
        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());    
            System.out.println();       
            
            RecordMetadata metadata2 = producer.send(record2).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record2.value(), metadata2.partition(), metadata2.offset()); 
            System.out.println();   
            
            RecordMetadata metadata3 = producer.send(record3).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record3.value(), metadata3.partition(), metadata3.offset()); 
            System.out.println();   
            
            RecordMetadata metadata4 = producer.send(record4).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record4.value(), metadata4.partition(), metadata4.offset()); 
            System.out.println();   
            
            RecordMetadata metadata5 = producer.send(record5).get();
            System.out.println("Send OK!!!!");
            System.out.printf("record (values = %s ) partition=%d  offset=%d", record5.value(), metadata5.partition(), metadata5.offset()); 
            System.out.println();               
        } finally {
            producer.flush();
            producer.close();
        }   
    }
    

      测试结果:

      前两条数据被发送到分区3,后两条数据被指定发送到分区2,最后一条数据没有指定 key 直接抛出异常


    2、消费者 API

      kafka 消费者的工作流如上图所示,应用侧先创建一个消费者对象,从 bootstrap server 获取元数据(包括集群中都有哪些 broker、有哪些 topic、每个 topic 都有几个 partitions、每个 partition 的 leader 和 ISR 分别在哪些 broker 上、消费者的分区偏移量的位置等),然后创建网络连接连接到 kafka 服务器,并通过消费者协调器来协调(比如分区 Leader 故障会触发选举,选出新的 Leader 后还要通知消费该分区的消费者,这些需要协调器来完成对接),最终将数据从服务器批量拉取到本地并消费处理。

    (1)消费模式

    简单的消费者

        // 往指定主题发布指定数量的消息
        public static void pubMsg(String topic, String message, int count) throws InterruptedException, ExecutionException {
            Producer<String, String> producer = createProducer();   
            try {
                for (int i = 0; i < count; i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
                    RecordMetadata metadata = producer.send(record).get();
                    System.out.println("Send OK!!!!");
                    System.out.printf("record (values = %s ) partition=%d  offset=%d", record.value(), metadata.partition(), metadata.offset());                    
                }           
            } finally {
                producer.flush();
                producer.close();
            }
        }
    
    // 简单的消费者例子
    public class ConsumerDemo {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";
        private final static String TOPIC = "consumer_example01";
        private static Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
        
        // 创建Consumer对象
        private static Consumer<String, String> createConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            
            return new KafkaConsumer<String, String>(props);
        }
        
        private static void runConsumer() {
            final Consumer<String, String> consumer = createConsumer();
            consumer.subscribe(Collections.singletonList(TOPIC));
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());              
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }
        
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ProducerDemo.pubMsg(TOPIC, "hello world", 10);
            
            log.info("Program start!!!");
            runConsumer();
    
        }
    }
    

      生产者先往主题中发送 10 条数据,消费者再拉取数据消费,执行结果如下:

    多个消费者接收数据

      还是上述的代码,先启动两个消费者,指定不同的消费者组 groupexample01、groupexample02,生产者发布消息 ProducerDemo.pubMsg("consumer_example01", "multi consumer", 5);

      消费者马上从 topic 中拉取数据

      在 kafka 服务端,对不同的消费者组会单独保存它们的分区读取偏移量,不互相影响



    消费者接收多个 topic 数据

      可以直接指定 topic 列表,或者用模式匹配(适用于主题比较多的时候),代码如下:

    // 消费者消费多组数据
    public class MultiTopicConsumer {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";  
        private final static String GROUP_ID = "groupexample";
        private static Logger log = LoggerFactory.getLogger(ConsumerDemo2.class);
        
        // 创建Consumer对象
        private static Consumer<String, String> createConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());      
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            
            return new KafkaConsumer<String, String>(props);
        }
        
        private static void runConsumer() {
            final Consumer<String, String> consumer = createConsumer();
            consumer.subscribe(Arrays.asList("consumer_example01","consumer_example02"));
            //consumer.subscribe(Pattern.compile("consumer_example.*"));
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());              
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }
        
        public static void main(String[] args) throws InterruptedException, ExecutionException {                
            log.info("Program start!!!");
            runConsumer();
        }       
    }
    

      先启动消费者,然后生产者分别往 consumer_example01、consumer_example02 主题写入数据

            ProducerDemo.pubMsg("consumer_example01", "consumer_example01 msg", 5);
            ProducerDemo.pubMsg("consumer_example02", "consumer_example02 msg", 5);
    

      消费者接收处理多组数据

    多个消费者加入同一组接收数据

      对于同个消费者组中消费者来说:

    • 每个消费者负责一个分区
    • 消费者数小于分区数则部分消费者要读取多个分区数据
    • 消费者数大于分区数则部分消费者空闲
    • 非空闲消费者发生故障,则空闲的消费者会顶替其作用

    首先创建一个有两个分区的主题

    root@:/app/kafka# bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 \
    > --topic group_topic --partitions 2 --replication-factor 1 \
    > --create
    

      启动两个消费者,它们同属于组 consumer_group ,生产者往 group_topic 中发送10条数据

      可以看到数据被轮询着发到两个分区,并分别被两个消费者读取

      将其中一个消费者停掉,再次发送10笔数据,可以看到都被剩下的那个消费者读取了

      再次启动两个消费者加入到消费者组,再次发送10条数据,可以看到其中一个消费者读取了一般的数据另一个消费者处于空闲状态

      关闭读取数据的消费者,再次发送10条数据,验证空闲消费者实现故障转移的效果



    (2)偏移量

    初始偏移量

      当主题中存在数据,而消费者从来没有读取过该主题时,可以选择从头开始读取数据,这样不会丢失对存量数据的读取;也可以选择从最末尾开始读取这样,这样消费者只会读取到它连接上服务器之后生产者新发送进来的数据。

      消费者第一次上线会如何处理是由参数 auto.offset.reset 参数决定的,若设置为 earliest 则从主题最开始读取数据;若设置为 latest 则从主题末尾开始读取数据;若设置为 none 则当主题中没有存量的数据时消费者会抛出异常。不设置默认为 latest

      测试代码如下:

    public class AutoOffsetResetDemo {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";  
        private static Logger log = LoggerFactory.getLogger(AutoOffsetResetDemo.class);
        private static Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition,OffsetAndMetadata>();  
        
        // 创建Producer对象
        private static Producer<String, String> createProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            return new KafkaProducer<String, String>(props);
        }
        
        // 创建Consumer对象
        private static Consumer<String, String> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // 测试手工提交打开
            //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            
            return new KafkaConsumer<String, String>(props);
        }   
        
        // 往指定主题发布指定数量的消息
        private static void runProducer(String topic, String message, int count) throws InterruptedException, ExecutionException {
            Producer<String, String> producer = createProducer();   
            try {
                for (int i = 0; i < count; i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
                    RecordMetadata metadata = producer.send(record).get();              
                    System.out.printf("record (values = %s ) partition=%d  offset=%d ", record.value(), metadata.partition(), metadata.offset());
                    System.out.println("Send OK!!!!");
                }           
            } finally {
                producer.flush();
                producer.close();
            }
        }   
        
        // 以消费组名义读取某主题数据
        private static void runConsumer(String topic, String groupId) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            consumer.subscribe(Collections.singletonList(topic));
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());      
                         // 测试手工提交打开
                         // consumer.commitSync();
                      
                         // 实现精确提交偏移量
                         // currentOffsets.put(new TopicPartition(record.topic(), record.partition() ), new OffsetAndMetadata(record.offset() + 1,"no metadata"));
                         // consumer.commitSync(currentOffsets);
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }   
      
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            log.info("Producer send data!!!");
            runProducer("offset_example1", "111", 10);
            
            log.info("Consumer read data!!!");
            runConsumer("offset_example1", "offset_group1");
        }  
    }
    

      生产者先单独发送 10 条数据,然后再单独启动消费者,并没有实际读取到数据,查看消费组的偏移量,直接被更新到最新

      注意,该参数只针对第一次读取的组有效,一旦第一次读取在服务器留下来消费者偏移量数据,再次执行即使改成 earliest 也不会从头开始读取了。

      将消费组名改为 offset_group12,再启动消费者,可以看到读取了数据



    手工提交

      默认情况下,消费者会自动提交偏移量,但无法实现精确的提交,会导致数据处理遗漏或丢失。消费者拉取数据时是批量拉取下来的,然后应用程序对批量数据逐条处理,如果在某批数据处理到一半时,程序发生故障(比如程序更新、服务器故障、程序崩溃等),而 kafka 客户端 API 已经将这批数据的最末尾的偏移量提交到了服务端,我们希望在应用程序恢复时从上一次还没处理的数据开始处理,但是从 kafka 读取到的数据已经是新的一批数据了,之前的那批数据还没处理的数据就永远不能再读取到了。

      如上图所示,应用从 kafka 拉取第一批数据时,处理完偏移量为 2 的数据后,还没来得及处理下一个数据就崩溃了,但是在崩溃前已经自动提交了偏移量 5;重新恢复上线后,会从偏移量为 5 开始读取数据,而 3、4 数据就永远处理不到了。

      为了解决该问题,需要将自动提交改为手工提交,并在代码中显式调用提交方法。

      生产者先发10条数据,可以看到 offset_group1 落后10条数据

      在消费者提交偏移量出打个断电,debug 启动代码,会发现 commitSync() 方法并非与我们预料的一样,每次提交的偏移量+1,其内部处理机制会在所有数据在消费者端完整接收到后直接提交本批次数据的最后一次数据的偏移量+1,如下图所示:

      落后的偏移量直接变为0

      如果此时程序崩溃,依然没有解决一开始提出的问题,因此要指定提交的偏移量是多少,修改代码如下

    currentOffsets.put(new TopicPartition(record.topic(), record.partition() ), new OffsetAndMetadata(record.offset() + 1,"no metadata"));
    consumer.commitSync(currentOffsets);
    

      再次启动生产者发10笔数据,再启动消费者,从测试结果可以看出,每次提交偏移量只会加1,实现了一开始预设的精确提交偏移量的需求。


      如果读取每条数据都要提交一次偏移量,效率较低,可以使用批量提交的办法,获得性能提升,如下代码所示,没五条数据提交一次

        private static void runConsumer(String topic, String groupId) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            consumer.subscribe(Collections.singletonList(topic));
            
            int count = 0;
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        count++;
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());
                        
                        // 实现批量提交
                        if (count % 5 == 0) {
                            currentOffsets.put(new TopicPartition(record.topic(), record.partition() ), new OffsetAndMetadata(record.offset() + 1,"no metadata"));
                            consumer.commitSync(currentOffsets);
                        }
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }   
    

    同步提交和异步提交

      同步提交每次都要等待提交的结果,而异步提交不等待,如果有需要等待成功提交后处理的操作,可通过回调实现,代码如下:

    // 测试异步提交偏移量
    public class AsyncCommitOffsetDemo {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";  
        private static Logger log = LoggerFactory.getLogger(AsyncCommitOffsetDemo.class);
        private static Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition,OffsetAndMetadata>();  
        
        // 创建Consumer对象
        private static Consumer<String, String> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            System.out.println(ConsumerConfig.AUTO_OFFSET_RESET_DOC);
            
            return new KafkaConsumer<String, String>(props);
        }       
        
        // 以消费组名义读取某主题数据
        private static void runConsumer(String topic, String groupId) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            consumer.subscribe(Collections.singletonList(topic));
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());
                        
                        // 实现精确的偏移量提交
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition() ), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                        consumer.commitAsync(currentOffsets, new AsyncCommitCallback());
    
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }       
        
        public static void main(String[] args) {
            log.info("start consumer!!!");
            runConsumer("offset_example1", "offset_group2");
        }
    
    }
    
    class AsyncCommitCallback implements OffsetCommitCallback {
    
        private static Logger log = LoggerFactory.getLogger(AsyncCommitCallback.class);
        
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            for (OffsetAndMetadata data : offsets.values()) {
                log.info("Commit completed, offset" + data.offset());           
            }       
        }
        
    }
    

      可以看到消费者读取了所有数据后,才收到第一条数据的回调



    (3)组协调器和反序列化

    组协调器

      任何一个消费者连上 kafka 后,都会被赋予一个指定的分区,在程序正常工作时,当前分区保持不变;如果组内成员发生变化,第一个加入的消费者自动称为组长,负责协调后面新成员加入和老成员退出后分区的协调分配,这个过程被称为 Rebalance(重平衡)。

      使用以下代码来测试重平衡:

    // 重平衡示例
    public class RebalanceDemo {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";  
        private static Logger log = LoggerFactory.getLogger(RebalanceDemo.class);   
        
        // 创建Consumer对象
        private static Consumer<String, String> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");     
            System.out.println(ConsumerConfig.AUTO_OFFSET_RESET_DOC);
            
            return new KafkaConsumer<String, String>(props);
        }
        
        // 以消费组名义读取某主题数据
        private static void runConsumer(String topic, String groupId) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            consumer.subscribe(Collections.singletonList(topic), new RebalanceListener(consumer));
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());              
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }       
        
        public static void main(String[] args) {
            runConsumer("rb_topic", "rb_group");
    
        }
    }
    
    class RebalanceListener implements ConsumerRebalanceListener {
    
        private Consumer<String, String> consumer;
        static Logger log = LoggerFactory.getLogger(RebalanceListener.class.getName());
        
        public RebalanceListener(Consumer<String, String> con) {        
            this.consumer = con;
        }   
        
        // 分配的分区被撤销时调用本方法
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Following Partitions Revoked ......");
            for (TopicPartition partition :partitions) {            
                log.info(partition.partition() + ",");
            }
        }
    
        // 分区被重新分配时调用本方法
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Following Partitions Assigned ......");
            for (TopicPartition partition :partitions) {            
                log.info(partition.partition() + ",");
            }
        }   
    }
    

    【代码分析】

      每启动一个线程就是一个消费者,订阅时传入消费者平衡监听器对象,这个对象会在消费者的分区被重平衡分配时调用触发通知。

    【演示】

      先创建一个包含3个分区的主题 rb_topic,然后依次启动属于 rb_group 消费组的三个消费者。

      可以看到第一个消费者启动时直接被分配了3个分区;第二个消费者启动时,第一个消费者原来的分配被取消并重分配了分区0、1,第二个消费者被分配了分区2;第三个消费者启动时,第一个消费者原来分配的分区0、1被取消,并重新分配了分区0,第二个消费者原来分配的分区2被取消,重新分配还是分区2,最后一个消费者被分配了分区1。

    反序列化

      首先要定义一个反序列化器,需要实现 org.apache.kafka.common.serialization.Deserializer 接口,代码如下:

    // 反序列化器
    public class CusDeserializer implements Deserializer<Customer> {
        @Override
        public Customer deserialize(String topic, byte[] data) {
            ObjectMapper objectMapper = new ObjectMapper();
            Customer customer = null;
            try {
                customer = objectMapper.readValue(data, Customer.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return customer;
        }
    }
    

      读取序列化对象并反序列化的消费者程序如下:

    // 读取对象的序列化数据
    public class ReadCustSerDemo {
    
        private final static String BOOTSTRAP_SERVERS = "192.168.72.131:9092";  
        private static Logger log = LoggerFactory.getLogger(ReadCustSerDemo.class); 
        
        // 创建Consumer对象
        private static Consumer<String, Customer> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CusDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");             
            
            return new KafkaConsumer<String, Customer>(props);
        }       
        
        // 以消费组名义读取某主题数据
        private static void runConsumer(String topic, String groupId) {
            final Consumer<String, Customer> consumer = createConsumer(groupId);
            consumer.subscribe(Collections.singletonList(topic));
            
            try {
                while (true) {
                    ConsumerRecords<String, Customer> records = consumer.poll(100);
                    for (ConsumerRecord<String, Customer> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());  
                          // log.info(record.value().info());
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }       
        
        public static void main(String[] args) {
            log.info("Start read customer data!!!");
            runConsumer("customer_topic", "customer_group");
        }
    }
    

      使用上述序列化的代码发送10笔数据到 customer_topic,消费者正常反序列化 customer 对象

      为了更方便显示反序列化的 customer 对象,为对象添加以下方法,并在消费者代码中调用

    public String info() {
        return "Customer [name=" + name + ", role=" + role + ", city=" + city + ", Street=" + Street + ", in_date="
                + in_date + ", Age=" + Age + "]";
    }
    

      启动消费者,生产者再次发10笔数据

    (4)消费指定分区位置数据

    消费指定偏移量之后的数据

      代码如下:

    // 指定位置消费的例子
    public class AssignConsumerDemo {
    
        private static final String BOOTSTRAP_SERVERS = "192.168.72.131:9092";
        private static Logger log = LoggerFactory.getLogger(AssignConsumerDemo.class);  
        
        // 创建Consumer对象
        private Consumer<String, String> createConsumer(String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");             
            
            return new KafkaConsumer<String, String>(props);
        }   
        
        // 消费某个主题分区,指定偏移量后的数据
        private void runConsumer(String groupId, String topic, int partition, long offsetread) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Arrays.asList(tp));     // 指定要消费哪个分区的数据
            consumer.seek(tp, offsetread);          // 指定消费偏移量之后的数据
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + "  , value = " + record.value() + ", partition = " + record.partition() + ", offset = " + record.offset());                  
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }       
        
        @Test
        public void assignOffset() {
            runConsumer("assign_group", "assign_topic", 0, 5);      
        }
        
    }
    

      不管消费组的偏移量到哪,都可以直接读取指定位置之后的数据

      消费完后,不管之前的偏移量是多少,现在都被更新到了最末尾

    消费指定时间之后的数据

      先根据事件找到偏移量,接下来的处理跟上面相同。从上面的测试结果可知,偏移量为10的数据对应的时间戳是1652424933159,所以指定的时间戳会从10号数据开始读取,代码如下:

        // 消费某个主题分区,某个时间之后的数据
        private void runConsumer2(String groupId, String topic, int partition, long time) {
            final Consumer<String, String> consumer = createConsumer(groupId);
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Arrays.asList(tp));     // 指定要消费哪个分区的数据
            
            // 根据事件计算出最近的偏移量
            Map<TopicPartition,Long> timestampsToSearch = new HashMap<TopicPartition,Long>(); 
            timestampsToSearch.put(tp, time);
            Map<TopicPartition, OffsetAndTimestamp>  outoffsets = consumer.offsetsForTimes(timestampsToSearch);     
            Long seekOffset = outoffsets.get(tp).offset();
            
            consumer.seek(tp, seekOffset);          // 指定消费偏移量之后的数据
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("topic = " + record.topic() + ", value = " + record.value() + ", partition = " + record.partition() + 
                                 ", offset = " + record.offset() + ", time = " + record.timestamp());                   
                    }
                }
            } finally {
                consumer.close();
                log.info("Program close!!!");
            }
        }
        
        @Test
        public void assignTime() {
            runConsumer2("assign_group", "assign_topic", 0, 1652424933159L);
        }
    

      测试结果如下:

    相关文章

      网友评论

        本文标题:高性能分布式消息系统 —— Kafka

        本文链接:https://www.haomeiwen.com/subject/ypnlurtx.html