Kafka 核心概念
Message
Key+Value ,根据路由规则发往Broker
Topic和Log
image.png- 同一个topic会在不同Broker上分成多个partition
- 每个partition对应一个Log,每个Log存储时候分成多个Segment。
- 写操作都是追加写
- 每个Segment默认1G,写满以写下一个Segment
- 用一个索引文件对Log进行索引,不是每条记录都有索引,而是稀疏记录的。
Topic的保存策略
- 根据retention
- 根据日志文件大小的阈值
- Log Compact策略
Broker
Replica
- 每个partition可以有1个或多个replica
- 其中一个选做lead,其实为follower
-
Follower主动去从lead拉取最新数据,更新自己本地Log
image.png
ISR Table
- ISR - In Sync Replica, 和Lead进度接近的Follower
- 需要和zookeeper保持连接
- 本地offset和Lead的offset的差值小于阈值
- 每个Lead维护ISR Table,对于失去连接的,或者catch up慢的replica,从ISR Table中删掉
HW和LEO
- HW : High Watermark, 在ISR Table中最慢的那个offset决定了HW
- LEO : Log end offset, 消费者提交的职位
- 消费者只能拉取HW标记之前的数据, 之后的数据对消费者不可见
-
HW之前的数据被称作committed的数据
image.png
offset为11的数据最终变成commit状态
Cluster和Controller
- 多个Broker组成Cluster
- 每个Cluster有一个Controller,管理replica状态,监听zookeeper上数据变化。Controller的竞选由Zookeeper帮忙选择。
Producer
Properties props = new Properties();
props.put("client.id", InetAddress.getLocalHost().getHostName());
props.put("bootstrap.servers", "host1:9092,host2:9092");
props.put("key.serializer", ...);
props.put("value.serializer", ...);
producer = new KafkaProducer<K, V>(props);
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
Consumer
-
offset可以由消费者自己维护,也可以由Kafka集群帮忙维护
image.png
Consumer Group
image.png- 每个Consumer只能属于一个Consumer Group
- 每个partition只能被Consumer Group中的一个Consumer消费
Properties props = new Properties();
props.put("bootstrap.servers", "host1:9092,host2:9092");
props.put("group.id", "test"); //group id
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
consumer = new KafkaConsumer<K, V>(props);
consumer.subscribe(Arrays.asList("test1","test2"));
ConsumerRecords<K, V> records = consumer.poll(100);
Rebalance
image.png image.png协议
- Kafka自定义了进程间通信API的格式,大约有40多个API。有的是Broker之间发送的,有的是Producer和Broker之间的,有的是Consumer和Broker之间。
- 每个API包括了Request和Response,每个API会自带版本信息
Produce过程
- Producer Client会定期(每隔5分钟)MetaData请求给Broker(选择负载最小的一个),获取Broker的分布,Partition的分布情况
- Producer Client按照路由规则,将消息路由到相应partition上
Produce过程的传递保证语义(Delivery guarantee semantic)
- 默认时,Kafka的Produce保证了At Least Once语义。
- 由于网络原因没有响应,导致Client会重发消息。不会丢数据,但可以能会有重复数据。
- 在0.11版本中实现了幂等性, 幂等性 + At Least Once = Exactly Once
Produce的幂等性的实现
- 每次建立连接的时候,向Broker发送一个InitProducerId命令,获取Cluster全局唯一的Produce ID
- 每次发送消息的时候,每条消息会自带sequence number,Broker判断Produce ID+sequence number出现过,即认定已经发过此条消息。
- 添加InitProducerId命令,修改Produce命令
- 下面代码打开Produce的幂等性
props.put("enable.idempotence", "true");
Consume过程的传递保证语义(Delivery guarantee semantic)
- 先消息处理,然后commit offset,但commit失败,会导致At Least Once
- 如果消息处理实现幂等性,可以认为整体是Exactly Once的
- 先commit offset,然后处理消息,但处理消息失败,会导致At Most Once
- 高版本Kafka已经通过2PC实现了Exactly Once
Consume使用Transaction
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
String msg = "matt test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction();
image.png
Consume的具体过程
- 默认每间隔5分钟会发送一次 MetaData 命令
- 每个Consumer member向Cluster中一台(选择负载最低的一个),发送GroupCoordinator命令,获取哪个broker是Coordinator。
- 每个Consumer member向Coordinator发送JoinGroup命令, GroupCoordinator在给所有的member返回时,只有一个member被标记为Lead
- 每个Consumer member定期向Coordinator发送HeartBeat命令
- 每个Consumer member向Coordinator发送SyncGroup命令,其中Lead会发送如何分组的信息
- 当Coordinator发现HeartBeat停止,或者由member发送LeaveGroup命令,触发reblance操作,重新分配分组
- Member发送OffsetCommit命令
- Member发送OffsetFetch命令
- Member发送Fetch命令消费message
Log的存储
image.pngLog的Index
image.pngreading list
- https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
- https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
- https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
rd_kafka_subscribe => RD_KAFKA_OP_SUBSCRIBE
rd_kafka_cgrp_subscribe
rd_kafka_cgrp_join
set state to RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN
rd_kafka_JoinGroupRequest
rd_kafka_cgrp_handle_JoinGroup
rd_kafka_SyncGroupRequest
rd_kafka_handle_SyncGroup
网友评论