title: Kafka常见问题
date: 2020-04-01 16:25:49
update: 2020-04-01 20:31:30
excerpt: Kafka 面试中常见问题
toc_min_depth: 3
tags:
- Kafka
- 大数据框架
categories: - [Kafka]
- [大数据框架]
kafka
kafka的定义
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
消息队列有什么好处
1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
3)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
消费队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
kafka中的相关概念
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。
kafka配置文件
位置
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties
内容
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
kafka分布式的broker.id配置
修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复
kafka的群起脚本
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
done
kafka的命令行操作命令
启动
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
删除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
消费消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties 指定消费者的配置文件(可将多个消费者放置在一个组内)
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first
注 : --from-beginning:会把主题中以往所有的数据都读取出来。
查看某个Topic的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
修改分区数
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
kafka工作流程
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
如下
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index和log文件以当前segment的第一条消息的offset命名。
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
kafka生产者的分区分配策略
1)分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2)分区的原则
我们需要将producer发送的数据封装成一个ProducerRecord对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
kafka如何保证数据可靠性
为保证producer发送的数据能可靠的发送到指定的topic,
topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),
如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
都有哪些副本数据同步策略 优缺点是什么
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
kafka的副本同步策略是什么 这个策略会出现什么问题
Kafka选择了第二种方案,原因如下:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
kafka中的ISR是什么
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。
当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。
如果follower长时间未向leader同步数据,则该follower将被踢出ISR,
该时间阈值由replica.lag.time.max.ms参数设定。
Leader发生故障之后,就会从ISR中选举新的leader。
kafka中的ack应答机制是什么
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
acks:
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
kafka如何进行故障处理
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
kafka消费者的消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
kafka消费者的分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是RoundRobin,一是Range。
kafka消费者如何维护offset
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
1)修改配置文件consumer.properties
exclude.internal.topics=false
2)读取offset
0.11.0.0之前版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0之后版本(含):
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
kafka中的消费者组是什么
配置config/consumer.properties文件中的group.id
然后在启动消费者时候使用同一个配置文件 就可以让消费者在一个组内
同一个消费者组中的消费者,同一时刻只能有一个消费者消费。
如果消费者组中的消费者多于当前的分区数 会有警告提醒
No broker partitions consumed by consumer thread ...
如果停止了所有的消费者 那么offset会维护在我们选择的地方(zk中或者是本地)
再次启动消费者会根据选择的GTP(group topic partition所维护的offset位置进行继续消费)
下图为zk中维护的信息
image
kafka为什么能够高效读写数据
- 分布式框架
- 分区
- 顺序写磁盘
- Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 零复制技术
- image
kafka的零拷贝技术如何实现
kafka中的消费者在读取服务端的数据时,需要将服务端的磁盘文件通过网络发送到消费者进程,网络发送需要经过几种网络节点。如下图所示:
image传统的读取文件数据并发送到网络的步骤如下:
(1)操作系统将数据从磁盘文件中读取到内核空间的页面缓存;
(2)应用程序将数据从内核空间读入用户空间缓冲区;
(3)应用程序将读到数据写回内核空间并放入socket缓冲区;
(4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。
通常情况下,Kafka的消息会有多个订阅者,生产者发布的消息会被不同的消费者多次消费,为了优化这个流程,Kafka使用了“零拷贝技术”,如下图所示:
image“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。
如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。
传统的文件拷贝通常需要从用户态去转到核心态,经过read buffer,然后再返回到用户态的应用层buffer,然后再从用户态把数据拷贝到核心态的socket buffer,然后发送到网卡。
image传统的数据传输需要多次的用户态和核心态之间的切换,而且还要把数据复制多次,最终才打到网卡。
如果减少了用户态与核心态之间的切换,是不是就会更快了呢?
image此时我们会发现用户态“空空如也”。数据没有来到用户态,而是直接在核心态就进行了传输,但这样依然还是有多次复制。首先数据被读取到read buffer中,然后发到socket buffer,最后才发到网卡。虽然减少了用户态和核心态的切换,但依然存在多次数据复制。
如果可以进一步减少数据复制的次数,甚至没有数据复制是不是就会做到最快呢?
DMA
别急,这里我们先介绍一个新的武器:DMA。
DMA,全称叫Direct Memory Access,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。如下图:
image而DMA,则可以绕过CPU,硬件自己去直接访问系统主内存。如下图:
image很多硬件都支持DMA,这其中就包括网卡。
image零拷贝
回到本文中的文件传输,有了DMA后,就可以实现绝对的零拷贝了,因为网卡是直接去访问系统主内存的。如下图:
imageJava的零拷贝实现
在Java中的零拷贝实现是在FileChannel中,其中有个方法transferTo(position,fsize,src)。
传统的文件传输是通过java.io.DataOutputStream,java.io.FileInputStream来实现的,然后通过while循环来读取input,然后写入到output中。
image零拷贝则是通过java.nio.channels.FileChannel中的transferTo方法来实现的。transferTo方法底层是基于操作系统的sendfile这个system call来实现的(不再需要拷贝到用户态了),sendfile负责把数据从某个fd(file descriptor)传输到另一个fd。
sendfile:
image imageJava的transferTo:
image传统方式与零拷贝性能对比
image可以看出速度快出至少三倍多。Kafka在文件传输的过程中正是使用了零拷贝技术对文件进行拷贝。建议以后多用FileChannel的transferTo吧。
总结
- 传统的文件传输有多次用户态和内核态之间的切换,而且文件在多个buffer之间要复制多次最终才被发送到网卡。
- DMA是一种硬件直接访问系统主内存的技术。
- 多种硬件都已使用了DMA技术,其中就包括网卡(NIC)。
- DMA技术让CPU得到解放,让CPU可以不用一直守着来完成文件传输。
- 零拷贝技术减少了用户态与内核态之间的切换,让拷贝次数降到最低,从而实现高性能。
- Kafka使用零拷贝技术来进行文件的传输。
zk在kafka中的作用
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。
人话:
每个broker都会在zk进行注册
然后KafkaController会实时监听zk中的/brokers/ids下的节点情况[0,1,2]
如果broker0宕机 ids中的节点会实时变化为[1,2]
KafkaController会更新topic中的leader和isr队列
KafkaController会获取当前可用的isr并从中选出新的leader
kafka的消息发送流程是什么样的
Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,
以及一个线程共享变量——RecordAccumulator(这个里面有分区)
main线程将消息发送给RecordAccumulator,
Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
注意这里面 先走拦截器 再走序列化器 再走分区器
达到batch.size大小或者是linger.ms时间就发到RecordAccumulator中
sender线程去拉取
image
相关参数:
batch.size:只有数据积累到batch.size之后,sender才会发送数据。(默认16kb)
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。
如何使用kafka API 实现异步消息发送
准备知识
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord对象
几个比较重要的配置项
//kafka集群,broker-list
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- kafka集群位置
- 批次大小
- 批次等待时间
- 重试次数
- 缓冲区大小
- 序列化器(
org\apache\kafka\common\serialization\Serializer.java
)
org\apache\kafka\clients\producer\ProducerConfig.java
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String BATCH_SIZE_CONFIG = "batch.size";
public static final String ACKS_CONFIG = "acks";
public static final String LINGER_MS_CONFIG = "linger.ms";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
public static final String RETRIES_CONFIG = "retries";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
org\apache\kafka\clients\consumer\ConsumerConfig.java
public static final String GROUP_ID_CONFIG = "group.id";
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
public static final int DEFAULT_FETCH_MAX_BYTES = 52428800;
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String CHECK_CRCS_CONFIG = "check.crcs";
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
public static final String DEFAULT_ISOLATION_LEVEL;
org\apache\kafka\clients\CommonClientConfigs.java
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
不带回调的API
imagepackage com.atguigu.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 整个配置中的key可以使用ProducerConfig中定义的常量
//kafka集群,broker-list
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
// 轮循 这个会用到分区器
producer.send(new ProducerRecord("second","value++>"+i));
// 根据给的key进行hash 然后放在不同的分区 这个会使用到分区器
producer.send(new ProducerRecord("second","key"+i,"value==>"+i));
// 具体指定了分区号 就不再使用到key 这个不会用到分区器
if(i<5){
producer.send(new ProducerRecord("second","key"+i,"value**>"+i));
}else{
producer.send(new ProducerRecord("second","key"+i,"value^^>"+i));
}
}
producer.close();
}
}
// 分区器源码解读
org\apache\kafka\clients\producer\Partitioner.java
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
}
//唯一实现类
org\apache\kafka\clients\producer\internals\DefaultPartitioner.java
public class DefaultPartitioner implements Partitioner {
// 传进来的是topic key 还有序列化后的key value 序列化后的value
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//如果key是空的 后面的逻辑用了自增然后对分区取余 其实就是轮循
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 如果key不是空的 将keyBytes传进去然后做hash murmur2是一种哈希算法
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
}
带回调的API
image跟上面不同的就是在使用send方法时候 带上一个回调函数
// 回调方法:当前消息发出后 不管是消息成功发送还是发送失败 都会执行该回调方法
// metadata 当前消息的元数据
// metadata能拿到当前分区的各种数据 如下图所示
// 偏移量 分区 主题 时间戳 等等
// exception 当消息发送失败 会返回该异常
image
// org\apache\kafka\clients\producer\Callback.java
public interface Callback {
public void onCompletion(RecordMetadata metadata, Exception exception);
}
// 这是一个接口 里面有一个方法
它有两个实现类
image
// 示例 带回调的API
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MyCallBackProducer {
public static void main(String[] args) throws Exception {
//1. 创建配置对象
Properties props = new Properties();
//kafka集群的位置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//ack级别
props.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//批次大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//等待时间
props.put(ProducerConfig.LINGER_MS_CONFIG,1);
//缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//k v 序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2. 创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
//3.生产数据
for (int i = 0; i < 10000 ; i++) {
producer.send(new ProducerRecord<>("second", "atguigu@@@@@" + i), new Callback() {
/**
* 回调方法: 当前的消息发送出去以后,会执行回调方法。
* @param metadata 当前消息的元数据信息。
* @param exception 当发送失败,会返回异常。
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
//发送成功
System.out.println(metadata.topic() + " -- " + metadata.partition() + " -- " + metadata.offset());
}
}
});
}
// TimeUnit.MILLISECONDS.sleep(100);
//关闭
producer.close();
}
}
kafka API中没有写producer.close()为什么读不到数据 也没有回调方法
这是因为异步发送消息的原因
main线程在发送完数据之后就结束了 这个时间小于了批次拉取设置的时间1ms
sender线程去拉取数据的同时需要执行main线程中的回调方法
但是现在main线程已经关闭 所以无法执行回调方法
如果我们不写close方法 而是让main线程休眠100ms 这时sender就能在这个时间内拉取到数据并执行回调方法
所以close方法肯定会等待sender线程拉取数据完成后再进行关闭
具体实现可以看close()方法的源码 如下
// org\apache\kafka\clients\producer\KafkaProducer.java
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
//关闭此生产者。 此方法一直阻塞所有以前发送的请求完成。 此方法等效于close(Long.MAX_VALUE, TimeUnit.MILLISECONDS) 如果关闭()被从调用Callback ,警告消息将被记录并关闭(0,TimeUnit.MILLISECONDS)将被代替调用。 我们这样做是因为发件人线程否则将尝试加入自己和永远阻塞。
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
* This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
* <p>
* If the producer is unable to complete all requests before the timeout expires, this method will fail
* any unsent and unacknowledged records immediately.
* <p>
* If invoked from within a {@link Callback} this method will not block and will be equivalent to
* <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
* blocking the I/O thread of the producer.
*
* @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
* non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
* @param timeUnit The time unit for the <code>timeout</code>
* @throws InterruptException If the thread is interrupted while blocked
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*/
// 这种方法最多等待timeout的生产者完成所有未完成的请求的发送。
// 如果生产者是无法完成所有请求超时到期之前,此方法将立即失败任何未发送和未确认的记录。
// 如果从内调用Callback此方法不会阻止和将等效于close(0, TimeUnit.MILLISECONDS) 这样做是因为同时阻断生产者的I/O线程没有进一步的发送会发生
@Override
public void close(long timeout, TimeUnit timeUnit) {
close(timeout, timeUnit, false);
}
private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
if (timeout < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
// this will keep track of the first encountered exception
AtomicReference<Throwable> firstException = new AtomicReference<>();
boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
if (timeout > 0) {
if (invokedFromCallback) {
log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
"This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
} else {
// Try to close gracefully.
if (this.sender != null)
this.sender.initiateClose();
if (this.ioThread != null) {
try {
this.ioThread.join(timeUnit.toMillis(timeout));
} catch (InterruptedException t) {
firstException.compareAndSet(null, t);
log.error("Interrupted while joining ioThread", t);
}
}
}
}
if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
log.info("Proceeding to force close the producer since pending requests could not be completed " +
"within timeout {} ms.", timeout);
this.sender.forceClose();
// Only join the sender thread when not calling from callback.
// 仅当不从回调调用时才加入发送者线程。
if (!invokedFromCallback) {
try {
this.ioThread.join();
} catch (InterruptedException e) {
firstException.compareAndSet(null, e);
}
}
}
ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
log.debug("The Kafka producer has closed.");
if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to close kafka producer", firstException.get());
}
如何使用kafka API 实现同步消息发送
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方法即可。
区别就在于在send方法处拿到返回值future
然后调用future中的get方法
调用此方法就会阻塞当前线程 一直等到结果返回
java\util\concurrent\Future.java
image
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MyCallBackProducer {
public static void main(String[] args) throws Exception {
//1. 创建配置对象
Properties props = new Properties();
//kafka集群的位置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//ack级别
props.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//批次大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//等待时间
props.put(ProducerConfig.LINGER_MS_CONFIG,1);
//缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//k v 序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2. 创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
//3.生产数据
for (int i = 0; i < 10000 ; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("second", "atguigu@@@@@" + i), new Callback() {
/**
* 回调方法: 当前的消息发送出去以后,会执行回调方法。
* @param metadata 当前消息的元数据信息。
* @param exception 当发送失败,会返回异常。
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
//发送成功
System.out.println(metadata.topic() + " -- " + metadata.partition() + " -- " + metadata.offset());
}
}
});
// 发送一个之后阻塞线程等待返回结果才继续发送下一个
// 阻塞等待 , 同步发送
// 此时会发现结果严格按照发送的顺序
RecordMetadata recordMetadata = future.get();
}
//关闭
producer.close();
}
}
kafka的分区器怎么写 如何自定义分区器
-
继承
Partitioner
-
重写三个方法
configure() partition() close()
可以根据传进的key分区 也可根据value分区
在定义好自己的分区器之后 还要再配置中添加分区器的全类名 否则会走默认的分区器
系统默认分区器
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
// 简单实现一个分区器
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (key == null) { // key为空 到0号分区
return 0;
} else { // key不为空 到1号分区
return 1;
}
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
// 如果要使用自己定义的分区器 要在配置中指定分区器并传入分区器的全类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.partitioner.MyPartitioner");
kafka的消费者需要注意的主要问题是什么
Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
如何使用kafka API 实现消息接收(消费者)
准备知识
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔
几个比较重要的配置项
- 自动提交offset功能
- 自动提交时间间隔
- 消费者组
- 反序列化器(对应生产者端的序列化
org\apache\kafka\common\serialization\Deserializer.java
)
自动提交offset
package fun.hoffee.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 消费者
*/
public class MyConsumer {
public static void main(String[] args) {
//1. 创建配置对象
Properties props = new Properties();
//指定kafka集群的位置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
//开启自动提交offset
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自动提交offset的间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
//指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "atguigu");
//指定kv的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//2. 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//3. 订阅主题
consumer.subscribe(Arrays.asList("first", "second", "third"));
//4. 消费数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() + " -- " + record.key() + " -- " + record.value());
}
}
}
}
// 此时创建的是新组 不能消费到之前的数据
// 如果想要消费之前的数据 需要重置offset
// 由auto.offset.rest参数(ConsumerConfig中的AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";)控制 默认值为latest
// 可以配置为 earliest | latest | none
---
// 文档说明如下 :
// What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
// earliest: automatically reset the offset to the earliest offset
// latest: automatically reset the offset to the latest offset
// none: throw exception to the consumer if no previous offset is found for the consumer's group
// anything else: throw exception to the consumer.
// 当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除),该怎么办:
// 最早:自动将偏移量重置为最早的偏移量
// 最新:自动将偏移量重置为最新偏移量
// 无:如果未找到消费者组的先前偏移量,则向消费者抛出异常
// 其他:向消费者抛出异常
// 人话: 如果这个是一个新的组 或者是 这个组拿了一个kafka中不存在的偏移量去消费数据时候 kafka就会自动帮忙重置offset 如果配置过这个参数 就按这个参数配置的来 如果没有配置过 默认重置为latest
重置offset
具体说明见上一节代码末尾
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 消费者
*/
public class MyConsumer {
public static void main(String[] args) {
//1. 创建配置对象
Properties props = new Properties();
//指定kafka集群的位置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//开启自动提交offset
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//关闭自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//自动提交offset的间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1);
//重置offset : earliest(最早) latest(最后)
//满足两个条件:
// 1. 当前的消费者组在kafka没有消费过所订阅的主题
// 2.当前消费者组使用的offset在kafka集群中已经被删除
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu111");
//指定kv的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2. 创建消费者对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
//3. 订阅主题
consumer.subscribe(Arrays.asList("first","second","third"));
//4. 消费数据
while(true){
// 此处是拉取数据方法 poll中传递的参数是超时时间 当主题中没有数据时候 等待超时时间之后再进行拉取数据
// 假如某一次没有消费到数据 会等待响应的时间之后再进行拉取 单位是ms
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() +" -- " +
record.key() +" -- " + record.value());
}
}
}
}
手动提交offset的两种方式
虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;
不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
由于同步提交offset有失败重试机制,故更加可靠
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
如果关闭了提交offset 在一直没有关闭consumer的情况下 consumer能正常消费数据
因为consumer从kafka中拿到offset后会一直将offset维护在内存中
但是一旦关闭 因为没有向kafka提交过offset 则offset还是之前的
那么这段时间生产的数据将被重复消费
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 消费者
*/
public class MyConsumer {
public static void main(String[] args) {
//1. 创建配置对象
Properties props = new Properties();
//指定kafka集群的位置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//开启自动提交offset
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//关闭自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//自动提交offset的间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1);
//重置offset : earliest(最早) latest(最后)
//满足两个条件: 1. 当前的消费者组在kafka没有消费过所订阅的主题 2.当前消费者组使用的offset在kafka集群中已经被删除
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu111");
//指定kv的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2. 创建消费者对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
//3. 订阅主题
consumer.subscribe(Arrays.asList("first","second","third"));
//4. 消费数据
while(true){
// 此处是拉取数据方法 poll中传递的参数是超时时间 当主题中没有数据时候 等待超时时间之后再进行拉取数据
// 假如某一次没有消费到数据 会等待响应的时间之后再进行拉取 单位是ms
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() +" -- " +
record.key() +" -- " + record.value());
}
//手动提交offset
//同步提交 代码会阻塞 直到提交offset成功 才开始消费下一条数据
consumer.commitSync(); //阻塞
//异步提交 会触发提交offset的操作 但是会继续消费数据 不管offset是否提交成功
//consumer.commitAsync();
}
}
}
kafka中重复消费数据和漏消费数据的情况
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。
先提交offset后消费,有可能造成数据的漏消费;
而先消费后提交offset,有可能会造成数据的重复消费。
这是offset的提交 和 消费数据 这两件事之间的先后顺序问题
例1 :
消费者poll进100条数据 但是在消费到第60条时候宕机 但是offset已经提交 这时候 offset超前
则后40条出现漏消费
例2 :
消费者poll进100条数据 但是offset在提交时候失败 但此时是先消费后提交offset的情况 这时候 offset滞后
则这100条数据在下次启动时候会被重复消费
如何解决这个问题?
将两件事情绑定在一起 如果失败则同时失败 如果成功则同时成功
不允许出现一个失败一个成功的情况
将两件事绑定为事务
kafka API 如何实现自定义存储offset
Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
// 创建配置信息
Properties props = new Properties();
// Kafka集群
props.put("bootstrap.servers", "hadoop102:9092");
// 消费者组,只要group.id相同,就属于同一个消费者组
props.put("group.id", "test");
// 关闭自动提交offset
props.put("enable.auto.commit", "false");
// Key和Value的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅主题 在订阅时候创建一个ConsumerRebalanceListener的对象实时监听
// 并重写两个方法onPartitionsRevoked 和 onPartitionsAssigned
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
//该方法会在Rebalance之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//该方法会在Rebalance之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));
//定位到最近提交的offset位置继续消费
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
commitOffset(currentOffset);//异步提交
}
}
//获取某分区的最新offset
private static long getOffset(TopicPartition partition) {
return 0;// 这里是伪代码 需要根据具体存储的系统来实现
}
//提交该消费者所有分区的offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
// 这里是伪代码 需要根据具体存储的系统来实现
}
}
kafka中的拦截器是如何实现的 原理是什么
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
请实现一个kafka的拦截器
需求:
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
分析:
image时间拦截器
package fun.hoffee.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 在所有的消息内容前面加上时间戳
*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//获取当前消息的value
String value = record.value();
value = System.currentTimeMillis() + " -- " + value;
//构造一个producerRecord
ProducerRecord<String, String> resultRecord =
new ProducerRecord<>(record.topic(), record.partition(), record.key(), value);
return resultRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
计数拦截器
package fun.hoffee.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 统计发送成功或失败的消息个数
*/
public class CountInterceptor implements ProducerInterceptor<String, String> {
private Integer success = 0;
private Integer fail = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 相当于原路返回没有做处理
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
success++;
} else {
fail++;
}
}
@Override
public void close() {
// 整个拦截器走完之后 调用该方法
System.out.println("Success : " + success);
System.out.println("Fail :" + fail);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在生产者的配置文件中配置拦截器(可设置多个 设置为一个list)
package fun.hoffee.kafka.interceptor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class InterceptorProducer {
public static void main(String[] args) {
//1. 创建配置对象
Properties props = new Properties();
//指定kafka集群的位置,broker-list
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
//指定ack的应答级别 0 1 -1(all)
props.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//批次大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16kb
//等待时间
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//RecordAccumulator缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32M
//指定kv的序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//指定拦截器
// "A list of classes to use as interceptors. Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
List<String> interceptors = new ArrayList<>();
interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
interceptors.add("com.atguigu.kafka.interceptor.CountInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
//2. 创建生产者对象
KafkaProducer producer = new KafkaProducer<String, String>(props);
//3. 生产数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord("second", "shangguigu==>" + i));
}
//4. 关闭
producer.close();
}
}
flume如何对接kafka
使用kafkasink
此时kafkasink相当于kafka的生产者 它可以根据消息的标记发送给kafka中不同的topic
flume官网关于kafka sink的介绍如下
这是一个Flume Sink实现,可以将数据发布到 Kafka主题。目标之一是将Flume与Kafka集成在一起,以便基于拉式的处理系统可以处理来自各种Flume来源的数据。目前,该版本支持Kafka 0.9.x系列发行版。
此版本的Flume不再支持Kafka的旧版本(0.8.x)。
必需的属性以粗体标记。
Property Name | Default | Description |
---|---|---|
type | – | Must be set to org.apache.flume.sink.kafka.KafkaSink
|
kafka.bootstrap.servers | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port Kafka-Sink将连接到的代理列表,以获取主题分区列表。这可以是部分代理列表,但是对于HA,我们建议至少两个。格式是用逗号分隔的主机名:端口列表 |
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Kafka中将发布消息的主题。如果配置了此参数,则消息将发布到该主题。如果事件标题包含“主题”字段,则事件将发布到该主题,并覆盖此处配置的主题。 |
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency. 一批中要处理多少条消息。较大的批次可提高吞吐量,同时增加延迟。 |
kafka.producer.acks | 1 | How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure. 在成功考虑一条消息之前,有多少个副本必须确认一条消息。接受的值为0(永远不等待确认),1(仅等待领导者),-1(等待所有副本)将其设置为-1,以避免在某些领导者失败的情况下丢失数据。 |
useFlumeEventFormat | false | By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side. 默认情况下,事件直接从事件主体作为字节放入Kafka主题。设置为true可将事件存储为Flume Avro二进制格式。与KafkaSource上的相同属性或Kafka Channel上的parseAsFlumeEvent属性结合使用,将为生产方保留任何Flume标头。 |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader . By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class ). |
partitionIdHeader | – | When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId . |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
more producer security props | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer. | |
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer . For example: kafka.producer.linger.ms |
The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer) and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer
the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.
示例配置如下:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic // 指定写入topic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 // kafka位置
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
实现flume中不同的event发往kafka中不同的topic
image如何监控kafka
kafka面试题总结
1.Kafka中的ISR、OSR、AR又代表什么?
ISR:与leader保持同步的follower集合
AR:分区的所有副本
2.Kafka中的HW、LEO等分别代表什么?
LEO:没个副本的最后条消息的offset
HW:一个分区中所有副本最小的offset 控制整个分区中哪些数据能够暴露给消费者
3.Kafka中是怎么体现消息顺序性的?
每个分区内,每条消息都有一个offset,故只能保证分区内有序。
4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
拦截器 -> 序列化器 -> 分区器
5.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
image6.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
正确
7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
offset+1 记录下次消费的数据的offset
8.有哪些情形会造成重复消费?
image9.有哪些情景会造成消息漏消费?
先提交offset,后消费,有可能造成数据的重复
10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache
11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
12.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以减少,现有的分区数据难以处理。
13.Kafka有内部的topic吗?如果有是什么?有什么所用?
__consumer_offsets, 共有50个分区 保存消费者offset
14.Kafka分区分配的概念?
一个topic多个分区,一个消费者组多个消费者,故需要将分区分配个消费者(roundrobin、range)
15.简述Kafka的日志目录结构?
每个分区对应一个文件夹,文件夹的命名为topic-0,topic-1,内部为.log和.index文件
16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
image先通过offset比对log文件的名字 确定好后 再找到对应的index文件中offset对应的消息索引位置
最后在log文件中找到相应的消息
17.聊一聊Kafka Controller的作用?
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
partition leader(ISR),由Controller负责
Controller(先到先得)
19.失效副本是指什么?有那些应对措施?
不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
20.Kafka的那些设计让它有如此高的性能?
分区,顺序写磁盘,0-copy
其他kafka相关面试题搜集(一)
1、请说明什么是Apache Kafka?
Apache Kafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。
2、说说Kafka的使用场景?
①异步处理
②应用解耦
③流量削峰
④日志处理
⑤消息通讯等。
3、使用Kafka有什么优点和缺点?
优点:
①支持跨数据中心的消息复制;
②单机吞吐量:十万级,最大的优点,就是吞吐量高;
③topic数量都吞吐量的影响:topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源;
④时效性:ms级;
⑤可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用;
⑥消息可靠性:经过参数优化配置,消息可以做到0丢失;
⑦功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:
①由于是批量发送,数据并非真正的实时; 仅支持统一分区内消息有序,无法实现全局消息有序;
②有可能消息重复消费;
③依赖zookeeper进行元数据管理,等等。
4、为什么说Kafka性能很好,体现在哪里?
①顺序读写
②零拷贝
③分区
④批量发送
⑤数据压缩
5、请说明什么是传统的消息传递方法?
传统的消息传递方法包括两种:
排队:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人。
发布-订阅:在这个模型中,消息被广播给所有的用户。
6、请说明Kafka相对传统技术有什么优势?
①快速:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作。
②可伸缩:在一组机器上对数据进行分区
③和简化,以支持更大的数据
④持久:消息是持久性的,并在集群中进
⑤行复制,以防止数据丢失。
⑥设计:它提供了容错保证和持久性
7、解释Kafka的Zookeeper是什么?我们可以在没有Zookeeper的情况下使用Kafka吗?
Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。
不,不可能越过Zookeeper,直接联系Kafka broker。一旦Zookeeper停止工作,它就不能服务客户端请求。
Zookeeper主要用于在集群中不同节点之间进行通信
在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取
除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
8、解释Kafka的用户如何消费信息?
在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节从套接口转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
9、解释如何提高远程用户的吞吐量?
如果用户位于与broker不同的数据中心,则可能需要调优套接口缓冲区大小,以对长网络延迟进行摊销。
10、解释一下,在数据制作过程中,你如何能从Kafka得到准确的信息?
在数据中,为了精确地获得Kafka的消息,你必须遵循两件事:
在数据消耗期间避免重复,在数据生产过程中避免重复。
这里有两种方法,可以在数据生成时准确地获得一个语义:
每个分区使用一个单独的写入器,每当你发现一个网络错误,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功
在消息中包含一个主键(UUID或其他),并在用户中进行反复制
11、解释如何减少ISR中的扰动?broker什么时候离开ISR?
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。ISR应该总是包含所有的副本,直到出现真正的故障。如果一个副本从leader中脱离出来,将会从ISR中删除。
12、Kafka为什么需要复制?
Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
13、如果副本在ISR中停留了很长时间表明什么?
如果一个副本在ISR中保留了很长一段时间,那么它就表明,跟踪器无法像在leader收集数据那样快速地获取数据。
14、请说明如果首选的副本不在ISR中会发生什么?
如果首选的副本不在ISR中,控制器将无法将leadership转移到首选的副本。
15、有可能在生产后发生消息偏移吗?
在大多数队列系统中,作为生产者的类无法做到这一点,它的作用是触发并忘记消息。broker将完成剩下的工作,比如使用id进行适当的元数据处理、偏移量等。
作为消息的用户,你可以从Kafka broker中获得补偿。如果你注视SimpleConsumer类,你会注意到它会获取包括偏移量作为列表的MultiFetchResponse对象。此外,当你对Kafka消息进行迭代时,你会拥有包括偏移量和消息发送的MessageAndOffset对象。
16、Kafka的设计时什么样的呢?
Kafka将消息以topic为单位进行归纳
将向Kafka topic发布消息的程序成为producers. 将订阅了topics并消费消息的程序成为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息
17、数据传输的事务定义有哪三种?
(1)最多一次:
消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的
18、Kafka判断一个节点是否还活着有那两个条件?
(1)节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
(2)如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久
19、producer是否直接将数据发送到broker的leader(主节点)?
producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。
20、Kafa consumer是否可以消费指定分区消息?
Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的
21、Kafka消息是采用Pull模式,还是Push模式?
Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发
22、Kafka存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
消息长度: 4 bytes (value: 1+4+n)
版本号: 1 byte
CRC校验码: 4 bytes
具体的消息: n bytes
23、Kafka高效文件存储设计特点:
(1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
(2).通过索引信息可以快速定位message和确定response的最大大小。
(3).通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
(4).通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
24、Kafka 与传统消息系统之间有三个关键区别
(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
(3).Kafka 支持实时的流式处理
25、Kafka创建Topic时如何将分区放置到不同的Broker中
副本因子不能大于 Broker 的个数;
第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的
26、Kafka新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。 当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。 如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。 但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
27、partition的数据如何保存到硬盘
topic中的多个partition以文件夹的形式保存到broker,每个分区序号从0递增, 且消息有序 Partition文件下有多个segment(xxx.index,xxx.log) segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g 如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名
28、kafka的ack机制
request.required.acks有三个值 0 1 -1
0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢失
29、Kafka的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费。同时也可以按照指定的offset进行重新消费。
30、消费者负载均衡策略
结合consumer的加入和退出进行再平衡策略。
31、kafka消息数据是否有序?
消费者组里某具体分区是有序的,所以要保证有序只能建一个分区,但是实际这样会存在性能问题,具体业务具体分析后确认。
32、kafaka生产数据时数据的分组策略,生产者决定数据产生到集群的哪个partition中
每一条消息都是以(key,value)格式 Key是由生产者发送数据传入 所以生产者(key)决定了数据产生到集群的哪个partition
33、kafka consumer 什么情况会触发再平衡reblance?
①一旦消费者加入或退出消费组,导致消费组成员列表发生变化,消费组中的所有消费者都要执行再平衡。
②订阅主题分区发生变化,所有消费者也都要再平衡。
34、描述下kafka consumer 再平衡步骤?
①关闭数据拉取线程,清空队列和消息流,提交偏移量;
②释放分区所有权,删除zk中分区和消费者的所有者关系;
③将所有分区重新分配给每个消费者,每个消费者都会分到不同分区;
④将分区对应的消费者所有关系写入ZK,记录分区的所有权信息;
⑤重启消费者拉取线程管理器,管理每个分区的拉取线程。
网友评论