消息系统的用户从更严格的幂等生产者语义中获益良多,即每个消息写将被精确地持久化一次,没有重复和数据丢失——即使在客户端重试或代理失败的情况下也是如此。这些更强的语义不仅使编写应用程序更容易,而且扩展了可以使用给定消息传递系统的应用程序的空间。
然而,幂等Producer并不为跨多个主题分区的写提供保证。为此,需要更强的事务保证。能够自动写入多个主题分区。
在原子性上,我们指的是跨topicpartition将一组消息作为一个单元提交的能力:要么提交所有消息,要么不提交
流处理应用程序是“consu -transform- production”任务的管道,当流的重复处理不可接受时,绝对需要事务保证。因此,将事务保证添加到Kafka(一个流平台)使其不仅对流处理更有用,而且对其他各种应用程序也更有用。
在本文档中,我们提出了将事务引入Kafka的建议。我们将只关注面临变化的用户:客户端API的变化,我们将引入的新配置,以及保证的总结。我们还概述了基本的数据流,它总结了我们将在事务中引入的所有新rpc。设计细节在单独的文档中给出。
简单介绍一下Transaction和Streams
在上一节中,我们提到事务的主要动机是在Kafka流中只启用一次处理。我们有必要再深入研究一下这个用例,
回想一下,使用Kafka Streams的数据转换通常通过多个stream processors进行,每个处理器由Kafka主题连接。这个设置被称为流拓扑,基本上是一个DAG,其中流处理器是节点,连接Kafka主题是顶点。这种模式是所有流架构的典型模式。您可以在这里阅读更多关于Kafka streams架构的内容。
因此,Kafka流的事务本质上将包含输入消息、本地状态存储的更新和输出消息。在事务中包含输入偏移量会促使将“sendOffsets”API添加到生产者接口,如下所述。进一步的细节将在单独的KIP中呈现。
Public Interfaces
Producer API changes
生产者将获得5个新方法(initTransactions、beginTransaction、sendoffset、commitTransaction、abortTransaction),并更新send方法以抛出一个新的异常。详情如下:
public interface Producer<K,V> extends Closeable {
/**
* Needs to be called before any of the other transaction methods. Assumes that
* the transactional.id is specified in the producer configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer
* are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* @throws IllegalStateException if the TransactionalId for the producer is not set
* in the configuration.
*/
void initTransactions() throws IllegalStateException;
/**
* Should be called before the start of each new transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void beginTransaction() throws ProducerFencedException;
/**
* Sends a list of consumed offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* consumed only if the transaction is committed successfully.
*
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/**
* Commits the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void commitTransaction() throws ProducerFencedException;
/**
* Aborts the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void abortTransaction() throws ProducerFencedException;
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
*
* @param record The record to send
* @return A future which will eventually contain the response information
*
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
The OutOfOrderSequence Exception
如果代理检测到数据丢失,生产者将引发OutOfOrderSequenceException
。换句话说,如果它接收到的序列号大于它所期望的序列号。这个异常将在将来返回并传递给回调(如果有的话)。这是一个致命的异常,以后对产生器方法(如send
、beginTransaction
、commitTransaction
等)的调用将引发IlegalStateException
public class KafkaTransactionsExample {
public static void main(String args[]) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Note that the ‘transactional.id’ configuration _must_ be specified in the
// producer config in order to use transactions.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
// We need to initialize transactions once per producer instance. To use transactions,
// it is assumed that the application id is specified in the config with the key
// transactional.id.
//
// This method will recover or abort transactions initiated by previous instances of a
// producer with the same app id. Any other transactional messages will report an error
// if initialization was not performed.
//
// The response indicates success or failure. Some failures are irrecoverable and will
// require a new producer instance. See the documentation for TransactionMetadata for a
// list of error codes.
producer.initTransactions();
while(true) {
ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
if (!records.isEmpty()) {
// Start a new transaction. This will begin the process of batching the consumed
// records as well
// as an records produced as a result of processing the input records.
//
// We need to check the response to make sure that this producer is able to initiate
// a new transaction.
producer.beginTransaction();
// Process the input records and send them to the output topic(s).
List<ProducerRecord<String, String>> outputRecords = processRecords(records);
for (ProducerRecord<String, String> outputRecord : outputRecords) {
producer.send(outputRecord);
}
// To ensure that the consumed and produced messages are batched, we need to commit
// the offsets through
// the producer and not the consumer.
//
// If this returns an error, we should abort the transaction.
sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
// Now that we have consumed, processed, and produced a batch of messages, let's
// commit the results.
// If this does not report success, then the transaction will be rolled back.
producer.endTransaction();
}
}
}
}
New Configurations
Broker configs
- transactional.id.timeout.ms
transaction coordinator主动终止 producer transactionalId之前等待的ms中的最大时间量(该事务协调器没有收到来自它的任何事务状态更新)
默认值为604800000(7天)。 - max.transaction.timeout.ms
事务允许的最大超时。如果客户端请求的事务时间超过这个值,那么代理将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户端超时过大,这会阻止用户从事务中包含的主题进行读取。
默认值是900000(15分钟)。这是需要发送消息事务的时间段的保守上限。
Default is 900000 (15 min). This is a conservative upper bound on the period of time a transaction of messages will need to be sent.
-
transaction.state.log.replication.factor
事务状态主题的副本数。
Default: 3 -
transaction.state.log.num.partitions
事务状态主题的分区数。
Default: 50 -
transaction.state.log.min.isr
线事务状态主题的每个分区的insync副本的最小数量。
Default: 2 -
transaction.state.log.segment.bytes
事务状态主题的段大小
Default: 104857600 bytes.
Producer configs
- enable.idempotence
是否启用等幂性(默认为false)。如果禁用,生产者将不会在produce请求中设置PID字段,并且当前生产者交付语义将生效。注意,必须启用幂等性才能使用事务。
当启用idempotence时,我们强制acks=all,retries > 1
,并且max.inflight. request.per.connection =1
。如果这些配置没有这些值,我们就不能保证幂等性。如果应用程序没有显式地覆盖这些设置,当启用幂等性时,生产者将设置
acks=all, retries=Integer.MAX_VALUE,max.inflight.requests.per.connection=1
transaction.timeout.ms
事务协调器在主动终止正在进行的事务之前等待来自生产者的事务状态更新的ms中的最大时间量。
这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout.ms
在BROKER设置的ms,请求将失败,并出现InvalidTransactionTimeout
错误
默认是60000。这使得事务不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。
transactional.id
用于事务传递的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果不提供TransactionalId,则生产者仅限于幂等交付
注意,启用。如果配置了TransactionalId,则必须启用幂等性。
默认值为空,这意味着不能使用事务。
Consumer configs
isolation.level
以下是可能的值(默认为read_uncommitted
):
read_uncommitted
:消费uncommitted
和committed
的消息 in offset ordering.
read_committed
:只消费non-transactional messages(非开启事务的消息和)和committed transactional messages in offset order.未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。为了维持in offset ordering,这个设置意味着我们必须缓冲消费者中的消息,直到我们看到给定事务中的所有消息
Proposed Changes
Summary of Guarantees
- Idempotent Producer Guarantees
为了实现幂等生产者语义,我们引入了生产者id(后面成为为PID)和Kafka消息的sequence numbers的概念。在initialization过程中,每个新生产者都会被分配一个惟一的PID。PID分配对用户是完全透明的
对于给定的PID,序列号将从零开始并单调递增,每个主题分区产生一个序列号。在发送给Broker的每个消息上,Produer将递增序列号。Broker在内存中维护从每个PID
接收到的每个TopicPartition
的sequence numbers(<PID,TopicPartition>)。
- 如果一个produce request的
sequence numbers
大于PID/TopicPartition
上次提交 committed message,BROKER将拒绝该请求。 -
较低
的sequence numbers
消息会导致重复错误,生产者可以忽略该错误。 - 具有
较高
sequence numbers的消息会导致序列错误,这表示一些消息已经丢失,并且是致命的
这确保了,即使producer在失败时必须retry requests,每条消息都将被精确地保存在日志中一次。此外,由于为new instance of a producer分配惟一的PID,因此我们只能保证在单个producer 会话
中实现幂等
生产。
这些幂等生产者语义对于无状态应用程序(如度量跟踪和审计)可能非常有用。
- Transactional Guarantees
Transactional,可以自动对多个TopicPartitions的写入保证事务,例如。作为一个单元,对这些主题分区的所有写操作都将成功或失败
因为offsets topic记录了consumer 消费的进度,因此利用了上述功能,使应用程序能够将consumed and produced 的消息批次绑定到到一个原子单元中
只有当整个“consu -transform- production”被完整地执行时,才可以认为一次完整的事务完成。
另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明
当提供这样一个TransactionalId时,Kafka将保证:
1.跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。
2.跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。
需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:
- 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
- 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
- Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
- Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息
关键概念
为了实现Transaction,确保一组消息是自动的produce和consumer,我们引入了几个新概念:
我们将引入一个称为事务协调器的实体对象。与group coordinator类似,每个生产者都被分配一个transaction coordinator,所有分配pid和管理事务
的逻辑都由transaction coordinator完成。
我们引入控制消息
的概念。这些是写进用户主题
的特殊消息
,由client
处理,但对用户透明。例如,可以使用它们让broker向consumer表明以前获取的消息是否已经原子提交。在此之前已经提出了控制消息。
我们引入了TransactionalId
的概念,使用户能够以持久的方式标识producer。具有相同TransactionalId的生产者的不同实例将能够resume(或abort)前一个实例实例化的任何事务
我们引入了producer epoch的概念,它使我们能够确保具有给定TransactionalId
的producer只有一个合法的active实例,从而使我们能够在发生故障时维护保证事务。
除了上面的新概念之外,我们还引入了新的请求类型、现有请求的新版本和核心消息格式的新版本,以支持事务。所有这些的细节将在其他文档介绍。
完整事务过程
image.png image.png1. 找到 transaction coordinator -- the FindCoordinatorRequest
由于Transaction Coordinator是分配PID和管理事务的核心,因此Producer要做的第一件事情就是通过向任意一个Broker发送FindCoordinator请求找到Transaction Coordinator的位置。
-
获取 producer Id -- the InitPidRequest
找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。
- 2.1 如果事务特性被开启,InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的InitPidRequest请求,它将会把该<TransactionID, PID>存入Transaction Log,如上图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。
除了返回PID外,InitPidRequest还会执行如下任务:
增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。
注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。
- 2.2 另外,如果事务特性未开启,InitPidRequest可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性
-
启动Transaction – The beginTransaction() API
Kafka从0.11.0.0版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。 -
The consume-transform-produce loop
这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。
- 4.1 AddPartitionsToTxnRequest
一个Producer可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。
Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。
另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间
- 4.2 ProduceRequest
Producer通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number。该过程如上图中步骤4.2所示。
4.3 AddOffsetCommitsToTxnRequest
Producer有一个新的KafkaProducer.sendOffsetsToTransaction
API方法,它支持批量处理consumed and produced 的消息。这个方法接受Map<TopicPartitions、OffsetAndMetadata>和一个groupId参数。
该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤4.3所示。该方法会阻塞直到收到响应。
4.4 TxnOffsetCommitRequest
作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.4所示。
在此过程中,Consumer Coordinator会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。
这里需要注意:
写入__consumer_offsets的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,也即对应的消息尚未被完成处理。
Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。
5. Committing or Aborting a Transaction
一旦上述数据写入操作完成,应用程序必须调用KafkaProducer.endTransaction方法或者KafkaProducer.abortTransaction方法以结束当前事务。
5.1 EndTxnRequest
commitTransaction方法使得Producer写入的数据对下游Consumer可见。abortTransaction方法通过Transaction Marker将Producer写入的数据标记为Aborted状态。下游的Consumer如果将isolation.level设置为READ_COMMITTED,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。
无论是Commit还是Abort,Producer都会发送EndTxnRequest请求给Transaction Coordinator,并通过标志位标识是应该Commit还是Abort。
收到该请求后,Transaction Coordinator会进行如下操作
- 将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,如上图中步骤5.1所示
- 通过WriteTxnMarkerRequest以Transaction Marker的形式将COMMIT或ABORT信息写入用户数据日志以及Offset Log中,如上图中步骤5.2所示
- 最后将COMPLETE_COMMIT或COMPLETE_ABORT信息写入Transaction Log中,如上图中步骤5.3所示
5.2 WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的Leader。收到该请求后,对应的Leader会将对应的COMMIT(PID)或者ABORT(PID)控制信息写入日志,如上图中步骤5.2所示。
该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。
这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息。
5.3 Writing the final Commit or Abort Message
写完所有的Transaction Marker后,Transaction Coordinator会将最终的COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束,如上图中步骤5.3所示。
此时,Transaction Log中所有关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。
另外,COMPLETE_COMMIT或COMPLETE_ABORT的写入并不需要得到所有Rreplica的ACK,因为如果该消息丢失,可以根据事务协议重发。
补充说明,如果参与该事务的某些<Topic, Partition>在被写入Transaction Marker前不可用,它对READ_COMMITTED的Consumer不可见,但不影响其它可用<Topic, Partition>的COMMIT或ABORT。在该<Topic, Partition>恢复可用后,Transaction Coordinator会重新根据PREPARE_COMMIT或PREPARE_ABORT向该<Topic, Partition>发送Transaction Marker。
总结
PID与Sequence Number的引入实现了写操作的幂等性
写操作的幂等性结合At Least Once语义实现了单一Session内的Exactly Once语义
Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
Offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理
Kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的Offset的更新进行同样的标记(即Transaction Marker)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见
Kafka只提供对Kafka本身的读写操作的事务性,不提供包含外部系统的事务性
参考自
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka#TransactionalMessaginginKafka-ProducerIDsandstategroups
https://www.confluent.io/blog/transactions-apache-kafka/
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
网友评论