kafka性能优化

作者: nightwish夜愿 | 来源:发表于2018-05-04 14:02 被阅读376次

    1.partition数量配置

    partition数量由topic的并发决定,并发少则1个分区就可以,并发越高,分区数越多,可以提高吞吐量。

    创建topic时指定topic数量

    推荐一款kafka监控工具kafkatool ,可用来创建topic。

    2.日志保留策略设置

    #当kafka broker的被写入海量消息后,会生成很多数据文件,占用大量磁盘空间,kafka默认是保留7天,建议根据磁盘情况配置,避免磁盘撑爆。

    log.retention.hours=72

    #段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)

    log.segment.bytes=1073741824

    3.文件刷盘策略

    为了大幅度提高producer写入吞吐量,需要定期批量写文件。建议配置:

    #每当producer写入10000条消息时,刷数据到磁盘

    log.flush.interval.messages=10000

    #每间隔1秒钟时间,刷数据到磁盘

    log.flush.interval.ms=1000

    4.网络和io操作线程配置优化

    一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

    #broker处理消息的最大线程数

    num.network.threads=xxx

    num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.

    #broker处理磁盘IO的线程数

    num.io.threads=xxx

    #加入队列的最大请求数,超过该值,network thread阻塞

    queued.max.requests=5000

    #server使用的send buffer大小。

    socket.send.buffer.bytes=1024000

    #server使用的recive buffer大小。

    socket.receive.buffer.bytes=1024000

    5.异步提交(kafka.javaapi.producer)

    采用同步:1000条8s;

    采用异步:100条或3s异步写入,速度提升为1w条2s(ProducerConfig)

    #request.required.acks属性取值含义

    ##0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。

    ##1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

    ##-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。

    ##三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

    request.required.acks=0

    producer.type=async

    ##在异步模式下,一个batch发送的消息数量。producer会等待直到要发送的消息数量达到这个值,之后才会发送。但如果消息数量不够,达到queue.buffer.max.ms时也会直接发送。

    batch.num.messages=100

    ##默认值:200,当使用异步模式时,缓冲数据的最大时间。例如设为100的话,会每隔100毫秒把所有的消息批量发送。这会提高吞吐量,但是会增加消息的到达延时

    queue.buffering.max.ms=100

    ##默认值:5000,在异步模式下,producer端允许buffer的最大消息数量,如果producer无法尽快将消息发送给broker,从而导致消息在producer端大量沉积,如果消息的条数达到此配置值,将会导致producer端阻塞或者消息被抛弃。

    queue.buffering.max.messages=1000 ##发送队列缓冲长度

    ##默认值:10000,当消息在producer端沉积的条数达到 queue.buffering.max.meesages 时,阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)。此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制阻塞的时间,如果值为-1(默认值)则 无阻塞超时限制,消息不会被抛弃;如果值为0 则立即清空队列,消息被抛弃。

    queue.enqueue.timeout.ms=100

    ## 消息压缩

    compression.codec=gzip

    6.producer版本

    使用新producer发送少量消息时丢失

    新producer:org.apache.kafka.clients.producer(KafkaProducer.java)

    老producer:kafka.javaapi.producer(Producer.scala)

    查阅资料后,原因为使用producer时必须调用producer.close(),且在发送后Thread.sleep适当时间,则不会丢失数据。否则会造成资源泄露,导致数据丢失。

    当使用多个producer进行发送时(使用apache线程池),当同时有多个producer并发发送时,依然会造成数据丢失。sleep后有好转,但仍然丢失。

    使用老producer,且compression.codec不为snappy时,不会造成数据丢失。使用线程池也不会丢失。

    7.性能测试

    kafka 10 性能测试

    kafka自带的性能测试工具,位于bin/kafka-producer-perf-test.sh。

    8.生产端发送堵塞

    调整producer缓冲区大小 queue.buffering.max.messages

    增加通道数量:多建几个producer,使用连接池管理producer

    producer使用线程池

    buffer.memory设置的缓存是针对每个producerThread

    针对每个producerThread,不应设置高,以免影响内存

    线程池中线程数量如何设置?

    监视剩余线程数据,进行动态调整,并针对可能出现的峰值预留一定的线程。

    使用tryAcquire()还是acquire()??阻塞或放弃消息??

    使用apache的线程池即可,设置阻塞时的等待时间,超过后则抛出异常。

    是否对线程池容量进行动态调整?

    使用apache的线程池即可。

    线程池最大线程数100,启用50个thread同时发送日志,报错:

    kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(test12,null,null,........

    报错原因为生产速度大于发送速度(网络传输等决定),可设置继续等待时间,超过此时间后丢弃消息;或设置一直阻塞,排队等待消息发送完毕(会造成线程死锁)。

    9、kafka中处理超大消息的一些考虑

    Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?

    针对这个问题,有以下几个建议:

      最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。

      第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。

    第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

    不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:

    broker 配置:

        message.max.bytes(默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。

        log.segment.bytes(默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。

    replica.fetch.max.bytes(默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

    Consumer 配置:

    fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。

    所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

    性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。

    可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数*最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数*最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。

    垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。

    相关文章

      网友评论

      本文标题:kafka性能优化

      本文链接:https://www.haomeiwen.com/subject/izsdrftx.html