接下来从三个部分介绍下kafka 0.11及以上版本如何实现事务机制。
第一部分:kafka producer生产records的过程。
第二部分:kafka producer配置使用事务机制注意事项。
第三部分:kafka producer如何实现事务机制。
kafka producer在0.11版本增加新增了幂等性、事务机制等特性。接下来主要介绍下producer是如何实现幂等性的。
kafka producer生产records的过程
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
上面是基本的kafka producer代码。其中的send方法是异步的,使用send方法实际是调用add records到producer创建的buffer。然后producer会批量提交buffer中的records,从而实现高效的发生records。其中存储Record的buffer使用buffer =free.allocate(size, maxTimeToBlock)进行分配。
申请存储kafka record的buffer producer send方法主代码查看producer源码后可以看到是将records解析生成TopicPartition、serializedKey、serializedValue等对象后存储到RecordAccumulator累加器中的。将record存储到Deque<ProducerBatch>队列后会返回RecordAppendResult对象,如果RecordAppendResult对象提示队列已满的话,会唤醒sender方法,给kafka发送消息。
尝试写入records数据到队列中在kafka producer中创建kafkaclient线程,并启动sender线程。
启动sender线程在使用kafka producer时,我们可以配置
batch.size:producer中每个分区中可以保存的没有发送的records大小。
buffer.memory:producer缓存records时可以使用的内存大小。如果records发送速度超过传送速度的话,buffer空间就会耗光,其他发送将被阻塞。max.block.ms可以设置发送端的最大阻塞时间,如果超过这个时间将报异常。
kafka producer配置使用事务机制注意事项
上面回顾了kakfa producer发送record的主过程,现在我们回到正题,看下kakfa配置幂等性和事务机制。
幂等性
在使用kafka 幂等性(idempotence)时,producer中的retries默认配置的是Integer.MAX_VALUE,max.in.flight.requests.per.connection默认配置为1,acks默认配置为all。使用幂等性的优势是可以避免应用级别的数据重发。如果发生错误,需要关闭producer,检查最后产生的消息是否有重复。与此同时,producer只有在一个session中可以确保消息的幂等性。
事务机制
在使用事务事,必须设置transactional.id,设置事务id的目的是确保在一个producer实例下,可以实现跨域多个session下的恢复事务。同时replication.factor至少设置为3,min.insync.replicas应该被设置为2。如果要确保端到端的事务机制,需要在consumer中配置只读已经提交的消息。
kafka producer如何实现事务机制
producer事务主代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
producer.close();
transactional producer使用异常传递错误状态,如果是KafkaException错误,则会直接终止事务。而transactional producer定义了如下几种stata,并定义如果当前是这种stata时,对应的前一个状态是什么,再根据具体stata确定处理方式。
UNINITIALIZED ——》UNINITIALIZED
INITIALIZING ——》
READY ——》INITIALIZING、COMMITTING_TRANSACTION、ABORTING_TRANSACTION
IN_TRANSACTION ——》READY
COMMITTING_TRANSACTION ——》IN_TRANSACTION
ABORTING_TRANSACTION ——》IN_TRANSACTION、ABORTABLE_ERROR
ABORTABLE_ERROR ——》IN_TRANSACTION、COMMITTING_TRANSACTION、ABORTABLE_ERROR
FATAL_ERROR;
在TransactionManager中定义了initializeTransactions、beginTransaction、beginCommit、beginAbort、beginCompletingTransaction具体实现方法。而TxnRequestHandler抽象类中定义了每个阶段的处理方法,包括如下对象:
AddOffsetsToTxnHandler
AddPartitionsToTxnHandler
EndTxnHandler
FindCoordinatorHandler
InitProducerIdHandler
执行的先后顺序是FindCoordinatorHandler-》InitProducerIdHandler-》AddPartitionsToTxnHandler-》EndTxnHandler-》AddOffsetsToTxnHandler-》TxnOffsetCommitHandler
简单拿一个handler介绍下。如下
handler处理返回reponse从上图可以看出根据handler返回的response的不同情况进行相关处理。比如:在提交数据时因为coordinate的问题话,会先尝试查询coordinates,然后将没有提交成功的数据重新写入队列中,而如果发生其他error错误的话,可能会直接返回错误。
网友评论