一、发送原理剖析
image消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入
RecordAccumulator
(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator
中取出消息并批量发送出去,需要注意的是,KafkaProducer
是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
二、其他生产者参数
之前提及的默认三个客户端参数,大部分参数都有合理的默认值,一般情况下不需要修改它们。
参考官网:http://kafka.apache.org/documentation/#producerconfigs
1.acks
这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功的。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。
- ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
- ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
- ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出以下异常
image
2.retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了 retires
设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,可以通过 retry.backoff.ms
参数来修改这个时间间隔。
3.batch.size
当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把 batch.size
设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。
4.max.request.size
该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。 broker
对可接收的消息最大值也有自己的限制( message.max.size
),所以两边的配置最好匹配,避免生产者发送的消息被 broker
拒绝。
总结
本章主要讲了生产者客户端的用法以及整体流程架构,主要内容包括配置参数的详解、消息的发送方式、序列化器、分区器、拦截器等,在实际使用中,Kafka已经提供了良好的Java客户端支持,提高了开发效率。
参考资料:《Kafka技术手册》
免费获取方式:私信【资料】
免费获取
还有更多Java PDF学习资料等你来拿!!!
网友评论