美文网首页
2012NETDB: Kafka: a Distributed

2012NETDB: Kafka: a Distributed

作者: Caucher | 来源:发表于2021-04-16 21:14 被阅读0次

标题:Kafka:一个用于日志处理的分布式消息队列
久负盛名的kafka,在工业生产中广泛使用的中间件,今天来学习一下原论文,理解下最初的设计思路。本文与2011年产出,发布式LinkedIn公司已经工业化部署Kafka6个月。

ABSTRACT

Kafka设计的初衷是能够低延迟的收集和分发大体量的日志数据,而且同时应用于在线和离线消费,而且要具备可扩展性。

1. Introduction

日志在本文中有更广泛的概念:用户行为日志+系统监控日志。
日志数据一般用于离线分析,但是目前也有部分系统需要实时的消费日志:搜索引擎、推荐系统、推送等。
实时使用这些日志数据并不容易,和事务型数据相比,它们的体量太大了。
用于离线分析时,已经诞生了一些数据聚合运输工具,比如Flume,可以离线的把数据运送到Hadoop或者数仓里去。
为了能够低延迟的消费这些日志,提出了kafka,一个将日志聚合器和消息系统结合起来的中间件。这使得kafka同时有分布式日志聚合器的高吞吐量,又有消息系统的低延迟。

2. Related Work

现有的消息队列的不合适之处:

  1. 现有的这些消息队列和日志收集处理的关注点不一致:消息队列主要关注于事务性、持久性等等,使得系统和API都非常复杂,然而事实上丢失部分日志数据不会有多大影响。
  2. 现有的消息队列不关注吞吐量。
  3. 现有的消息队列对于分布式支持太弱。
  4. 现有的消息队列缓冲区太小了,不适合大量囤积。

现有的日志聚合器主要用于离线传输数据至数仓,不合适之处在于:

  1. 不合适的实现细节暴露;
  2. push模型而非拉模型,容易导致消费端流量过高。

3. Kafka Architecture and Design Principles

某种特定类型的消息流被称为topic,topic存储于多个broker中。消费者可以通过订阅topic,进而从brokers中拉取消息。
从概念上来说消息是简单的,kafka的API尽量维护了这种简单性。producer可以单点或批量生产消息:


image.png

消费者为了消费一个主题的消息,要创建一个或多个消息流。消息将均匀的被拉取到这些消息流中。每个消息流都实现了一个迭代接口,这个迭代器永远不会终止,如果消费完了,就会阻塞线程。
作为分布式的特性:topic会被分布式的存储成多个partition位于多个broker。
作为消息队列的特性:多个producer和多个consumer可以同时生产和消费。


image.png

3.1 Efficiency on a Single Partition

  • Simple storage:
    kafka的存储模式是很简单的。一个topic的partition由一个逻辑上的log构成。一个log在物理上由一系列segment files(近似等大小)构成。broker收到消息之后,会现在内存堆积,定时/定量地flush到磁盘中,也就是在上一个segment file后面做append操作。只有flush到磁盘中的数据才能够被消费。
    kafka没有显示的message id,用消息在文件中的偏移量来代替message id,因此也不用索引和随机磁盘访问。注意到kafka message id是递增且不连续的(消息有长度)。
    消费者会从一个特定位置的offset开始消费,这意味着前面的消息他已经消费完了。每个消费的pull request包含消费起点(offser)和可接受的长度。broker在内存中维护各个Segment file的start offset,收到请求后,可以定位到Segment file并返回数据。此后由消费者计算下一次的消费位点。


    image.png
  • Efficient transfer:

    1. 消费者看似是一次迭代一条消息,实际上也是取了多条消息缓存;

    2. broker不做显示的消息缓存,全部交给os的page cache来做,好处主要是:

      1. 不会产生冗余cache;
      2. 进程重新启动时不会产生冷启动;
      3. 没有GC,非常容易在虚拟机上实现;
      4. 由于producer和consumer都是顺序的访问log,所以page cache显得非常高效。
    3. 网络传输优化:网络方面,由于kafka是多订阅系统,同一条消息可能被多个订阅者反复消费。一般的linux文件系统传输方式要分四步:

      1. 从disk读取到page cache/kernel buffer (user mode -> kernel-mode 读开始)
      2. kernel buffer -> user buffer (kernel-mode -> user mode 读取结束)
      3. user buffer -> kernel buffer (user mode -> kernel-mode 写开始)
      4. kernel buffer -> socket (kernel-mode -> user mode写结束)

      总共需要4次数据的copy,两次系统调用,4次模式转变/上下文切换。linux kernel2.0之后有了sendfile api,简化成了以下三步:

      1. disk -> kernel buffer (user mode -> kernel mode)
      2. kernel buffer -> socket kernel buffer
      3. socket kernel buffer -> socket (kernel-mode -> user mode)

      总共需要3次数据copy,一次系统调用,2次模式转变。linux kernel2.1之后sendfile api再一次优化,将第二步中的kernel buffer -> socket kernel buffer也简化掉了,只标记buffer位置即可。那么就只剩下2次数据copy。kafka的网络文件传输,用的就是sendfile api。

  • Stateless broker:kafka的broker是无状态的,每个消费者自己消费到哪里了由它们自己来维护。按照用户设置,broker定期删除日志。这样的话,其实consumer不仅可以顺序消费日志,还可以往前“倒带”重新消费,虽然听上去和队列的性质完全不符,但是实际应用中有很大的便利性。而且由于kafka的可扩展性强,存储一定时长(7天)不影响性能。

3.2 Distributed Coordination

producer对broker的发送,要不然是轮询,要不然是按照partition key和partition function进行分区。
接下来主要研究consumer和broker的一致性。
首先介绍一个概念:消费组。
消费组(consumer group):一个consumer的集合,可能分布于不同机器,他们共同订阅了同一组topic。在消费组中,每一个topic的partition总是被消费组中的一个consumer消费,然后通过一致性协议,传递给组内其他消费者。不同的消费组之间,不要任何的同步考量。

第一个设计原则:partition是最细粒度的并行。也就是说,在任何时刻,对于某个partition,只能由一个consumer group中的消费者来消费。为了保证负载均衡,可以将一个topic过度分区,超过consumer group中的consumer个数。这样每个consumer都会分到几个分区。

第二个设计原则:同一个消费组的消费者之间用P2P的架构方式。也就是说没有master掌管元信息,而是通过一致性协议来完成的。这里kafka直接用的是zookeeper来接管。
zookeeper的API像是文件系统一样,有路径,子路径,每个路径可以对应值。每个路径有两种模式,临时的/永久的,区别就在于创建(注册)它们的client挂掉的时候,这些路径是否被删除。client还可以向某些路径注册watcher以监控变化。

kafka在四种情况下会向zookeeper注册路径:

  1. broker注册(临时的):host, port, topics & partitions on it
  2. consumer注册(临时的):注册的topics, 属于的consumer group

每个消费组包含两种路径注册:

  1. ownership注册(临时的):ownership是指某个consumer负责消费某个partition。ownership注册包括一个partition的路径,值是consumer id。
  2. offset注册(永久的):一个partition的路径,值是最近消费的offset。

每个consumer都向zookeeper中所有的broker和consumer注册一个Watcher,每当发生变化时(增加/减少),则启动rebalance进程以进行负载均衡。

rebalance:首先释放当前的ownership注册。读取zookeeper中的broker和consumer,分别排序,按照顺序重新确定自己own的partition,尝试注册,注册成功则拉取offset(永久保存于zookeeper),然后开始拉取数据。
如果注册时发现新分配的partition仍然被占用,那么退出进程,释放自己的ownership registry,过一会重试。

image.png

3.3 Delivery Guarantees

  1. kafka是至少一次的传输保障。确切一次的语义需要两阶段提交,过于复杂。大多数时间,kafka也是确切一次的语义,只有一个特殊情况会导致重复,一个consumer宕机了,但是clean的不太彻底,消费了消息但是没能提交给zookeeper。下一个Consumer接替他的时候,那些没提交zookeeper的消息会重复消费。所以程序要么设计成幂等的,要么利用offset检测重复,无论如何也会比两阶段提交更为高效;
  2. kafka可以保证comsumer从一个broker中消费信息是顺序的,但是整体的消息顺序是无法保证的;
  3. kafka为每条消息提供CRC进行验错;
  4. 一旦一个broker彻底坏掉,存储也坏掉,那么这些消息就彻底没了。注意这只是初版的kafka设计,后面kafka补充了replica的设计。

4. Kafka Usage at LinkedIn

此时的kafka的负载均衡部分仍然依赖于外部。


image.png
  • rebalnce: 当broker增加或减少时,consumer应该有rebalance机制导向不同的broker。
  • 审计系统:producer生产消息时,带有生产时间戳和server name;而且周期性的发出监控事件,统计产生日志的条数,供consumer检验,以保证数据不缺不漏。
  • 序列化:用的是Avro序列化工具,以保证producer和consumer达成协议。收发方都基于一些scheme对消息进行序列化编码、解码,kafka提供一个简易的scheme registry。第一次遇到某个scheme的时候,去kafka registry取用即可。

5. Experimental Results

  • Producer Throughput
    为什么kafka的吞吐量高这么多呢?作者给出了3点解释:
  1. kafka没有对网络传输有ack,全部的异步发送,只要broker带宽和缓存足够。但这也意味着消息丢了也没人知道。
  2. kafka的消息头很小,只有9Byte。ActiveMq有144Bytes,主要都用于构建B树的信息。
  3. kafka的batch均摊了网络传输开销限制。


    image.png
  • Consumer Throughput
    作者也给出了几点解释:
  1. 仍然是受益于高效传输,较小的消息头。
  2. 无状态的broker消息协议。
  3. Sendfile api的使用。


    image.png

相关文章

网友评论

      本文标题:2012NETDB: Kafka: a Distributed

      本文链接:https://www.haomeiwen.com/subject/pqrjlltx.html