1. 简介
Kafka 是由 LinkedIn 开发的一个基于发布/订阅的消息系统,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
Kafka 对外使用 topic 的概念,一个 topic 可以由一个或多个 partition(分区),生产者往 topic 里写消息,消费者从 topic 拉取消息。如果一个主题有多个分区,Kafka只能保证一个分区内消息的有序性,在不同的分区之间无法保证。对于时序数据而言,如果将某个主题下的数据集合分成了多个分区,可能会造成读取数据的无序。
系统遇到瓶颈时,可以通过增加 partition 的数量来进行横向扩容。
2. 相关术语
-
broker:一台 Kafka 服务器就是一个
broke
,一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic。 -
topic:类似于消息通道名称,生产者通过
topic
向 broker 发送消息,消费者通过订阅相应的 topic 读取消息。 -
partition:一个
partition
在服务器上对应的是一个具体的文件目录,目录下存放的是消息文件和索引文件。一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列且都有一个 leader。 - producer:生产者,负责发布消息到 broker。
- consumer:消费者,负责从 broker 中读取消息。
-
consumer group:若干个 consumer 组成的集合,每个 consumer 属于一个特定的
consumer group
。每个 consumer 可指定 group name,若不指定 group name,则属于默认的group。 -
offset:Kafka 为每条在分区的消息保存一个偏移量
offset
,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。 -
segment:Kafka 还将 partition 进一步分割成多个段(
segment
),一个段对应一个服务器上的 log 文件。这样分割好处就是在清理过期的消息时,可以对整个段的文件作删除操作,从而避免对文件的随机写操作,提高吞吐量。 - replica:Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,副本分为领导者副本和追随者副本,副本在分区的层级下,每个分区可配置多个副本实现高可用。
- rebalance:消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅分区的过程,重平衡是 Kafka 消费者端实现高可用的重要手段。
3. 使用场景
-
分布式消息队列:Kafka 可以代替传统的消息队列软件(阿里的队列软件 RocketMQ 就是基于 Kafka 实现的),稳定性强、分布式容灾好、数据量不会影响到 KafKa 的速度等优点,在队列软件的选择上 Kafka 已经成了不二之选。
-
分布式日志系统:Kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka 为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka 很方便与 HDFS 和 Flume 进行整合,这样就方便将 Kafka 采集的数据持久化到其他外部系统。
-
网站用户行为追踪:为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到 Kafka 集群上,通过 Hadoop、Spark 或 Strom 等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
-
流处理:Kafka 中消息处理一般包含多个阶段,其中原始输入数据是从 Kafka 主题中消费,然后汇总处理,或者以其他的方式处理转化为新主题。例如,一个推荐新闻文章,文章内容可能从 articles 主题获取。然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流,从0.10.0.0开始。
-
应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如 CPU 占用率、IO、内存、连接数、TPS、QPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用 Kafka 与ELK(ElasticSearch、Logstash 和 Kibana)整合构建应用服务监控系统。
4. 消息投递语义
- At most once:最多一次,消息可能会丢失,但不会重复。
设置
enable.auto.commit
为 ture。
设置auto.commit.interval.ms
为一个较小的时间间隔。
client 不要调用 commitSync(),Kafka 在特定的时间间隔内自动提交。
- At least once:至少一次,消息不会丢失,可能会重复。
方法一:
设置enable.auto.commit
为 false。
client 调用commitSync()
,增加消息偏移。方法二:
设置 enable.auto.commit 为 ture。
设置auto.commit.interval.ms
为一个较大的时间间隔。
client 调用 commitSync(),增加消息偏移。
- Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现)。
设置
enable.auto.commit
为 false。
保存 ConsumerRecord 中的 offset 到数据库。
当 partition 发生变化的时候需要 rebalance,有以下几个事件会触发分区变化:
- consumer 订阅的 topic 中的分区大小发生变化。
- topic 被创建或者被删除。
- consuer 所在 group 中有个成员挂了。
- 新的 consumer 通过调用 join 加入了 group。此时 consumer 通过实现 ConsumerRebalanceListener 接口,捕捉这些事件,对偏移量进行处理。consumer 通过调用 seek(TopicPartition, long) 方法,移动到指定的分区的偏移位置。
在业务中,常常都是使用 At least once
的模型,整体的消息投递语义需要 Producer 端和 Consumer 端两者来保证。
5. 生产者分区选择配策略
生产者在将消息发送到某个 topic ,需要经过拦截器、序列化器和分区器(Partitioner
)的一系列作用之后才能发送到对应的 broker,在发往 broker 之前是需要确定它所发往的分区。
- 如果消息
ProducerRecord
指定了 partition 字段,那么就不需要分区器。 - 如果消息
ProducerRecord
没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
public class ProducerRecord<K, V> {
// 该消息需要发往的主题
private final String topic;
// 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
// 如果该值为null,则利用分区器进行分区的选择
private final Integer partition;
private final Headers headers;
// 如果partition字段为null,则使用分区器进行分区选择时会用到该key字段,该值可为空
private final K key;
private final V value;
private final Long timestamp;
Kafka 中提供的默认分区器是 DefaultPartitioner
,它实现了 Partitioner 接口(用户可以实现这个接口来自定义分区器),其中的 partition 方法就是用来实现具体的分区分配逻辑:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区。
- 如果没有指定分区,但是消息的 key 不为空,则使用称之为
murmur
的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。 - 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 首先通过cluster从元数据中获取topic所有的分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 拿到该topic的分区数
int numPartitions = partitions.size();
// 如果消息记录中没有指定key
if (keyBytes == null) {
// 则获取一个自增的值
int nextValue = nextValue(topic);
// 通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// 如果该topic存在可用的分区
if (availablePartitions.size() > 0) {
// 那么将nextValue转成正数之后对可用分区数进行取余
int part = Utils.toPositive(nextValue) % availablePartitions.size();
// 然后从可用分区中返回一个分区
return availablePartitions.get(part).partition();
} else { // 如果不存在可用的分区
// 那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else { // 如果消息记录中指定了key
// 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
//toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
// 方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
private int nextValue(String topic) {
// 每个topic都对应着一个计数
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) { // 如果是第一次,该topic还没有对应的计数
// 那么先生成一个随机数
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
// 然后将该随机数与topic对应起来存入map中
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
// 之后把这个随机数返回
counter = currentCounter;
}
}
// 一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
return counter.getAndIncrement();
}
6. 消费者分区分配策略
消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费。
- 如果分区数大于或者等于组中的消费者实例数,一个消费者会负责多个分区。
- 如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息。
如果多个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。
6.1 Range策略
range
(默认分配策略)对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor
。
- 首先,将分区按数字顺序排行序,消费者按名称的字典序排序。
- 然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区。
-
假设,有1个主题、10个分区、3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者C1将会多消费一个分区,分配结果是:
- C1将消费T1主题的0、1、2、3分区。
- C2将消费T1主题的4、5、6分区。
- C3将消费T1主题的7、8、9分区
-
假设,有11个分区,分配结果是:
- C1将消费T1主题的0、1、2、3分区。
- C2将消费T1主题的4、5、 6、7分区。
- C2将消费T1主题的8、9、10分区。
-
假如,有2个主题(T0和T1),分别有3个分区,分配结果是:
- C1将消费T1主题的 0、1 分区,以及 T1 主题的 0、1 分区。
- C2将消费T1主题的 2、3 分区,以及 T2 主题的 2、3 分区。
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 主题与消费者的映射
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey(); // 主题
List<String> consumersForTopic = topicEntry.getValue(); // 消费者列表
// partitionsPerTopic表示主题和分区数的映射
// 获取主题下有多少个分区
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// 消费者按字典序排序
Collections.sort(consumersForTopic);
// 分区数量除以消费者数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 取模,余数就是额外的分区
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
6.2 RoundRobin策略
RoundRobin 基于轮询算法,对应的实现类是 org.apache.kafka.clients.consumer.RoundRobinAssignor
- 首先,将所有主题的分区组成
TopicAndPartition
列表。 - 然后对 TopicAndPartition 列表按照 hashCode 进行排序某个 topic。(
假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:
- C0将消费T0主题的0、2分区,以及T1主题的1分区。
- C1将消费T0主题的1分区,以及T1主题的0、2分区。
网友评论