无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。
客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。
- PRODUCE 请求:用于生产消息的
- FETCH 请求:用于消费消息的
- METADATA 请求:用于请求 Kafka 集群元数据信息的。
Kafka 定义了很多类似的请求格式。截止到 2.3 版本,Kafka 共定义了多达 45 种请求格式。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。
Kafka处理请求使用的是 Reactor 模式。
一、Reactor模式
Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。
Reactor模式架构.png
多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。
Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。
二、Kafka的Reactor模型
Kafka的Reactor模型.png- SocketServer:类似于 Reactor 模式中的 Dispatcher
- Acceptor线程:将不同的请求下发到多个工作线程中处理
- 工作线程池:网络线程池
Kafka 提供了 Broker 端参数 num.network.threads
,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。
三、Broker处理请求流程
Broker处理请求流程.png1.执行流程:
- 1.当网络线程拿到请求后,将请求放入到一个共享请求队列中。
- 2.Broker端的IO 线程池,负责从该队列中取出请求,执行真正的处理。
- 3.当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端
2.IO 线程池处理的请求:
- PRODUCE 生产请求:将消息写入到底层的磁盘日志中
- FETCH 请求:从磁盘或页缓存中读取消息
Broker 端参数 num.io.threads
可以控制IO线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。可以根据实际硬件条件设置此线程池的个数(如果CPU 资源非常充裕,可以调大该参数,允许更多的并发请求被同时处理)。
3.请求队列和响应队列
- 请求队列:所有网络线程共享
- 响应队列:每个网络线程专属
Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。
4.Purgatory
用来缓存延时请求(Delayed Request)。
延时请求,是一些一时未满足条件不能立刻处理的请求。
比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
四、控制类请求和数据类请求
在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。
与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。
数据类请求:PRODUCE 和 FETCH 这类请求
控制类请求:LeaderAndIsr、StopReplica 这类请求。
控制类请求可以直接令数据类请求失效。
1.场景一问题
假设有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。
假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。
这时,如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。
如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。即使 acks 不是 all,积压的 PRODUCE 请求能够成功写入 Leader 副本的日志,但处理 LeaderAndIsr 之后,Broker 0 上的 Leader 变为了 Follower 副本,也要执行显式的日志截断(Log Truncation,即原 Leader 副本成为 Follower 后,会将之前写入但未提交的消息全部删除),依然做了很多无用功。
2.场景二问题
同样是在积压大量数据类请求的 Broker 上,当你删除主题的时候,Kafka 控制器向该 Broker 发送 StopReplica 请求。如果该请求不能及时处理,主题删除操作会一直挂起,从而增加了删除主题的延时。
3.Kafka解决方案
社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。
社区完全拷贝了图中的一套组件,实现了两类请求的分离。
Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。
极客时间《Kafka 核心技术与实战》学习笔记Day11 - http://gk.link/a/11UOV
网友评论