一、简介
1. 什么是kafka
Kafka 是一款分布式消息发布和订阅系统,具有高性能、高吞吐量的特点而被广泛应用与大数据传输场景。它由 LinkedIn 公司开发,使用 Scala 语言编写,之后成为 Apache 基金会的一个顶级项目。
2. kafka 产生背景
早期,kafka是用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)。先把各种活动数据以日志的形式写入某种文件,然后周期性的对这些文件进行统计分析。
3. kafka 的应用场景
由于 kafka 具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka 每秒可以处理几十万消息),让 kafka 成为了一个很好的大规模消息处理应用的解决方案。在企业级应用里,主要用作:
-
行为追踪
-
日志收集
总之, 套路就是 通过发布-订阅模式将数据写入对应的topic中,然后由后端平台进行进一步地处理
4. 基本架构组成
-
broker
-
producer
-
consumer group
-
zookeeper 集群
Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从 broker 订阅并消费消息。 多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中。三者通过 zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。
kafka frame.png
二 、集群搭建
1. 配置集群
-
conf/seervice.properties 需要修改的项
broker.id=0/1/2...
zookeeper.connect=zookeeperIp1:2081,zookeeper ip2:2081,zookeeper ip3:2081...
listeners=PLAINTEXT://当前主机ip:9092
2. 启动集群
-
启动配置的zookeeper集群/主机
-
启动kafka
sh kafka-server-start.sh -daemon ..config/server.properties
3.kafka命令行
- 创建topic test
sh kafka-topics.sh --create --zookeeper 192.168.1.110:2181 --replication-factor 1 --partitions 1 --topic test
replication-factor :副本
partitions:分区
- 查看topic 列表
sh kafka-topics.sh --list --zookeeper 192.168.1.110
- 控制台提供消息
sh kafka-console-producer.sh --broker-list 192.168.1.110:9092 --topic test
- 控制台消费消息
sh kafka-console-consumer.sh --bootstrap-server 192.168.1.110:9092 --topic test --from-beginning
三、在代码里使用
1. 引入依赖
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
2. 配置说明
producer配置项
-
batch.size (默认16KB) producer对于同一个分区来说,会按照batch.size的大小统一收集批量发送
-
linger.ms (默认0) 延迟后批量发送
batch.size / linger.ms 只需满足其中一个,就会发送。因此这两个配置需要配合使用
-
max.request.size (默认1M) 请求的最大字节数 数据太大影响发送、接收性能
-
acks producer 发送消息到 broker 上以后的确认值。有三个可选项
-
0:表示 producer 不需要等待 broker 的消息确认。这个选项时延最小但同时风险最大(因为当 server 宕机时,数据将会丢失)。
-
1:表示 prod ucer 只需要获得 kafka 集群中的 leader 节点确认即可,这个选择时延较小同时确保了 leader 节点确认接收成功。
-
all(-1):需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最高,
-
Consumer配置项
-
GROUP_ID_CONFIG 消费组
一个topic下的数据,对于一个groupId中的consumer来说是竞争的,即一条数据只能被一个consumer消费
不同groupId,不存在竞争
-
AUTO_OFFSET_RESET_CONFIG
earliest 对于新的groupId来说,重置offset,从最早的消息开始消费
offset 偏移量,相当于一个指针或游标,指向的是当前分区数据的位置
latest 对于新的groupId来说,取已经消费并且提交的最大offset 即取最近的一个值
none 对于新的groupId,如果之前没有offset(新的group一定没有offset),会抛出异常(NoOffsetForPartitionException)
none: throw exception to the consumer if no previous offset is found for the consumer's group
-
ENABLE_AUTO_COMMIT_CONFIG
自动提交,消息消费后,如果不提交,可以一直消费。
如果设置为false, 则可以通过 KafkaConsumer.commitAsync();手动提交
-
MAX_POLL_RECORDS_CONFIG
设置每一次调用 poll() 返回的消息数,批量返回以减少poll()的次数,提升性能
3. Spring - boot整合kafka
四、kafka定义和原理
1. Topic 和 Partition
topic
topic是存储消息的逻辑概念,不同的topic下的数据是分开存储的。不同的 topic 的消息是分开存储的, 每个 topic 可以有多个生产者向它发送消息,也可以有多 个消费者去消费其中的消息。
topic.pngpartition
一个 topic 可以划分多个分区(每个 Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。第i个分区分配在第 i mod n 个broker上。
每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset 保证消息在分区内的顺序,offset的顺序不跨分区,即kafka 只保证在同一个分区内的消息是有序的。
partition.png
2. offset
每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序。offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。
3. 消息的分发策略
消息是kafka中最基本数据单元。一条消息由Key、Value两部分构成,其中,key可以指定也可以不指定。默认情况下,kafka 采用的是 hash 取模算法决定消息存储到哪个分区。如果Key 为 null,则会随机分配一个分区。
4. 消息的消费策略
一个consumer group-0 里有3个consumer时,他们一起消费topic.test,这个test下有3个分区,怎么协调?
c1消费p0,c2消费p1,c3消费p2
kafka的策略是:一个分区只能由一个消费者消费。
分区分配策略:
-
范围分区(Range strategy 默认)
范围分区策略先对一个主题里面的分区按照序号排序,并对消费者按字母顺序排序。对于如上3个分区,3个消费者,排序后:
分区序列:0,1,2
消费者序列:C1-0,C2-0,C3-0
然后将分区数 / 消费者数 决定每个消费线程消费几个分区,最终
C1-0 : p0
C2-0:p1
C3-0:p2
如果2个主题,每个主题10个分区,group-0下的3个消费者怎么协调呢?
C1-0 将消费 T1 主题的 0, 1, 2, 3 分区以及 T2 主题的 0,1, 2, 3 分区 C2-0 将消费 T1 主题的 4, 5, 6 分区以及 T2 主题的 4, 5, 6 分区 C3-0 将消费 T1 主题的 7, 8, 9 分区以及 T2 主题的 7, 8, 9分区
-
轮询分配
轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,按照 hashcode 进行排序。然后将所有分区依次轮流分配给所有consumer。
5. 什么时候触发消费分配策略?
消费者分区分配策略又叫 consumer rebalance。当:
-
一个consumer group里新增消费者
-
有消费者离开当前的consumer group
-
topic新增分区
消费者 <= 分区数,若大于分区数,就会有闲置的消费者
partition最好是consumer的整数倍
网友评论