一、参数
1. 必要参数
- boostrap.servers,boker的地址,部分即可
- key.serializer和value.serializer,key和value的序列化器
2. 其他
二、发送消息
1. 调用方式
- 发后即忘
- 同步
producer.send(record).get() - 异步
send with callback
2. 异常
- 可重试异常
NetworkException、LeadernotAvailableException等,会自动重试retries参数的次数,然后会抛出异常 - 不可重试异常
比如RecordTooLargeException,直接抛异常
三、发送流程
架构图来自https://blog.csdn.net/shufangreal/article/details/110657052
核心组件
- 拦截器
- 序列化器
- 分区器
- 消息累加器
这里分区1、分区2就是topic的具体partition,ProducerBatch就是多个ProducerRecord。整个消息累加器的大小由buffer.mermory控制,默认32MB,满了之后阻塞max.block.ms(默认60000),然后抛出异常 - InFlightRequests
这里的node1、node2对应的是partition转换成ip+端口的形式,Request是kafka的发送协议结构。这里存着已发送但没有收到相应的请求,每个连接最多缓存max.in.flight.requests.per.connection(默认5)个。保证有序性时这个参数要设为1
四、元数据更新
producer需要通过broker获取kafka集群的元数据,当没有所需元数据或超过metradata.max.age.ms(默认300000)就拉取一次。这里涉及leastLoadedNode的概念,也就是InFlightRequests中等待Requests最少的Node,producer会向这个节点发送拉取请求,因为认为它负载最小。
网友评论