kafka 知识整理
kafka介绍
kafka是一种分布式的基于发布/订阅的消息系统。具有如下特征:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
高吞吐率。即使是在非常廉价的商用机器上也能做到单机支持每秒100k条消息的传输
支持消息分区及分布式消费,同时保证每个partition内的消息顺序传输
同时支持离线和实时数据处理
kafka解析
broker
kafka集群包括有一个或多个服务器,这种服务器被称为broker
topic
kafka发送消息的类别,用于区分不同消息发送的。物理上不同的topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或者多个broker上,但用户只需指定消息的topic即可生产或消费数据,而不需要关心数据存于何处
partition
topic物理上的分组。每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition是一个有序的队列,对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。在发送一条消息时,生产者可以指定这条消息的key和分区机制来发送到不同的分区。
producer
负责发送消息到kafka broker,推送消息
consumer
负责从kafka broker 拉取消息。每个consumer属于一个特定的consumer group,同一个topic的一条消息只能被同一个group内的一个consumer消费,但多个consumer group可同时消费这一topic
kafka架构
如上图所示,一个kafka集群包含有若干个producer,若干个broker(支持水平扩展,broker数量越多,集群的吞吐率越高),若干consumer group,以及一个zookeeper集群。
kafka通过zookeeper管理集群配置,选举leader,以及在consumer group 发生变化时进行rebalance。producer通过push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
partition
每一条消息发送到broker时,会根据partition规则选择被存储到哪一个partition。如果partition规则设置合理,所有的消息可以均匀的分布到不同的partition中,这样就可以实现水平扩展。在创建topic时,可以在server.properties中指定partition的数量(num.partition=3),也可以在创建topic后再去修改partition数量。
在发送一条消息时,可以指定这条消息的key,producer会根据这个可以和partition机制来判断将这条消息发送到哪个partition。partition机制可以通过producer的partition.class参数来指定,该class必须要实现kafka.producer.Partitioner接口。
kafka删除旧数据策略
kafka提供了2种策略去删除旧数据:一是基于时间;二是基于partition文件大小
eg:在server.properties中配置
log.retention.hours=168 #kafka数据保留一周,删除1周前旧数据
log.retention.bytes=1073741824 #partition 文件大小超过1GB则删除旧数据
注:kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与kafka性能无关,选择删除旧数据策略只与磁盘及具体的需求有关
kafka会为每一个consumer group保留一些metadata信息—当前消费的信息的position 即offset。这个offset由consumer控制,正常情况下consumer会在消费完一条消息后线性增加offset。当然,这个offset也可以手动设置,重新消费一些消息。因为offset由consumer控制,所以kafka broker是无状态的,不需要标记哪些信息被consumer消费过,不过需要通过broker去保证同一个consumer group 只有一个consumer能消费某一条消息,因此就不需要提供锁机制,这也是kafka的高吞吐率提供了有力保障
replication & leader election
kafka从0.8版本开始提供了partition级别的replication,可以在server.properties 中配置 default.replication.factor = 1 ,默认情况下为1
replication与leader election配合提供了自动的failover机制,replication对kafka的吞吐率有一定的影响,但是极大的增强了可用性。每个partition都有一个唯一的leader,所有的读写操作都是在leader上完成,leader批量从leader上拉取数据,一般情况下partition的数量大于等于broker的数量,并且所有的partition的leader均匀分布在broker上,follower的日志和其leader上的完全一致。
kafka判断broker是否active,需要有两个条件:
(1) 必须维护与zookeeper的会话(通过zookeeper的心跳机制来实现)
(2) follower必须能够及时从leader的数据复制过来,不能”落后太多”
leader 选举机制:
每个partition都有一个”in sync”的node list,如果有一个follower宕机,或者落后太多,leader将把它从”in sync”list中移除,这里落后太多的标准,是在server.properties中配置: replica.lag.max.messages=4000; replica.lag.time.max.ms=10000
kafka在zookeeper中动态维护了一个ISR(in sync node list)set,这个set里面所有的replicat都跟上了leader,只有isr里面的成员才有可能被选中leader;在isr中至少有一个follower时,kafka可以确保已经commit的数据不丢失,如果partition的所有replica都挂了,就无法保证数据不丢失了,这种情况下有两种可行的方案:
(1) 等待isr中的任一个replica “活”过来,并且选它作为leader
(2) 选择第一个“活”过来的replica(不一定是isr中的)作为leader
这就需要在可用性和一致性当中做出一个简单的平衡,如果一定等待isr中的replica活过来,那么可能会耗费比较长的时间;如果选择非isr中的replica作为leader,即不能保证包含所有已经提交的消息。Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
consumer group
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer消费。(不同group可以同时消费同一条消息)
Consumer Rebalance
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。
文件存储
同一个Topic 通常存储的是一类消息,每个topic内部实现又被分成多个partition,每个partition在存储层面是append log文件。
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
• 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
• 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
• segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。
• segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
副本放置策略
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。Kafka分配Replica的算法如下:
• 将所有存活的N个Brokers和待分配的Partition排序
• 将第i个Partition分配到第(i mod n)个Broker上,这个Partition的第一个Replica存在于这个分配的Broker上,并且会作为partition的优先副本
• 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
假设集群一共有4个brokers,一个topic有4个partition,每个Partition有3个副本。
同步策略
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。
Kafka Replication的数据流如下图所示:
acks
枚举值【-1,0,1】
返回值“0”,表示producer每生产一条数据,不会等broker确认是否已经提交到log
返回值“1”,表示producer每生产一条数据,会跟leader的replica确认是否收到数据,这种保证了延迟性小的同时确保了leader成功接收了数据
返回值“-1”,表示producer每生产一条数据,会跟所有的replica确认是否收到数据,这种情况延迟性最大
ksql
kafka 监控
Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息
下载地址:https://github.com/smartloli/kafka-eagle
文档:https://ke.smartloli.org/2.Install/2.Installing.html
安装
解压 tar –zxvf kafka-eagle-${version}-bin.tar.gz
添加环境变量
export KE_HOME=/data/soft/new/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
修改系统环境变量 system-config.properties
cluster1.zk.list=node51:2181,node50:2181
修改mysql的配置信息,包括创建mysql库ke,执行sql目录下ke.sql 创建表信息
启动kafka eagle
sh bin/ke.sh start
浏览器访问
http://node51:8048/ke 登录用户名从mysql的user表中查询,admin/123456
kafka基本命令
生产消息
sh kafka-console-consumer.sh --bootstrap-server node51:6667 --topic mp1
消费消息
sh kafka-console-producer.sh --broker-list node51:6667 --topic mp1
topic描述
sh kafka-topics.sh --zookeeper node51:2181 --describe --topic mp1
topic列表
sh kafka-topics.sh --zookeeper node51:2181 --list
网友评论