生产者
建立连接时机
- 启动
在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。
- 创建KafkaProducer 实例时,根据配置节点数,建立所有链接
- 选负载最小的一个节点获取元数据信息
- 根据元数据信息与所有节点建立连接
- 除了发送消息有往来的节点,其他节点都没数据交互,过一段时间超时,节点主动断开空闲链接
- 元数据变动
当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接,有两个场景更新元数据:
- 当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。
- Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。
- 发送消息时发现与broker无连接时会建立
关闭连接时机
- 客户端自己调close
- broker自动断开
这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。
一旦broker自动断开,连接变成close-wait状态。
消费者
建立连接时机
TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
-
.发起 FindCoordinator 请求时
消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群(负载最小的节点)发送一个名为FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。
但当第三类连接建立起来之后,这个连接会被关闭。 -
连接协调者时
消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接,并保持心跳上报。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。 -
消费数据时
消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。 -
示例
image.png
image.png
image.png
网友评论