标题:Kafka:一个用于日志处理的分布式消息队列
久负盛名的kafka,在工业生产中广泛使用的中间件,今天来学习一下原论文,理解下最初的设计思路。本文与2011年产出,发布式LinkedIn公司已经工业化部署Kafka6个月。
ABSTRACT
Kafka设计的初衷是能够低延迟的收集和分发大体量的日志数据,而且同时应用于在线和离线消费,而且要具备可扩展性。
1. Introduction
日志在本文中有更广泛的概念:用户行为日志+系统监控日志。
日志数据一般用于离线分析,但是目前也有部分系统需要实时的消费日志:搜索引擎、推荐系统、推送等。
实时使用这些日志数据并不容易,和事务型数据相比,它们的体量太大了。
用于离线分析时,已经诞生了一些数据聚合运输工具,比如Flume,可以离线的把数据运送到Hadoop或者数仓里去。
为了能够低延迟的消费这些日志,提出了kafka,一个将日志聚合器和消息系统结合起来的中间件。这使得kafka同时有分布式日志聚合器的高吞吐量,又有消息系统的低延迟。
2. Related Work
现有的消息队列的不合适之处:
- 现有的这些消息队列和日志收集处理的关注点不一致:消息队列主要关注于事务性、持久性等等,使得系统和API都非常复杂,然而事实上丢失部分日志数据不会有多大影响。
- 现有的消息队列不关注吞吐量。
- 现有的消息队列对于分布式支持太弱。
- 现有的消息队列缓冲区太小了,不适合大量囤积。
现有的日志聚合器主要用于离线传输数据至数仓,不合适之处在于:
- 不合适的实现细节暴露;
- push模型而非拉模型,容易导致消费端流量过高。
3. Kafka Architecture and Design Principles
某种特定类型的消息流被称为topic,topic存储于多个broker中。消费者可以通过订阅topic,进而从brokers中拉取消息。
从概念上来说消息是简单的,kafka的API尽量维护了这种简单性。producer可以单点或批量生产消息:
image.png
消费者为了消费一个主题的消息,要创建一个或多个消息流。消息将均匀的被拉取到这些消息流中。每个消息流都实现了一个迭代接口,这个迭代器永远不会终止,如果消费完了,就会阻塞线程。
作为分布式的特性:topic会被分布式的存储成多个partition位于多个broker。
作为消息队列的特性:多个producer和多个consumer可以同时生产和消费。
image.png
3.1 Efficiency on a Single Partition
-
Simple storage:
kafka的存储模式是很简单的。一个topic的partition由一个逻辑上的log构成。一个log在物理上由一系列segment files(近似等大小)构成。broker收到消息之后,会现在内存堆积,定时/定量地flush到磁盘中,也就是在上一个segment file后面做append操作。只有flush到磁盘中的数据才能够被消费。
kafka没有显示的message id,用消息在文件中的偏移量来代替message id,因此也不用索引和随机磁盘访问。注意到kafka message id是递增且不连续的(消息有长度)。
消费者会从一个特定位置的offset开始消费,这意味着前面的消息他已经消费完了。每个消费的pull request包含消费起点(offser)和可接受的长度。broker在内存中维护各个Segment file的start offset,收到请求后,可以定位到Segment file并返回数据。此后由消费者计算下一次的消费位点。
image.png -
Efficient transfer:
-
消费者看似是一次迭代一条消息,实际上也是取了多条消息缓存;
-
broker不做显示的消息缓存,全部交给os的page cache来做,好处主要是:
- 不会产生冗余cache;
- 进程重新启动时不会产生冷启动;
- 没有GC,非常容易在虚拟机上实现;
- 由于producer和consumer都是顺序的访问log,所以page cache显得非常高效。
-
网络传输优化:网络方面,由于kafka是多订阅系统,同一条消息可能被多个订阅者反复消费。一般的linux文件系统传输方式要分四步:
- 从disk读取到page cache/kernel buffer (user mode -> kernel-mode 读开始)
- kernel buffer -> user buffer (kernel-mode -> user mode 读取结束)
- user buffer -> kernel buffer (user mode -> kernel-mode 写开始)
- kernel buffer -> socket (kernel-mode -> user mode写结束)
总共需要4次数据的copy,两次系统调用,4次模式转变/上下文切换。linux kernel2.0之后有了sendfile api,简化成了以下三步:
- disk -> kernel buffer (user mode -> kernel mode)
- kernel buffer -> socket kernel buffer
- socket kernel buffer -> socket (kernel-mode -> user mode)
总共需要3次数据copy,一次系统调用,2次模式转变。linux kernel2.1之后sendfile api再一次优化,将第二步中的kernel buffer -> socket kernel buffer也简化掉了,只标记buffer位置即可。那么就只剩下2次数据copy。kafka的网络文件传输,用的就是sendfile api。
-
-
Stateless broker:kafka的broker是无状态的,每个消费者自己消费到哪里了由它们自己来维护。按照用户设置,broker定期删除日志。这样的话,其实consumer不仅可以顺序消费日志,还可以往前“倒带”重新消费,虽然听上去和队列的性质完全不符,但是实际应用中有很大的便利性。而且由于kafka的可扩展性强,存储一定时长(7天)不影响性能。
3.2 Distributed Coordination
producer对broker的发送,要不然是轮询,要不然是按照partition key和partition function进行分区。
接下来主要研究consumer和broker的一致性。
首先介绍一个概念:消费组。
消费组(consumer group):一个consumer的集合,可能分布于不同机器,他们共同订阅了同一组topic。在消费组中,每一个topic的partition总是被消费组中的一个consumer消费,然后通过一致性协议,传递给组内其他消费者。不同的消费组之间,不要任何的同步考量。
第一个设计原则:partition是最细粒度的并行。也就是说,在任何时刻,对于某个partition,只能由一个consumer group中的消费者来消费。为了保证负载均衡,可以将一个topic过度分区,超过consumer group中的consumer个数。这样每个consumer都会分到几个分区。
第二个设计原则:同一个消费组的消费者之间用P2P的架构方式。也就是说没有master掌管元信息,而是通过一致性协议来完成的。这里kafka直接用的是zookeeper来接管。
zookeeper的API像是文件系统一样,有路径,子路径,每个路径可以对应值。每个路径有两种模式,临时的/永久的,区别就在于创建(注册)它们的client挂掉的时候,这些路径是否被删除。client还可以向某些路径注册watcher以监控变化。
kafka在四种情况下会向zookeeper注册路径:
- broker注册(临时的):host, port, topics & partitions on it
- consumer注册(临时的):注册的topics, 属于的consumer group
每个消费组包含两种路径注册:
- ownership注册(临时的):ownership是指某个consumer负责消费某个partition。ownership注册包括一个partition的路径,值是consumer id。
- offset注册(永久的):一个partition的路径,值是最近消费的offset。
每个consumer都向zookeeper中所有的broker和consumer注册一个Watcher,每当发生变化时(增加/减少),则启动rebalance进程以进行负载均衡。
rebalance:首先释放当前的ownership注册。读取zookeeper中的broker和consumer,分别排序,按照顺序重新确定自己own的partition,尝试注册,注册成功则拉取offset(永久保存于zookeeper),然后开始拉取数据。
如果注册时发现新分配的partition仍然被占用,那么退出进程,释放自己的ownership registry,过一会重试。
3.3 Delivery Guarantees
- kafka是至少一次的传输保障。确切一次的语义需要两阶段提交,过于复杂。大多数时间,kafka也是确切一次的语义,只有一个特殊情况会导致重复,一个consumer宕机了,但是clean的不太彻底,消费了消息但是没能提交给zookeeper。下一个Consumer接替他的时候,那些没提交zookeeper的消息会重复消费。所以程序要么设计成幂等的,要么利用offset检测重复,无论如何也会比两阶段提交更为高效;
- kafka可以保证comsumer从一个broker中消费信息是顺序的,但是整体的消息顺序是无法保证的;
- kafka为每条消息提供CRC进行验错;
- 一旦一个broker彻底坏掉,存储也坏掉,那么这些消息就彻底没了。注意这只是初版的kafka设计,后面kafka补充了replica的设计。
4. Kafka Usage at LinkedIn
此时的kafka的负载均衡部分仍然依赖于外部。
image.png
- rebalnce: 当broker增加或减少时,consumer应该有rebalance机制导向不同的broker。
- 审计系统:producer生产消息时,带有生产时间戳和server name;而且周期性的发出监控事件,统计产生日志的条数,供consumer检验,以保证数据不缺不漏。
- 序列化:用的是Avro序列化工具,以保证producer和consumer达成协议。收发方都基于一些scheme对消息进行序列化编码、解码,kafka提供一个简易的scheme registry。第一次遇到某个scheme的时候,去kafka registry取用即可。
5. Experimental Results
- Producer Throughput
为什么kafka的吞吐量高这么多呢?作者给出了3点解释:
- kafka没有对网络传输有ack,全部的异步发送,只要broker带宽和缓存足够。但这也意味着消息丢了也没人知道。
- kafka的消息头很小,只有9Byte。ActiveMq有144Bytes,主要都用于构建B树的信息。
-
kafka的batch均摊了网络传输开销限制。
image.png
- Consumer Throughput
作者也给出了几点解释:
- 仍然是受益于高效传输,较小的消息头。
- 无状态的broker消息协议。
-
Sendfile api的使用。
image.png
网友评论