Kafka producer config
(Kafka 0.10.1 Documentation)
1. 基本配置
1.1 发送设置
缓冲、批、停留等如下说明。
1.1.1 buffer.memory(32MB)
生产者用该内存块缓存app要发送给kafka的记录(即消息),注意该内存并非生产者使用的所有内存。如果app发给客户端库的消息速率大于客户端库发给broker的速率,该内存buffer会填满,此处的行为,库会阻塞max.block.ms(1min),1min后,报异常。
1.1.2 batch.size(16KB)
客户端库 会把去往同一分区去的记录打成一批,注意一批是针对同一分区的吆。Size太小效率不高,当为0时,即禁用批的功能,另外,size太大就也不好,因停留的延迟是一定的(linger.ms),在此延迟内多半都达不到这个size,对应固定的buffer[size],是浪费的。
1.1.3 linger.ms(0)
在生产者重负载情况下,app到客户端库的消息速率大于客户端库到服务器的速率,生产者会使用批传输消息,甚至在生产者一般负载情况下,客户端库也想降低发送请求的次数,这个设置为此目的,当客户端库收到app的一条记录时候,并不立即发送出去,而是要等这个时间,以允许有更多的记录打成批送出去,类似nagle,这个时间就给了把多久的记录打成一批的上限。当然如果在等的过程中,满足了一批,就会立即送出去,不管该时间设置,然而如果一直不到一批(batchsize,如果负载很轻)就在该时间超时后送出,哪怕一个小批。
1.1.4 max.request.size(1MB)
生产者发送一次数据的最大大小(即一个kafka 格式的request,一个kafka protocol unit,在进入OS内核之前,一旦进入内核就是流了,因此服务器收到后,需要根据kafka自己的协议进行解析),一个request可以包括多个批,1个批可以包括多个记录(消息),因此本设置也暗指生产者所认为的一个记录的最大大小, 另外,服务器也有自己允许的一个记录的最大大小,该大小可以不同于此 max request size。 因为一个批有自己的最大大小(bashsize),因此本设置其实也是上限了在一次发送中批的数量。
1.1.5 max.in.flight.requests.per.connection(5)
每个连接上,允许多少在飞行中的request数,即发送了多少次,这些次至今未被确认,至今没有结果,这些次的数据可能在生产者的OS 内核中、可能在线上、可能在服务器的内核中、可能在kafka server app中但尚未被处理完,相当于 tcp sliding window中可以发送的数据大小。一旦超过这个发送次数,生产者的发送就被阻塞,为此阻塞多少时间没有说明。需要注意的是:如果这个发送次数设为大于1,即可以有至少两个发送在进行中,如果两次发送中都有一个批目标是相同分区、第一次发送失败、第二次发送成功,且开启失败重发的开关,这样就会导致记录的失序,在目标分区上的记录失序。
1.1.6 max.block.ms(1min)
如果内存满了(buffer memory 32M)或元数据不可用(比如:leader不可用),KafkaProducer.send()或KafkaProducer.partitionsFor()就会被阻塞,阻塞的时间就是 max block ms。
1.2 等待确认配置
1.2.1 acks(1)
假定一个数据在集群存3份,分布在一个主片(即主分区)、两个从片(副本分区)上,主片存在于一个节点(简称点)上,两个从片可以分布在两个点上或一个点上。
生产者发一个数据到某个主分所在的点上,该点处理完后,如果直接返回生产者,ack为1,如果两个从点,都返给主点,然后,主点返给生产者则ack为-1,ack为0即不要求服务端返回。
1.2.2 timeout.ms(30s)
为了1.1acks,主节点最长等待N个从节点的时间,如果acks=0、1,不用考虑该设置(此情况下,与从节点无关),其他需要考虑最长等待从节点多久为了来自从节点的确认,在此前从节点从主节点主动pull了数据(为了数据同步),从要对数据进行处理,处理到合适的地方,给主发确认。
如果超时后,主仍未收到正确的确认数目,就给生产者返回错误。
注意,此项仅为服务端使用。
1.2.3 request.timeout.ms(30s)
生产者等待响应的超时,在未超时时,如果没收到响应,生产者会重发该发送,如果重发次数用完了,则发送最终宣告失败。重发有关的:重发次数(retries),重发间隔(retry.backoff.ms:100ms)。
该项 包括 timeout.ms表示的子流程。
1.3 内核缓冲配置
1.3.1 send.buffer.bytes(128KB)
该socket内核SO_SNDBUF的大小。
1.3.2 receive.buffer.bytes(32KB)
该socket内核 SO_RCVBUF的大小。
1.4 集群元数据设置
1.4.1 metadata.fetch.timeout.ms(1min)
生产者第一次发送数据时,要取回该topic所有分区位于哪些节点上的元数据,需要最多多长时间达成这个动作,就是本项的目的,超时后,抛异常给生产者。
1.4.2 metadata.max.age.ms(5min)
生产者多久需要刷一次元数据,定时刷,第一次的获取通过 metadata fetch timeout流程。
1.5 连接设置
1.5.1 connections.max.idle.ms(9min)
生产者在最后一次发流量之后 多久关闭该连接。
1.5.2 reconnect.backoff.ms(50ms)
生产者重连间隔,如果上次连接失败。
1.6 重发请求设置
1.6.1 retry.backoff.ms(100ms)
生产者重发间隔,如果上次发送失败。
1.6.2 retries(0)
生产者发送失败时,最多重发次数,它>1与 max in flight requests per connection>1 配合时,可以导致 多次发送之间的失序, 对于 目标是 同分区的情况会导致 记录的的失序,当然,如果是有相同记录key的情况,会带来问题。
1.7 压缩设置
1.7.1 compression.type(none)
压缩源有两处可以设置:生产者、server,如果生产者开启压缩,你会注意到:变小了的生产者吐率和(或)更低的server压缩率,当收到压缩消息时,0.10.0server不会再压了,此举比再压相比会降低时延和提高吞吐率。然而,这会降低生产者的批大小(以前够一批的,一他妈压缩变半批了,又要马上送走),因此,这会导致更差的吞吐率,不过这没关系,可以调整linger(比如1ms)和batchsize,来提上去吞吐率。
此外,在生产者上压缩消息所用的内存buffer比在server上压缩消息所用的内存buffer小,因此,对于在logfile中压缩比来说,其实在生产者压缩不如在server上压缩更好,在未来版本中 这个用于压缩的 内存的大小 变成配置项。
另外,端到端的批量压缩概念,生产者压缩,server存log的是压缩之后东西,消费者解压缩,另外,最好让kafka压缩(不要使用跟kafka无关的压缩,需要kafka感知压缩才行,即可以生产者可以server),kafka可以批量压缩,批的越好压缩比越高,反之越差,切记。
网友评论