美文网首页
kafka配置

kafka配置

作者: williamlee | 来源:发表于2020-04-26 19:32 被阅读0次

    producer


    bootstrap.servers

    kafka server地址,设置一个节点虽然仍然可以拿到集群信息但是避免当前节点挂掉,建议设置多个.

    建议: 当设置一个host时只要这个host不挂是不影响使用的,但是为了高可用建议把所有server都配置上.

    batch.size

    批量发送到服务端的字节数大小,批量发送可以提高整体的吞吐量,但是如果设置过大会导致内存浪费.

    根据应用实际情况设置,如果要求高吞吐量建议开启.默认关闭.

    acks

    server端返回ack的个数.

    • =0那么不需要任何确认,添加到socket buffer中后就当作成功处理,这时retries设置就会无效了因为默认就发送成功不存在重试情况;
    • =1那么只需要该partition的master确认后才算发送成功.存在的问题是当写入master后并未同步到ISR中的follower那么该条消息会丢失;
    • =ALL || =-1需要ISR中的所有节点都确认收到才算发送成功.安全级别最高;

    默认值=1master确认后才算发送成功,满足大部分需求,如果消息量不大或者对消息准确性要求高建议=ALL

    linger.ms

    发送消息的延迟时间,比如设置5ms那么消息会逗留5ms来等待更多消息批量发送,和上面batch.size组合使用,当满足batch.sizelinger.ms条件之一就会发送,默认值是0,未设置的情况下消息直接发送出去了.

    默认值=0,直接发送.根据应用实际情况调节.收益类似batch.size.

    client.id

    应用标识,消息溯源时可以使用,设置后server端的log中会同时记录,host/ip/clientid.

    建议添加该配置.

    send.buffer.bytes

    设置tcp send buffer,如果设置为-1则使用操作系统默认值.建议跟随系统默认值,错误设置会导致TCP协议层出现问题.

    默认128K,原理不清楚不建议调整.

    receive.buffer.bytes

    设置tcp receive buffer,如果设置为-1则使用操作系统默认值.建议跟随系统默认值,错误设置会导致TCP协议层出现问题.

    默认32K,原理不清楚不建议调整.

    max.request.size

    单次请求最大字节数的限制,影响batch.size配置,另外这个参数服务端也有自己的配置,存在服务端与客户端不同的情况.

    默认1Mb, 根据应用实际情况调整.如果开启批量可能需要调整此参数.

    reconnect.backoff.ms

    连接在重试之前等待时间,避免短时间频繁的连接服务端.

    reconnect.backoff.max.ms

    重试连接的最大等待时间,和上一个参数相关联,如果设置了这个参数,那么当连接失败后时间会成指数增加避免connection storm,比如第一次失败,重连等待1s,又失败了重连等待3s(只是举例,具体值不是这个).

    max.block.ms

    当缓冲区满了或者metadata获取不到时,调用KafkaProducer.send()KafkaProducer.partitionsFor()阻塞的最大时间.

    默认50ms,不建议调整.

    buffer.memory

    设置缓冲区大小,如果消息产生速度大于发送到broker的速度,缓冲区用量就会增加如果满了就会阻塞max.block.ms设置的时间,阻塞后缓冲区仍然是满的将会抛异常.

    默认32Mb,可以通过线上JMX MBean观察,如果消息量比较小一般用不了这么大.

    retry.backoff.ms

    设置请求失败重试延后时间,同样避免短时内大量重试的问题.

    默认100ms,不建议修改

    compression.type

    压缩类型,默认没有压缩.提供none, gzip, snappy, lz4压缩算法.

    默认无压缩,消息量大建议压缩.节省空间,节省网络传输(带宽是稀缺资源).留心压缩带来的CPU消耗.

    metrics.sample.window.ms

    指标计算的时间窗口

    metrics.num.samples

    计算指标抽样的数量

    metrics.recording.level

    记录指标的级别

    metric.reporters

    设置reporter类型,需要实现org.apache.kafka.common.metrics.MetricsReporter,如果有事件发生则会触发对应的调用.

    max.in.flight.requests.per.connection

    一个connection上未ack的请求数,如果这个值大于1,重试时就会导致顺序是乱的.

    retries

    当发送错误时重试次数,max.in.flight.requests.per.connection与这个参数有关系,如果max.in.flight.requests.per.connection大于1则存在乱序的可能.举例:两批消息,第一批发送失败,第二批发送成功,第一批重试.那么第二批反而在前面.

    key.serializer

    序列化方式,需要实现org.apache.kafka.common.serialization.Serializer接口.

    value.serializer

    序列化方式,需要实现org.apache.kafka.common.serialization.Serializer接口.

    connections.max.idle.ms

    空闲多久之后会关闭这个connection.

    partitioner.class

    需要实现org.apache.kafka.clients.producer.Partitioner这个接口,作topic partition的路由规则.

    request.timeout.ms

    针对请求的响应时间超时时间设置,如果超过了这个时间则进行重试,如果重试次数到了上限则失败.该参数的值应该大于broker端的配置replica.lag.time.max.ms, 避免因为数据备份导致的重试.

    interceptor.classes

    拦截器配置,需要实现org.apache.kafka.clients.producer.ProducerInterceptor这个接口,默认无.

    enable.idempotence

    开启幂等,新功能0.11版本引入的.如果设置为true则保证每条消息只被写入一次.并且要求max.in.flight.requests.per.connection<=5,retries>0,acks=all.如果这些参数没有明确设置那么将自适应,如果配置了但是参数之间不兼容则会抛ConfigException异常. 幂等只对单partition生效.

    transaction.timeout.ms

    transaction coordinator等待事务状态更新的超时时间,如果改值比broker中的transaction.max.timeout.ms大,那么请求将发生InvalidTransactionTimeout错误.

    transactional.id

    事务ID,需要enable.idempotence=true,如果未设置则不开启事务功能

    使用producerkafka时建议都添加callback,即便什么都不做也要打印行日志.可以参考:org.apache.kafka.clients.producer.internals.ErrorLoggingCallback


    Consumer


    group.id

    标识当前消费者属于哪个消费组

    建议配置,管理offset的必要参数

    max.poll.records

    调用一次poll()返回的记录数量

    建议设置,和其他参数有关联,如果单次拉取条数过多导致消费时间过长未commit,可能会导致该consumer被判定为down掉了,从而触发rebalance后其他consumer会重复消费.

    max.poll.interval.ms

    调用poll()方法的最大间隔时间,如果在间隔时间内没有调用poll()那么这个consumer被判定为失败,触发rebalance.

    session.timeout.ms

    该参数用来判定consumer是否失效,consumer会周期性的发送心跳到broker,如果在这个配置的周期内并未发送心跳那么判定该consumer失效,触发rebalance.

    这个参数配置一定要在broker两个配置数值之间group.min.session.timeout.msgroup.max.session.timeout.ms.

    heartbeat.interval.ms

    两次心跳的间隔时间,心跳的作用有两方面:

    1. 确保kafka知道当前的consumer是存活的
    2. 当consumer group发生变化时(新增或者减少),能确保触发rebalance

    建议配置的值要小于session.timeout.ms/3

    bootstrap.servers

    kafka server地址,设置一个节点虽然仍然可以拿到集群信息但是避免当前节点挂掉,建议设置多个.

    建议: 当设置一个host时只要这个host不挂是不影响使用的,但是为了高可用建议把所有server都配置上.

    enable.auto.commit

    如果设置成true,那么将异步周期性的提交offset

    尽量设置成false,由自动变手动更加能控制消息的消费情况.

    auto.commit.interval.ms

    自动提交的周期时间. 开启enable.auto.commit后有效

    partition.assignment.strategy

    partition的分配关系,也就是确定partition与consumer之间关系的方式

    默认org.apache.kafka.clients.consumer.RangeAssignor

    auto.offset.reset

    offset重置的规则,如果新的topic kafka没有对应的offset信息,或者当前的offset无效了(历史数据被删除),那么需要指定新的offset是什么,提供几种类型:

    1. earliest: 队列中能找到的最早的offset
    2. latest: 加入时最新的offset
    3. none: 找不到就报错
    4. anything else: 直接报错

    默认latest

    fetch.min.bytes

    fetch一次返回的最小字节数,如果不够这个字节数就等待.

    默认值为1字节.调大这个参数可以增加server端的吞吐量

    fetch.max.bytes

    fetch一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证consumer的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到broker的message.max.bytes限制,以及topic的max.message.bytes的限制.

    默认值是50Mb,

    fetch.max.wait.ms

    如果没达到fetch.min.bytes配置的值,fetch请求阻塞的最长时间

    默认500ms

    metadata.max.age.ms

    周期性的拉取metadata,即便服务端没发生变化

    默认3分钟

    max.partition.fetch.bytes

    fetch.max.bytes类似,只不过这个是限制单partition

    默认值1Mb

    send.buffer.bytes/receive.buffer.bytes/client.id/reconnect.backoff.ms/reconnect.backoff.max.ms/retry.backoff.ms/metrics.sample.window.ms/metrics.num.samples/metrics.recording.level/metric.reporters

    同producer

    check.crcs

    使用crc32的方式校验消息是否准确,避免磁盘等其他原因导致的消息错误,该功能有一定的性能损失,追求极致的性能需要关闭

    默认true

    key.deserializer/value.deserializer

    同producer

    request.timeout.ms

    同producer

    default.api.timeout.ms

    指定consumer所有操作的超时时间

    interceptor.classes

    consumer拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor .

    默认为空

    exclude.internal.topics

    是否排除kafka内部队列,比如offset队列.

    默认值true

    internal.leave.group.on.close

    当consumer关闭时是否离开group, 如果设置为false,则不离开组直到时间超过session.timeout.ms导致rebalance.

    isolation.level

    消息的隔离级别,类似mysql, 如果设置的是read_uncommitted那么调用consumer.poll()将读取到所有消息,如果设置了read_committed那么调用consumer.poll()将只能读取到已经commit的消息.

    使用kafkaconsumer时建议commit offset的操作都修改成手动提交,来控制消息消费的情况.

    相关文章

      网友评论

          本文标题:kafka配置

          本文链接:https://www.haomeiwen.com/subject/prgjwhtx.html