Kafka
官方网站:http://kafka.apache.org/quickstart
消息队列&分布式流处理
1. 单机搭建测试
Step 1: Download the code
安装包下载:
http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
下载,解压
Step 2: Start the server
-daemon为可选,后台运行
> bin/kafka-server-start.sh [-daemon] config/server.properties
Step 3: Create a topic
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Step 4: Send some messages
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5: Start a consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
2. 集群搭建测试
Step 6: Setting up a multi-broker cluster
[以下操作均需要在所以机器上执行]
机器信息
192.168.8.10,192.168.8.11,192.168.8.12
编辑server.properties:
#broker.id需要每台节点唯一
broker.id=1
#以下配置统一
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.8.10:2181,192.168.8.11:2181,192.168.8.12:2181
然后每台机器分别执行
> bin/kafka-server-start.sh -daemon config/server.properties
创建一个带副本的topic
> bin/kafka-topics.sh --create --zookeeper 192.168.8.10:2181 --replication-factor 2 --partitions 1 --topic taotao
> Created topic taotao.
查看Topic信息
> bin/kafka-topics.sh --describe --zookeeper 192.168.8.10:2181 --topic taotao
> Topic:taotao PartitionCount:1 ReplicationFactor:2 Configs:
Topic: taotao Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
-
leader" 是负责给定分区的所有读写的节点。每个节点都是分区中随机选择的一部分的leader。
-
"replicas" 是指存在的于哪个或者哪些个broker上,不管broker是否存活
-
"isr" 是指存在的于哪个或者哪些个broker上,且broker是存活状态
也就是说,Kafka数据写入成功后,只要保证一个节点也就是isr至少有一个存活状态的broker,就能正常处理消息
Kafka的基本原理
数据存放是.index索引文件和.log数据文件,Kafka消费数据时需要查找offset也就是偏移量,这个偏移量的索引就在.index里面,如若有多个索引文件,则采用二分查找法查找索引文件
1. Kafka分区策略
1.1 分区原因
1.1 方便在集群中扩展,每个partition可以通过调整以适应所在的机器,而一个topic又可以有多个Partition组成,所以集群就可以适应任意数据量的大小
1.2 可以提高并发,可以按照Partition为单位读写了
1.2 副本同步策略
方案 | 优点 | 缺点 |
---|---|---|
1. 半数以上完成同步就发送ack | 延迟低 | 选举新Leader时,容忍n台节点故障,需要有2n+1个节点才可以 |
2. 全部完成数据同步才发送ack | 选举新Leader时,容忍n台节点故障,只要有n+1个副本就可以 | 延迟高 |
问题: 基于方案2,若当Leader收到数据,所有follower开始同步数据,此时某个节点故障且无法发送ack给Leader,就无法完成同步状态的更新
解决方式:
- 这时就引入了ISR的概念,ISR表示"保持和Leader同步"的节点,若节点长时间未向Leader同步数据,则踢出ISR列表,这个时间阈值由replica.lag.time.max.ms参数设定,这时当Leader挂掉时,就会从ISR例表里面选取新的Leader
在Kafka0.9.0.0版本开始,移除了replica.lag.max.message 参数被移除,因为判断是否加入ISR的条件有两个,两个条件为或的关系,且不同时满足,则会频繁加入ISR踢出ISR,同时也频繁的操作Zookeeper的元数据信息,所以新版本移除了其中一个,移除了判断条数的参数
2. Kafka数据一致性
2.1. LEO(Log End Offset):
每一个副本的最后一个Offset
2.2. HW(Hight Watermark):
所有副本中最小的LEO,也就是Consumer可以获取到的最大的Offset,这句很重要
[图片上传失败...(image-66c844-1583334746512)]
3. 故障处理:
3.1. Follower故障:
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
3.2. Leader故障:
leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的 数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
* 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
4. Kafka幂等性( Exactly Once 语义)
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义。相对的,将服务器 ACK 级别设置为0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。 但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。 在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:At Least Once + 幂等性 = Exactly Once要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。 Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时, Broker 只会持久化一条。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
5. Kafka生产者总结:
5.1. ACK问题:决定数据会不会丢失
1. (1)
5.2. ISR(In-Sync-Replica):
6. Kafka消费
6.1 消费方式:
Consumer是采用Pull的方式从broker中读取数据,Push很难适应不同的消费者,因为不同的消费者消费速率差异很大,所以Push很容易造成有的Consumer消费能力很强但是处理的消息太少,有的Consumer消费能力弱,来不及处理消息。 Pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,Consumer 会等待一段时间之后再返回,这段时长即为 timeout。
6.2 消费者组案例:
需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费
1.修改config/consumer.properties文件,新增group.id=NAME(NAME随意)
2.运行两个命令行consumer: kafka-console-consumer.sh --bootstrap-server kd-bd01:9092 --topic zhangwentao --consumer.config consumer.properties
3.运行一个不指定--consumer.config选项的consumer: kafka-console-consumer.sh --bootstrap-server kd-bd01:9092 --topic zhangwentao
结论:2中的两个consumer因为配置了同一个组,所以只能有一个人拿到消息,3因为不是同一个组,所以就算指定和2一样的topic也能拿到消息
网友评论