美文网首页
kafka 0.11 事务机制梳理

kafka 0.11 事务机制梳理

作者: 稻草人_d41b | 来源:发表于2020-01-16 17:25 被阅读0次

    接下来从三个部分介绍下kafka 0.11及以上版本如何实现事务机制。

    第一部分:kafka producer生产records的过程。

    第二部分:kafka producer配置使用事务机制注意事项。

    第三部分:kafka producer如何实现事务机制。

    kafka producer在0.11版本增加新增了幂等性、事务机制等特性。接下来主要介绍下producer是如何实现幂等性的。

    kafka producer生产records的过程

    Producer<String, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 100; i++)

    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

    上面是基本的kafka producer代码。其中的send方法是异步的,使用send方法实际是调用add records到producer创建的buffer。然后producer会批量提交buffer中的records,从而实现高效的发生records。其中存储Record的buffer使用buffer =free.allocate(size, maxTimeToBlock)进行分配。

    申请存储kafka record的buffer producer send方法主代码

    查看producer源码后可以看到是将records解析生成TopicPartition、serializedKey、serializedValue等对象后存储到RecordAccumulator累加器中的。将record存储到Deque<ProducerBatch>队列后会返回RecordAppendResult对象,如果RecordAppendResult对象提示队列已满的话,会唤醒sender方法,给kafka发送消息。

    尝试写入records数据到队列中

    在kafka producer中创建kafkaclient线程,并启动sender线程。

    启动sender线程

    在使用kafka producer时,我们可以配置

    batch.size:producer中每个分区中可以保存的没有发送的records大小。

    buffer.memory:producer缓存records时可以使用的内存大小。如果records发送速度超过传送速度的话,buffer空间就会耗光,其他发送将被阻塞。max.block.ms可以设置发送端的最大阻塞时间,如果超过这个时间将报异常。

    kafka producer配置使用事务机制注意事项

    上面回顾了kakfa producer发送record的主过程,现在我们回到正题,看下kakfa配置幂等性和事务机制。

    幂等性

    在使用kafka 幂等性(idempotence)时,producer中的retries默认配置的是Integer.MAX_VALUE,max.in.flight.requests.per.connection默认配置为1,acks默认配置为all。使用幂等性的优势是可以避免应用级别的数据重发。如果发生错误,需要关闭producer,检查最后产生的消息是否有重复。与此同时,producer只有在一个session中可以确保消息的幂等性。

    事务机制

    在使用事务事,必须设置transactional.id,设置事务id的目的是确保在一个producer实例下,可以实现跨域多个session下的恢复事务。同时replication.factor至少设置为3,min.insync.replicas应该被设置为2。如果要确保端到端的事务机制,需要在consumer中配置只读已经提交的消息。

    kafka producer如何实现事务机制

    producer事务主代码

    Properties props = new Properties();

     props.put("bootstrap.servers", "localhost:9092");

    props.put("transactional.id", "my-transactional-id");

    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {

    producer.beginTransaction();

    for (int i = 0; i < 100; i++)

       producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

       producer.commitTransaction();

       } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {

        producer.close();

     } catch (KafkaException e) {

       producer.abortTransaction();

     } 

    producer.close();

    transactional producer使用异常传递错误状态,如果是KafkaException错误,则会直接终止事务。而transactional producer定义了如下几种stata,并定义如果当前是这种stata时,对应的前一个状态是什么,再根据具体stata确定处理方式。

    UNINITIALIZED ——》UNINITIALIZED

    INITIALIZING ——》

    READY ——》INITIALIZING、COMMITTING_TRANSACTION、ABORTING_TRANSACTION

    IN_TRANSACTION ——》READY

    COMMITTING_TRANSACTION ——》IN_TRANSACTION

    ABORTING_TRANSACTION ——》IN_TRANSACTION、ABORTABLE_ERROR

    ABORTABLE_ERROR ——》IN_TRANSACTION、COMMITTING_TRANSACTION、ABORTABLE_ERROR

    FATAL_ERROR;

    在TransactionManager中定义了initializeTransactions、beginTransaction、beginCommit、beginAbort、beginCompletingTransaction具体实现方法。而TxnRequestHandler抽象类中定义了每个阶段的处理方法,包括如下对象:

    AddOffsetsToTxnHandler

    AddPartitionsToTxnHandler

    EndTxnHandler

    FindCoordinatorHandler

    InitProducerIdHandler

    执行的先后顺序是FindCoordinatorHandler-》InitProducerIdHandler-》AddPartitionsToTxnHandler-》EndTxnHandler-》AddOffsetsToTxnHandler-》TxnOffsetCommitHandler

    简单拿一个handler介绍下。如下

    handler处理返回reponse

    从上图可以看出根据handler返回的response的不同情况进行相关处理。比如:在提交数据时因为coordinate的问题话,会先尝试查询coordinates,然后将没有提交成功的数据重新写入队列中,而如果发生其他error错误的话,可能会直接返回错误。

    相关文章

      网友评论

          本文标题:kafka 0.11 事务机制梳理

          本文链接:https://www.haomeiwen.com/subject/iywhsftx.html