目录
- 生产者是如何向kafka发送消息的
- 生产者发送消息有几种方式
- 生产者怎么通过配置进行调优
- 生产者是如何序列化键和值得对象
- 分区器是如何进行分区的
- 单个分区顺序性保证
- 消费者多线程时顺序性保证
- kafka重试机制
生产者
生产者是如何向kafka发送消息的?
生产者发送消息.png生产者发送消息有几种方式?
- 发送并忘记
- 同步发送,使用send发送消息,会返回一个Future对象,调用get()方法进行等待
- 异步发送,调用send方法,并指定一个回调函数,服务器在返回相应的时候调用该函数
生产者怎么通过配置进行调优?
- acks,指定必须有多少个分区副本收到消息,acks=0、1、all对信息丢失和性能
影响都是不一样的 - buffer.memony,设置生产者内存缓冲区的大小
- copression.type,默认情况下,消息发送时不会被发送,提供snappy, gzip, lz4三种压缩。snappy压缩算法由google发明,占用较少的cpu,却能够提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的CPU,但会提供更好的压缩比,如果网络带宽受限制,可以使用这种算法。压缩可以降低网络传输开销和存储开销,往往是发送消息的瓶颈所在。
- retries,该参数指定重发的次数,重试等待间隔是100ms,可以通过retry.backoff.ms参数来改变这个时间间隔。
- batch.size,批次的大小。设置的很大也不会造成延迟,只是会占用更多的内存。如果设置太小,生产者需要更频繁发送消息,增加额外开销。
- linger.rms,该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。设置的大了,增加延迟,提升吞吐量。
- client.id,可以是任意字符串,服务器用它来识别消息的来源
- max.in.flight.requests.per.connection,该参数指定了生产者在收到服务器响应之前可以发送多少个消息。值越高,占用越多的内存,提升了吞吐量
- timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms,timeout.ms指定了生产者在发送消息时等待服务器响应的时间。metadata.fetch.timeout.ms指定了生产者获取元数据时等待服务器响应的时间。timeou.ms指定broker等待同步副本返回消息确认的时间,与asks的配置相匹配。
- max.block.ms,该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。
- max.request.size,该参数用于控制生产者发送的请求大小。可以指能发送的单个消息的最大值,也可以指定单个消息里所有消息总的大小。
- receive.buffer.bytes和send.buffer.bytes,这两个参数分别指定了TCP socket接受和发送数据包的换乘区大小。
生产者是如何序列化键和值得对象?
- 自定义序列化器,自定义序列化有很多局限。
- 使用已有的序列化器和反序列化器,比如JSON,Avro、Thrift或Protobuf。比如Avro,,是一种与编程语言无关的序列化格式。Avro数据是通过与语言无关的schema来定义,schema通过json来描述,数据被序列化成二进制文件或Json文件,不过一般推荐使用二进制文件。在使用过程中,设置value.serializer为所定义的avroSerializerJ。
分区器是如何进行分区的?
- 使用分区器。分区器使用了键,相同的键的消息将被写到同一个分区,也就是说,如果一个进程只从一个主题的分区读取数据,那么相同键的所有记录都会被该进程读取。如果键值设为null, 并且使用默认的分区器,那么记录将会被随机地发送到主题内各个可用的分区上。分区器使用轮询算法将消息均衡地分布在各个分区上。如果键值不为空,并且使用默认的分区器,那么kafka会对键进行散列,然后根据散列值把消息映射到特定的分区上。这里关键在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有的分区,而不仅仅是可用的分区。这意味着,如果写入数据的分区是不可用的,你们就会发生错误。
- 实现自定义分区策略,使用自定义分区策略处理一些特殊情况,比如某个键的数据量特别多,可以使用自定义的分区策略对该键的数据进行分区,对其他键还可以使用散列分区算法
单个分区顺序性保证
- 如果把retries 设为非零整数,同时把max.in.flight.requests.per.connection
设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功, broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批悄息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量 ,所以只有在对消息的顺序有严格要求的情况下才能这么做。
消费者多线程时顺序性保证
- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
kafka生产者重试
- 本质上重新丢回Kafka队列里面,具体参考Kafka Producer send原理及重试机制浅析
网友评论