一个正常的生产逻辑需要具备以下几个步骤:
(1) 配置生产者客户端参数及创建相应的生产者实例。
(2) 构建待发送的消息。
(3) 发送消息。
(4) 关闭生产者实例。
必要的参数配置:
(1)bootstrap.servers:broker地址,逗号隔开。
(2)key.serializer和value.serializer:序列化器。
发送消息主要有三种模式:
(1)发后即忘(fire-and-forget):它只管往Kafka 中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能高,可靠性也最差。
代码上直接send,不传递callback,也不调用future的get方法解开。
(2)同步(sync):代码上调用完send,通过返回的future调用get方法即可同步,也可以设置超时阻塞。
(3)异步(async ):代码上调用send的时候,传递callback。
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
- 常见的可重试异常有: NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException等。
- 不可重试的异常,如RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer 对此不会进行
任何重试,直接抛出异常。
对于同一个分区而言,如果消息record1于record2之前先发送,那么KafkaProducer就可以保证对应的callback1在callback2之前调用,也就是说,回调函数的调用也可以保证分区有序。
KafkaProducer 的close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。与此同时,KafkaProducer 还提供了一个带超时时间的close()方法。如果调用了带超时时间timeout 的close()方法,那么只会在等待timeout时间内来完成所有尚未完成的请求处理,然后强行退出。在实际应用中,一般使用的都是无参的c lose()方法。
序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。除了用于String类型的序列化器, 还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口。
如果Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现, 或者使用自定义类型的序列化器来实现。
分区器
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker 。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器
的作用。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算par tition的值。分区器的作用就是为消息分配分区。
默认分区器DefaultPartitioner的实现,如果key不为null,那么默认的分区器会对key进行哈希,最终根据得到的哈希值来计算分区号,拥有相同key 的消息会被写入同一个分区。如果key为null ,那么消息将会以轮询的方式发往主题内的各个可用分区。
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
自定义实现ProducerInterceptor接口即可。
KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行。这个方法运行在Producer的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照interceptor.classes参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。
生产者客户端的整体架构
image.png
- 整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Ka fka中。
- RecordAccumulator主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为33554432B ,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
- 主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch ,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;sender读取消息时,从双端队列的头部读取。
个人理解:一个主题下面有多个分区,每个分区对应一个双端队列,队列里存了多个ProducerBatch。一个ProducerBatch代表一个消息批次,里面包含一个或多个ProducerRecord。通过将多个ProducerRecord拼凑成一个ProducerBatch的方式来减少网络次数提高吞吐量。 - ProducerBatch的大小和batch.size 参数有着密切的关系。当一条消息流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列,再从这个双端队列的尾部获取一个ProducerBatch,查看ProducerBatch中是否
还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。 - Sender 从RecordAccumulator中获取缓存的消息之后,会进一步将原<分区, Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。然后再进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。
- 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<Nodeld, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(Nodeld是一个String类型,表示节点的id编号)。
与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了。通过比较Deque<Request>的size 与这个参数的大小来判断对应的Node 中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据
new ProducerRecord<>(topic,”Hello, Kafka !”) ;
- 代码中我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。
- 我们了解了bootstrap.servers 参数只需要配置部分broker节点的地址即可,不需要配置所有broker 节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。
- 当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端该超时参数默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
- leastLoadedNode:从前文提及的InFlightRequests中获得,即所有Node中负载最小的那一个。
重要的生产者参数
(1)acks:这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消
息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐
量之间的权衡。
acks = “1”。默认值也是1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。但是,如果消息写入leader 副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader 副本中并没有这条对应的消息。
acks = “0”。生产者发送消息之后不需要等待任何服务端的响应。这个设置的吞吐量最高,但也最不可靠。
acks = “-1”或acks = “all”。生产者在消息发送之后,需要等待ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。这个设置有最高的可靠性。但是如果ISR只有leader副本一个,会跟acks = 1一样有同样的风险,要获得更高的消息可靠性需要配合min.insync.replicas等参数的联动。
注意acks参数配置的值是一个字符串类型,而不是整数类型。
(2)max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B ,即1MB。不建议修改此值。
(3)retries 和retry.backoff.ms
- retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。
- 重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
- Kafka可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用来说,顺序性非常重要,比如MySQL 的binlog传输,如果出现错误就会造成非常严重的后果。如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,那么就会出现错序的现象: 如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。
(4)compression.type
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”、“snappy”和“lz4”。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。
(5)connections.max.idle.ms
用来指定在多久之后关闭闲置的连接,默认值是540000,即9分钟。
(6)linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多消息加入ProducerBatch的时间,默认值为0。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
(7)request.timeout.ms
用来配置Producer等待请求响应的最长时间,默认值为30000ms。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
kafka常见的分区策略:
- Round-Robin,轮询,默认用这个。
- 随机策略。
- 消息键策略,用key进行哈希确定分区。
切记分区是实现负载均衡以及高吞吐量的关键
如何实现kafka消息的顺序性?
- 最简单的就是指定只使用单分区。
- 在个别业务允许的情况下,比如本身消息的顺序性只需按类型进行排序,可以通过类型作为key来进行分区。
image.png
image.png
image.png
image.png
image.png
关于kafka消息丢失:
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。
第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署Kafka Broker 服务器了。
现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。
总结一下,Kafka 是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要满足一定的条件。当然,说明这件事并不是要为 Kafka 推卸责任,而是为了在出现该类问题时我们能够明确责任边界
“消息丢失”案例
Producer 程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。我来描述一个场景:你写了一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存,于是大骂:“Kafka 真烂,消息发送居然都能丢失,而且还不告诉我?!”如果你有过这样的经历,那么请先消消气,我们来分析下可能的原因。
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg)这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。
这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire andforget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。
可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
不过,就算不是 Kafka 的“锅”,我们也要解决这个问题吧。实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
案例 2:消费者程序丢失数据
比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。
这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。
正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。
同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。
当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。在专栏后面的内容中,我会跟你分享如何应对重复消费的问题。除了上面所说的场景,其实还存在一种比较隐蔽的消息丢失场景。
为了加快阅读速度,你把书中的 10 个章节分别委托给你的 10 个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这 10 个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这 10 个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。
对于 Kafka 而言,这就好比 Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。
这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。在这里我要提醒你一下,单个Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
消息不丢失的最佳实践:
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。即能得到发送结果进而处理发送失败重新发送。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas +1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
注意第2条跟第6条并不冲突:其如果ISR中只有1个副本了,acks=all也就相当于acks=1了,引入min.insync.replicas的目的就是为了做一个下限的限制:不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas。(ISR就是保持跟leader分区同步的分区集合,acks=all表示要所有ISR收到消息才算已提交,min.insync.replicas表示ISR的数量要求多少个,这里要求必须大于1,如果等于1的话,即当ISR只有1个时,还是会有单点风险)
其实,Kafka 还有一种特别隐秘的消息丢失场景:增加主题分区。当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer 先于 Consumer 感知到新增加的分区,而Consumer 设置的是“从最新位移处”开始读取消息,因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法读取到这些消息。严格来说这是 Kafka 设计上的一个小缺陷,你有什么解决的办法吗?
这个问题,难道新增分区之后,producer先感知并发送数据,消费者后感知,消费者的offset会定位到新分区的最后一条消息?消费者没有提交offset怎么会从最后一条开始的呢?老师说:如果你配置了auto.offset.reset=latest就会这样的。所以把这个参数改成读最早的就好了?
你也许会问:怎么可能是这样?如果不调用 send 方法,这个 Producer 都不知道给哪个主题发消息,它又怎么能知道连接哪个 Broker 呢?难不成它会连接 bootstrap.servers参数指定的所有 Broker 吗?嗯,是的,Java Producer 目前还真是这样设计的。
我在这里稍微解释一下 bootstrap.servers 参数。它是Producer 的核心参数之一,指定了这个 Producer 启动时要连接的 Broker 地址。请注意,这里的“启动时”,代表的是 Producer 启动时会发起与这些 Broker 的连接。因此,如果你为这个参数指定了 1000 个 Broker 连接信息,那么很遗憾,你的 Producer 启动时会首先创建与这 1000个 Broker 的 TCP 连接。
在实际使用过程中,我并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了。因为 Producer 一旦连接到集群中的任一台Broker,就能拿到整个集群的 Broker 信息,故没必要为bootstrap.servers 指定所有的 Broker。
所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下3中:
最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
至少一次(at least once):消息不会丢失,但有可能被重复发送。
精确一次(exactly once):消息不会丢失,也不会被重复发送。
目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。在专栏第 11 期中,我们说过消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。
那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。它们分别是什么机制?两者是一回事吗?要回答这些问题,我们首先来说说什么是幂等性
什么是幂等性(Idempotence)?
幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。我来举几个简单的例子说明一下。比如在乘法运算中,让数字乘以 1 就是一个幂等操作,因为不管你执行多少次这样的运算,结果都是相同的。再比如,取整函数(floor 和ceiling)是幂等函数,那么运行 1 次 floor(3.4) 和 100 次floor(3.4),结果是一样的,都是 3。相反地,让一个数加 1这个操作就不是幂等的,因为执行一次和执行多次的结果必然不同。
幂等性 Producer
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”,ture),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
看上去,幂等性 Producer 的功能很酷,使用起来也很简单,仅仅设置一个参数就能保证消息不重复了,但实际上,我们必须要了解幂等性 Producer 的作用范围。
首先,它只能保证单分区上的幂等性,即一个幂等性Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了Producer 进程之后,这种幂等性保证就丧失了。
那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别。
Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性(Isolation) 和持久性 (Durability)。
Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。下面我们就来看看 Kafka 中的事务型 Producer。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence =true。
- 设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。
此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level参数的值即可。当前这个参数有两个取值: - read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的Consumer 就不要使用这个值。
- read_committed:表明 Consumer 只会读取事务型Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
所以,kafka首先是实现了至少一次消费,但存在重复消费问题,需要开启幂等生产者才能避免重复消费,消息会多增加一些字段来让broker知道消息重复进而能够进行丢弃。但幂等性只能确保一个主题的单分区不出现重复消息,多分区还是会存在问题。另外producer进程如果重启,也没法确保幂等性(这个我也不太懂为啥)。所以需要同时用到事务型生产者和事务型消费者。
https://blog.csdn.net/qq_20394285/article/details/104391733
简单来说,幂等性 Producer 和事务型 Producer 都是Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型Producer 能做的更多。
不过,切记天下没有免费的午餐。比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。
实现上可以用kafka的幂等性来保证单分区单会话的精准一次语义,如果是一批消息,可以路由到同一个分区。
我所理解的kafka事务是这样的:生产者的事务能够保证一条消息仅仅会保存在kafka的某一个分区上,不会出现在多个分区上,另外,能够保证多条消息原子性的发送到多个分区。也就是说它只保证了从producer端到broker端消息不丢失不重复。但对于consumer端,由于偏移量的提交和消息处理的顺序有前有后,依然可能导致重复消费或者消息丢失消费,如果要实现消费者消费的精确一次,还需要通过额外机制在消费端实现偏移量提交和消息消费的事务处理。
网友评论