Kafka消息生产及消费大体流程
发送流程
image.png1、名词含义:
1)Producer :消息生产者,就是向kafka broker发消息的客户端。
2)Consumer :消息消费者,向kafka broker取消息的客户端
3)Topic :可以理解为一个队列。
4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
2、当要生产一条消息时大致流程如下:
1)Producer 会计算本条消息需要发送的partition。
2)Producer 根据发送的分区,向zookeeper获取对应partition的leader信息,发送消息到leader所在的broker。
3)leader在本地记录该消息。
4)follower通过轮询监控到leader新写入消息,主动拉取消息。
5)follower同步消息成功向leader发送ack。
6)leader收到所有follower同步的消息,向producer发送确认ack。
3、详细流程
3.1、首先是我们的producter调用kafka提供的rpc接口的send方法将我们要发送的消息给到kafka服务器,这其中会产生一个问题,即当生产一条消息时(某个Topic),它只会属于某一个分片,那么这个消息应该归属哪一台服务器,或者哪一个分片呢? procducer怎么知道该发送到哪个机器上? 通常这类问题有两个解决方案:
1)有proxy专门负责这类判定、转发,client对细节无感知(类似MySQL中间件代理)
2)Producer(client)端掌握server端的详细信息,实现重型Client(类似redis cluster)
而kafka使用的就是第二种方案,Producer 可以通过bootstrap.servers中任意一个kafka实例,拉取到所有元信息,比如某个Topic有多少个Partition,每个Partition的leader的地址,这些元信息Producer会定时轮询更新。
每个kafka节点都有完整的元信息,Producer可以通过任意节点拉取,源头维护于Zookeeper之中,当集群中的Partition等元信息发生变更,Controller节点会逐一推送给其他Broker最新信息。zookeeper的作用其实主要是两个,一是作为存储,二是基于其Watch能力做事件驱动(例如元信息更新推送)。
3.2、消息到达kafka服务器后,会先经过拦截器,接着进入序列化器。序列化器主要用于对消息的Key和Value进行序列化。接着进入分区器选择消息的分区。
3.3、上面这几步完成之后,消息会进入到一个名为RecordAccumulator的缓冲队列,这个队列默认32M。当满足以下两个条件的任意一个之后,消息由sender线程发送。
条件一:消息累计达到batch.size,默认是16kb。
条件二:等待时间达到linger.ms,默认是0毫秒。
所以在默认情况下,由于等待时间是0毫秒,所以只要消息来一条就会发送一条。
3.4、Sender线程首先会通过sender读取数据,并创建发送的请求,针对Kafka集群里的每一个Broker,都会有一个InFlightRequests请求队列存放在NetWorkClient中,默认每个InFlightRequests请求队列中缓存5个请求。接着这些请求就会通过Selector发送到Kafka集群中。
3.5、当请求发送到Kafka集群后,Kafka集群会返回对应的acks信息。生产者可以根据具体的情况选择处理acks信息。
0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。
到此,一条消息的发送任务就结束了。
消费流程
Kafka的消费方式
消费方式常见会有两种,即推送模型(push)和拉取模型(pull)。
Push方式在将消息推送到消费者后,就会将这条消息标记为 已消费 状态,但是如果在消费者消费消息时出现异常或者其他意外情况导致消息其实并没有被正常消费,那么该条消息就有可能存在丢失的情况。需要解决这个问题,就需要在kafka服务器在推送消息后不能直接标记消息为 已消费 ,而是会存在一个中间态,即 已发送 ,待消费者正常消费完成后再将状态改成 已消费,这种方式带来了更多的额外消耗。
Poll方式则是由消费者自行记录消息消费状态,每个消费者独立且有序的拉取每个分区的消息到本地。
Kafka使用的就是poll方式。
image.png消息拉取执行流程:
1、Consumer从 zookeeper当中获取到 partition 以及 consumer 对应的 offset (默认从zookeeper中获取上一次消费的offset)
2、找到该分区的leader,拉取数据
3、leader从本地log(日志)当中读取数据,最终返回给消费者
4、最终拉取完数据,提交offset给zookeeper。
kafka实际应用场景
- 消息
kafka更好的替换传统的消息系统,消息系统被用于各种场景,与大多数消息系统比较kafka有更好的吞吐量内置分区,副本和故障转移,这有利于处理大规模的消息。
多用于解耦消息的发送方和消费方。
根据我们的经验消息往往用于较低的吞吐量,但需要低的端到端延迟并需要提供强大的耐用性的保证。在这一领域的kafka比得上传统的消息系统,如ActiveMQ或RabbitMQ等。
- 网站活动追踪
kafka原本的使用场景是用户的活动追踪,网站的活动(网页游览,搜索或其他用户的操作信息)发布到不同的话题中心,这些消息可实时处理实时监测也可加载到Hadoop或离线处理数据仓库。
- 指标
kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告,用于监测数据,分布式应用程序生成的统计数据集中聚合。
- 日志聚合
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器中收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理并更容易支持多个数据源和分布式数据消费。
- 流处理
kafka中消息处理一般包含多个阶段。其中原始输入数据是从kafka主题消费的,然后汇总,丰富,或者以其他的方式处理转化为新主题,例如,一个推荐新闻文章,文章内容可能从“articles”主题获取;然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流。
除了Kafka Streams还有ApacheStorm和Apache Samza可选择。
- 事件采集
事件采集是一种应用程序的设计风格,其中状态的变化根据时间的顺序记录下来,kafka支持这种非常大的存储日志数据的场景。
- 提交日志
kafka可以作为一种分布式的外部日志,可帮助节点之间复制数据,并作为失败的节点来恢复数据重新同步,kafka的日志压缩功能很好的支持这种用法,这种用法类似于Apacha BookKeeper项目。
- 大数据的实时计算
kafka被应用到大数据处理,如与spark、storm等整合。
网友评论