美文网首页图解Kafka
KafkaProducer(1) 注释说明

KafkaProducer(1) 注释说明

作者: _孙行者_ | 来源:发表于2020-09-09 09:48 被阅读0次
  • 客户端将 records 发送到 Kafka 集群中

  • producer 是线程安全的 , 可以在多线程中共享 producer.

  • producer 持有一个缓冲池( bufferPool ) , 需要发送的 records , 还未发送到集群时 , 保存在缓冲池中 . 会有一个后台线程 ( sender ) 将这些 records 发送到 Kafka 集群.

  • send() 方法是异步的 . 调用 send() 方法后 , 将 record 添加到缓冲池中 , 并立即返回结果 ( 结果中带有 future 对象) . 因此 producer 可以将一批 record 添加到 bufferPool 中 , 提高发送效率 .

  • acks 配置用来控制请求是否完整 . 当设置为 all时 , 提交将会被完全阻塞 . 是最慢的 , 等待时间最长的设置.

  • 如果请求失败了 , producer 会自动重试 . retries 配置为 0 时 , 则不会重试 . 如果启用了重试 , 那么就会有重复发送 record 的可能 .

  • producer 通过缓冲池管理着每个 partition 上未发送的 records . 这个缓冲池的大小 , 通过batch.size配置指定 . 设置比较大的值 , 一次可以发送更多的数据 , 同样的 , 也需要更多的内存 .

  • 默认情况下 , 当缓冲区 ( buffer ) 中有 record 要发送时 , 会立即发送 . 就算 buffer 还没填满 , 也会发送 . 当然也可以通过linger.ms配置 , 当配置大于 0时 , producer 会等待指定的时间 , 这样 , 会有更多的 record 填充到相同的 buffer 中 . 类似于 TCP 协议中的 Nagle (纳格尔) 算法. 等待同样会带来发送的延迟 . 需要注意的是 , 当多个 record 同时达到时 , 此时就算linger.ms=0 , 也会汇集多个 record 后再发送 . 因此 , 在高负载下 , 无论 linger.ms 是否配置 , 都会是批量发送 . 但是不在高负载的场景下 , 设置linger.ms > 0 , 会减少请求的次数 , 请求更有效 .

  • buffer.memory 配置的是 producer 总共可用的缓冲池的大小 . 如果 写缓存 ( send()方法 ) 的速度过快 , 大于发送数据的速度 , 那么缓冲池最终会被耗尽 , 后续的send() 请求都将被阻塞 . 这个阻塞会持续 max.block.ms的时长 , 超时还没写到缓存里 , 会抛出 TimeoutException .

  • key.serializervalue.serializer 决定了怎样将 ProducerRecord 对象转化为 bytes (字节) .

  • 从 Kafka 0.11 版本开始 , KafkaProducer 将支持两种模式 : 幂等的生产者 ( idempotent producer ) 和 事务的生产者( transactional producer ) . 幂等生产者将Kafka的语义从至少一次增强到恰好一次 . 特别是 producer 在重试是 , 也不会产生重复数据 . 事务生产者原子性的发送多个 partitions ( 甚至多个 topics ) .

  • 要启用幂等 , 设置配置 enable.idempotence = true , 这么设置后 , retries 的重试次数默认为 Integer.MAX_VALUE , 即为无限次 . acks 也默认为 all . 其他没有 API 使用上的变化 , 当前已经有应用在不修改代码的情况下 , 也可以直接使用 幂等生产者 .

  • 使用幂等的话 , 一定要避免应用程序级别的重复发送 , 因为这些重复发送不能消除 . ( kafka 只能在同一个 partition 上实现顺序和幂等 , 多个服务器上发送相同的数据 , 可能会在不同的 partition 上 . 那么不同的 partition 上就没办法保证幂等 , 且重复数据没办法识别和删除 ) . 已经启用了幂等的话 , 建议不要修改 retries配置 , 默认就使用无限次 ( Integer.MAX_VALUE ) . 如果 send() 方法返回 error , 甚至重试无数次后还是 error . 建议停掉 producer , 检查下最后一次重试的内容 , 确保不是重复内容 ( not duplicated ) . 最后 , 幂等只能保证在同一个 session 里保证幂等 . 重新建立的 kafka session 就不行了.

  • 要使用事务生产者及其相关的一些 API , 必须要设置 transactional.id . 启用了事务 , 那么幂等会自动启用 , 因为事务要依赖于幂等 . 已经设置了 transactional.id , 那么事务中的 topics 需要设置为持久化 . 还有 , replication.factor 要至少设置为 3 , min.insync.replicas 对于事务中的 topics 要设置为 2 , 最后 , 为了确保端到端的事务一致性 , 消费者 ( consumers ) 必须要配置为只读取已经事务提交的 message .

  • 设置transactional.id 的目的是为了 在一个 producer 实例中跨多人 session 的情况下回滚事务的场景 . 对于 partition 上的每个 producer 实例 , transactional.id 应该是唯一的 .

  • 新的事务操作相关 API 都是阻塞式的 , 并且发生失败时都会抛出异常 .

//事务发送的简单使用示例
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();
  • 每个 producer 实例 , 每次只能开启一个事务 , 在 beginTransaction()commitTransaction() 中间发送的所有消息 , 会在同一个事务中 .

  • 事务生产者是使用异常来传递错误的状态 , 特别的 , 不能在 producer.send() 方法里使用 callback 变量 , 也不能在 返回的 future 中调用 future.get() . 事务在执行过程中产生的任务错误, 都会抛出 KafkaException 异常 .

  • 在收到 KafkaException 异常后 , 可以调用 producer.abortTransaction() 来结束并回滚事务 . 来保证所有已经执行的回滚掉 .

  • 从 0.10.0 或更高版本开始 , kafkaProducer 可以和 brokers 进行连接通信 . 要使用 事务特性 , broker 的版本至少要是 0.11.0 或更高 . 否则将返回一个UnsupportedVersionException .


总结

  • 在注释中着重提到了几个配置:
    • acks : 控制请求的结果
    • retries : 重试的次数 , 不启用幂等的话 , 可能会有重复数据
    • batch.size : 指定每个缓存 buffer 的大小 . 默认是 16 K
    • linger.ms : 批量执行的等待时长 , 默认0 , 即不等待 .
    • buffer.memory : 缓存池 (bufferPool) 的最大可用内存 . 默认 32M
    • max.block.ms : 在缓存不够用时 , 等待释放缓存的最大等待时长 .
    • enable.idempotence : 启用幂等 , 使 Kafka 从至少一次 , 增强到 恰好一次 .
    • transactional.id : 启用事务 , 指定事务实例的ID

如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

相关文章

  • KafkaProducer(1) 注释说明

    客户端将 records 发送到 Kafka 集群中 producer 是线程安全的 , 可以在多线程中共享 pr...

  • Kafka系列之(5)——Kafka Producer源码解析

    KafkaProducer源码解析 KafkaProducer使用示例 (1)、KafkaProducer的sen...

  • 注释说明-vue项目

    注释说明: 一. data属性说明: 【1】、单行属性,注释写在行尾good: bad: 【2】、多行属性,注释...

  • 消息队列之Kafka-生产者

    1、发送模式 KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例...

  • java基础第二天

    1.注释:标注说明 (1)单行注释:只对一行有效 //注释内容 (2)多行注释:对多行有效 /* *注释内容 */...

  • Java学习 Day2

    1.注释:对我们写的代码进行解释说明。 (1)单行注释:// 。对一行代码进行说明。用在方法中。 (2)多行注释:...

  • java的基本语法

    1.注释:对我们写的代码进行解释说明。 (1)单行注释:// 。对一行代码进行说明。用在方法中。 (2)多行注释:...

  • JAVA基础知识

    1.注释,关键字,标识符1.注释(1)注释:解释说明程序的而文字。(2)注释的分类:单行注释 格式: //注释的文...

  • iOS代码规范与结构

    代码规范 1. h文件代码 类功能说明注释。属性说明注释。方法说明注释。类名加前缀,避免命名空间冲突. 2. m文...

  • NO.4 Java的基础语法

    1、注释: 用于解释说明程序的文字 , 分类格式:单行注释(//注释文字可以嵌套),多行注释(/* 注释文...

网友评论

    本文标题:KafkaProducer(1) 注释说明

    本文链接:https://www.haomeiwen.com/subject/bekjektx.html