美文网首页
Kafka broker 调优

Kafka broker 调优

作者: 背锅填坑交给我 | 来源:发表于2018-09-28 10:35 被阅读2729次

    这些优化可以去官网看,官网有说明

    Kafka 的架构这里就不多做介绍了,直接步入正题。

    这里主要是 Kafka 集群基本配置的相关内容。

    硬件要求

    Kafka 集群基本硬件的保证

    集群规模 内存 CPU 存储
    Kafka Brokers 3+ 24GB+(小规模);64GB+(大规模) 多核(12CPU+),并允许超线程 6+ 1TB 的专属磁盘(RAID 或 JBOD)
    Zookeeper 3(小规模);5(大规模) 8GB+(小规模);24GB+(大规模) 2核+ SSD 用于中间的日志传输

    OS 调优

    可能不需要太多操作系统级别的调优,但有一些重要的操作系统级别配置:

    • 文件句柄数 限制:100k+;
    • 最大socket缓冲区大小:可以像这里描述的那样增加数据中心之间的高性能数据传输。
    • JVM 配置
      1. JDK 8 并且使用 G1 垃圾收集器;
      2. 至少要分配 6-8 GB 的堆内存。
        优化成G1GC,堆内存6-8G

    broker_java_opts -server -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true

    mirror_maker_max_heap_size 8G
    broker_max_heap_size 6G

    Kafka 磁盘存储

    • 使用多块磁盘,并配置为 Kafka 专用的磁盘,确保良好的延迟;
    • kafka本身有备份机制,使用RAID在负载和性能方面并没有很大提升,所以RAID没什么必要。
    • 文件系统:使用 EXT 或 XFS;

    Broker 配置

    Broker 级别有几个比较重要的配置,一般需要根据实际情况进行相应配置的:

    • 基础配置
      default.replication.factor: 3 // 全局默认副本数
      auto.create.topics.enable: true // 默认true 是否开启自动创建topic的功能
      delete.topic.enable: true // 默认true 是否开启删除topic的功能
      num.partitions: 27 // partition 数量

    • 副本
      Partition 有两种副本:Leader,Follower;
      Leader 负责维护 in-sync-replicas(ISR)
      replica.lag.time.max.ms:10000
      默认10s,如果在这个时间内,flower没有同步leader数据,则把flower从ISR剔除;按照flower和leader之间相差消息数的参数(replica.lag.max.messages was removed)在0.9.0版本已经被废弃。
      num.replica.fetchers: 6 //副本之间同步数据的线程数,默认1
      min.insync.replicas: 1 //这个参数设定ISR中的最小副本数是多少,默认值为1
      如果同步状态的副本小于该值,服务器将不再接受request.required.acks为-1或all的写入请求,即write服务挂了。
      unclean.leader.election.enable: flase //默认为flase, 不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。
      如果关闭,leader只能从ISR中选举;如果开启,在ISR集合为空时,选择任一副本作为leader,这样做可能会导致数据丢失。

    • 线程优化

    num.io.threads: 8 // 集群用作处理磁盘io的线程数,默认为8,建议cpu的2倍
    num.recovery.threads.per.data.dir: 1 //默认值为1,建议调整为log.dirs中目录的个数?每个数据目录的线程数,被用作集群启动恢复数据,集群停止事刷新数据

    • 日志留存策略

    默认日志留存策略是delete留存策略,可以按时间、空间、偏移量三个维度delete

    log.retention.hours: 168 //日志保留时间,过期删除 默认7天
    log.retention.check.interval.ms: 300000 //检查日志是否需要清除,是否过期, 默认5分钟

    • 数据文件刷新策略
      log.flush.interval.ms: 10000 //数据刷磁盘策略 按照时间 10s
      log.flush.interval.messages: 20000 //数据刷磁盘策略 按照按照数据条数 2w条
      log.flush.scheduler.interval.ms: 2000 //检查日志是否需要刷新到磁盘 2s

    log.segment.bytes: 1073741824 //单个日志文件的最大值 默认1G
    log.dirs: /data/1/kafka,/data/2/kafka,/data/3/kafka,/data/4/kafka,/data/5/kafka,/data/6/kafka //日志存放目录

    • Balancing leadership

    auto.leader.rebalance.enable: true // 默认true,如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership。

    leader.imbalance.per.broker.percentage: 10 // 默认值10 每个broker允许的不平衡的leader的百分比, 即leader均匀的分配在每个broker上 。如果每个broker超过了这个百分比,复制控制器会重新平衡leadership。

    leader.imbalance.check.interval.seconds: 300 // 默认5分钟 检查leader不平衡的时间间隔。

    Kafka replica 相关配置及监控

    Under Replicated Partitions

    当发现 replica 的配置与集群的不同时,一般情况都是集群上的 replica 少于配置数时,可以从以下几个角度来排查问题:

    • JMX 监控项:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions;
    • 可能的原因:
      • Broker 挂了?
      • Controller 的问题?
      • ZooKeeper 的问题?
      • Network 的问题?
    • 解决办法:
      • 调整 ISR 的设置;
      • Broker 扩容。

    Controller

    • 负责管理 partition 生命周期;
    • 避免 Controller’s ZK 会话超时:
      • ISR 抖动;
      • ZK Server 性能问题;
      • Broker 长时间的 GC;
      • 网络 IO 问题;
    • 监控:
      • kafka.controller:type=KafkaController,name=ActiveControllerCount,应该为1;
      • LeaderElectionRate。

    Unclean leader 选举

    允许不在 isr 中 replica 被选举为 leader。

    • 这是 Availability 和 Correctness 之间选择,Kafka 默认选择了可用性;
    • unclean.leader.election.enable:默认为 true,即允许不在 isr 中 replica 选为 leader,这个配置可以全局配置,也可以在 topic 级别配置;

    不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。
    如果关闭,leader只能从ISR中选举;如果开启,在ISR集合为空时,选择任一副本作为leader,这样做可能会导致数据丢失。

    • 监控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec。

    Kafka 相关资源的评估

    集群评估

    • Broker 评估
      • 每个 Broker 的 Partition 数不应该超过2k;
      • 控制 partition 大小(不要超过25GB);
    • 集群评估(Broker 的数量根据以下条件配置)
      • 数据保留时间;
      • 集群的流量大小;
    • 集群扩容:
      • 磁盘使用率应该在 60% 以下;
      • 网络使用率应该在 75% 以下;
    • 集群监控
      • 保持负载均衡;
      • 确保 topic 的 partition 均匀分布在所有 Broker 上;
      • 确保集群的阶段没有耗尽磁盘或带宽。

    kafka集群服务器监控

    Kafka 集群需要监控的一些指标,这些指标反应了集群的健康度。

    • CPU 负载;
    • Network Metrics;
    • File Handle 使用;
    • 磁盘空间;
    • 磁盘 IO 性能;
    • GC 信息;
    • ZooKeeper 监控。

    Broker 监控

    • Partition 数:kafka.server:type=ReplicaManager,name=PartitionCount;
    • Leader 副本数:kafka.server:type=ReplicaManager,name=LeaderCount;
    • ISR 扩容/缩容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec;
    • 读写速率:Message in rate/Byte in rate/Byte out rate;
    • 网络请求的平均空闲率:NetworkProcessorAvgIdlePercent;
    • 请求处理平均空闲率:RequestHandlerAvgIdlePercent。

    Topic 评估

    • partition 数
      • Partition 数应该至少与最大 consumer group 中 consumer 线程数一致;
      • 对于使用频繁的 topic,应该设置更多的 partition;
      • 控制 partition 的大小(25GB 左右);
      • 考虑应用未来的增长(可以使用一种机制进行自动扩容);
    • 使用带 key 的 topic;
    • partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)。

    合理地设置 partition

    • 根据吞吐量的要求设置 partition 数:
      • 假设 Producer 单 partition 的吞吐量为 P;
      • consumer 消费一个 partition 的吞吐量为 C;
      • 而要求的吞吐量为 T;
      • 那么 partition 数至少应该大于 T/P、T/c 的最大值;
    • 更多的 partition,意味着:
      • 更多的 fd;
      • 可能增加 Unavailability(可能会增加不可用的时间);
      • 可能增加端到端的延迟;
      • client 端将会使用更多的内存。

    关于 Partition 的设置可以参考这篇文章How to choose the number of topics/partitions in a Kafka cluster?,这里简单讲述一下,Partition 的增加将会带来以下几个优点和缺点:

    1. 增加吞吐量:对于 consumer 来说,一个 partition 只能被一个 consumer 线程所消费,适当增加 partition 数,可以增加 consumer 的并发,进而增加系统的吞吐量;
    2. 需要更多的 fd:对于每一个 segment,在 broker 都会有一个对应的 index 和实际数据文件,而对于 Kafka Broker,它将会对于每个 segment 每个 index 和数据文件都会打开相应的 file handle(可以理解为 fd),因此,partition 越多,将会带来更多的 fd;
    3. 可能会增加数据不可用性(主要是指增加不可用时间):主要是指 broker 宕机的情况,越多的 partition 将会意味着越多的 partition 需要 leader 选举(leader 在宕机这台 broker 的 partition 需要重新选举),特别是如果刚好 controller 宕机,重新选举的 controller 将会首先读取所有 partition 的 metadata,然后才进行相应的 leader 选举,这将会带来更大不可用时间;
    4. 可能增加 End-to-end 延迟:一条消息只有其被同步到 isr 的所有 broker 上后,才能被消费,partition 越多,不同节点之间同步就越多,这可能会带来毫秒级甚至数十毫秒级的延迟;
    5. Client 将会需要更多的内存:Producer 和 Consumer 都会按照 partition 去缓存数据,每个 partition 都会带来数十 KB 的消耗,partition 越多, Client 将会占用更多的内存。

    Producer 的相关配置、性能调优及监控

    Quotas

    • 避免被恶意 Client 攻击,保证 SLA;
    • 设置 produce 和 fetch 请求的字节速率阈值;
    • 可以应用在 user、client-id、或者 user 和 client-id groups;
    • Broker 端的 metrics 监控:throttle-rate、byte-rate;
    • replica.fetch.response.max.bytes:用于限制 replica 拉取请求的内存使用;
    • 进行数据迁移时限制贷款的使用,kafka-reassign-partitions.sh -- -throttle option

    Kafka Producer

    • 使用 Java 版的 Client;
    • 使用 kafka-producer-perf-test.sh 测试你的环境;
    • 设置内存、CPU、batch 压缩;
      • batch.size:该值设置越大,吞吐越大,但延迟也会越大;
      • linger.ms:表示 batch 的超时时间,该值越大,吞吐越大、但延迟也会越大;
      • max.in.flight.requests.per.connection:默认为5,表示 client 在 blocking 之前向单个连接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;
      • compression.type:压缩设置,会提高吞吐量;
      • acks:数据 durability 的设置;
    • 避免大消息
      • 会使用更多的内存;
      • 降低 Broker 的处理速度;

    性能调优

    • 如果吞吐量小于网络带宽
      • 增加线程;
      • 提高 batch.size;
      • 增加更多 producer 实例;
      • 增加 partition 数;
    • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
    • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

    Prodcuer 监控

    • batch-size-avg
    • compression-rate-avg
    • waiting-threads
    • buffer-available-bytes
    • record-queue-time-max
    • record-send-rate
    • records-per-request-avg

    Kafka Consumer 配置、性能调优及监控

    Kafka Consumer

    • 使用 kafka-consumer-perf-test.sh 测试环境;
    • 吞吐量问题:
      • partition 数太少;
      • OS page cache:分配足够的内存来缓存数据;
      • 应用的处理逻辑;
    • offset topic(__consumer_offsets
      • offsets.topic.replication.factor:默认为3;
      • offsets.retention.minutes:默认为1440,即 1day;
        – MonitorISR,topicsize;
    • offset commit较慢:异步 commit 或 手动 commit。

    Consumer 配置

    • fetch.min.bytesfetch.max.wait.ms
    • max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance;
    • max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500;
    • session.timeout.ms
    • Consumer Rebalance
      – check timeouts
      – check processing times/logic
      – GC Issues
    • 网络配置;

    Consumer 监控

    consumer 是否跟得上数据的发送速度。

    • Consumer Lag:consumer offset 与 the end of log(partition 可以消费的最大 offset) 的差值;
    • 监控
      • metric 监控:records-lag-max;
      • 通过 bin/kafka-consumer-groups.sh 查看;
      • 用于 consumer 监控的 LinkedIn’s Burrow;
    • 减少 Lag
      • 分析 consumer:是 GC 问题还是 Consumer hang 住了;
      • 增加 Consumer 的线程;
      • 增加分区数和 consumer 线程;

    相关文章

      网友评论

          本文标题:Kafka broker 调优

          本文链接:https://www.haomeiwen.com/subject/skyuoftx.html