Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
前言
本篇接上篇Flink 源码之 KafkaSource,详细分析新Sink架构,以及新架构下KafkaSink
的实现逻辑。
KafkaSink创建和使用
我们先从KafkaSink
的使用开始分析。新的KafkaSink
使用起来不是很复杂。参见官网给出的示例代码如下:
DataStream<String> stream = ...;
// 使用builder方式创建
KafkaSink<String> sink = KafkaSink.<String>builder()
// 配置Kafka broker
.setBootstrapServers(brokers)
// 配置RecordSerializer,定义如何转换Flink内部数据类型到Kafka的ProducerRecord类型
// 需要指定topic名称和key/value的序列化器
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 配置数据投送语义保证,至少一次,精准一次投送等
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// 指定数据流的sink为KafkaSink
stream.sinkTo(sink);
如上面例子所述,sink主要的配置为Kafka的bootstrap server,序列化器和DeliveryGuarantee
(数据投送保障)。
其中DeliveryGuarantee
具有三个选项。分别为:
-
EXACTLY_ONCE
: 精准一次投送。这是最严格,最理想的数据投送保证。数据不丢失不重复。 -
AT_LEAST_ONCE
: 至少一次投送。数据保证不丢失,但可能会重复。 -
NONE
: 无任何额外机制保证。数据有可能丢失或者重复。
几个重要的接口/角色
接下来开始源代码分析章节。新Sink架构中有几个比较重要的接口。下面分别介绍下这些接口和KafkaSink
中的相关实现。
TwoPhaseCommittingSink
Two Phase Commit即两阶段提交,用于精准一次投送语义。两阶段提交分为预提交和最终提交两步。其中预提交在sink进行checkoint的时候时候进行,这时候数据虽然已经发送到外部系统,但是没有最终提交,这部分数据对于外部系统是不可见的。最终提交在Flink所有的算子checkpoint都成功之后调用,将预提交的数据真正提交,在这之后外部系统能够看到预提交的数据。整个提交过程完成。通过这正机制,Flink可以保证数据不丢失不重复。本人分析过老版本Flink两阶段提交,具体内容参见 Flink 源码之两阶段提交。
TwoPhaseCommittingSink
包含支持预提交的PrecommittingSinkWriter
和支持最终提交的Committer
。接口代码和分析如下所示:
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
/**
* Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
* input.
*
* @param context the runtime context.
* @return A sink writer for the two-phase commit protocol.
* @throws IOException for any failure during creation.
*/
// 创建PrecommittingSinkWriter。这个writer实现了部分两阶段提交的功能,实现类要求必须实现预提交(prepareCommit)方法
// 接口定义在下面
PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
/**
* Creates a {@link Committer} that permanently makes the previously written data visible
* through {@link Committer#commit(Collection)}.
*
* @return A committer for the two-phase commit protocol.
* @throws IOException for any failure during creation.
*/
// 创建Committer。Committer用来正式提交数据。正式提交之后这些数据在Kafka中可见
Committer<CommT> createCommitter() throws IOException;
/** Returns the serializer of the committable type. */
// 返回提交物(管理Kafka提交有关的元数据,即KafkaCommittable)的序列化器
// 这个序列化器中包含版本号,随着序列化的数据一起存储,这样在序列化器迭代升级之后可以通过版本号得知该数据是通过哪个版本的序列化器序列化的
SimpleVersionedSerializer<CommT> getCommittableSerializer();
/** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
// 用来定义两阶段提交第一步预提交(prepareCommit)方法
@PublicEvolving
interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
/**
* Prepares for a commit.
*
* <p>This method will be called after {@link #flush(boolean)} and before {@link
* StatefulSinkWriter#snapshotState(long)}.
*
* @return The data to commit as the second step of the two-phase commit protocol.
* @throws IOException if fail to prepare for a commit.
*/
// 这个方法需要在SinkWriter的flush方法之后和snapshotState(生成快照)之前调用
// flush确保所有的数据都已输出,不丢失数据。snapshotState的时候会KafkaWriter会获取新的producer开启新的事务
// 所以需要在这个时候执行预提交
Collection<CommT> prepareCommit() throws IOException, InterruptedException;
}
}
KafkaSink
实现了TwoPhaseCommittingSink
。Kafka负责预提交和最终提交的对象分别为KafkaWriter
和KafkaCommitter
。后面分析提交流程的时候再介绍有关的方法。
Committer
Committer
是TwoPhaseCommittingSink
接口中用来做最终提交的接口。它将PrecommittingSinkWriter
中预提交的内容真正的提交上去。
Committer
接口内容如下:
@PublicEvolving
public interface Committer<CommT> extends AutoCloseable {
/**
* Commit the given list of {@link CommT}.
*
* @param committables A list of commit requests staged by the sink writer.
* @throws IOException for reasons that may yield a complete restart of the job.
*/
// 将可提交物(committables,对应泛型CommT)提交
// 这里的提交为两阶段提交的最终提交
void commit(Collection<CommitRequest<CommT>> committables)
throws IOException, InterruptedException;
/**
* A request to commit a specific committable.
*
* @param <CommT>
*/
// 用来提交某个committable的请求
@PublicEvolving
interface CommitRequest<CommT> {
/** Returns the committable. */
// 获取request中携带的committable
CommT getCommittable();
/**
* Returns how many times this particular committable has been retried. Starts at 0 for the
* first attempt.
*/
// 返回committable提交操作已重试次数
int getNumberOfRetries();
/**
* The commit failed for known reason and should not be retried.
*
* <p>Currently calling this method only logs the error, discards the comittable and
* continues. In the future the behaviour might be configurable.
*/
// 提交失败原因已知的时候调用
void signalFailedWithKnownReason(Throwable t);
/**
* The commit failed for unknown reason and should not be retried.
*
* <p>Currently calling this method fails the job. In the future the behaviour might be
* configurable.
*/
// 提交失败原因未知的时候调用
void signalFailedWithUnknownReason(Throwable t);
/**
* The commit failed for a retriable reason. If the sink supports a retry maximum, this may
* permanently fail after reaching that maximum. Else the committable will be retried as
* long as this method is invoked after each attempt.
*/
// 尝试稍后重新提交。如果设置有重试次数上限,在达到重试次数上限之后会一直失败
void retryLater();
/**
* Updates the underlying committable and retries later (see {@link #retryLater()} for a
* description). This method can be used if a committable partially succeeded.
*/
// 和上面方法基本相同,区别是再次尝试提交之前会更新committable
void updateAndRetryLater(CommT committable);
/**
* Signals that a committable is skipped as it was committed already in a previous run.
* Using this method is optional but eases bookkeeping and debugging. It also serves as a
* code documentation for the branches dealing with recovery.
*/
// 表示committable已经成功提交过了
void signalAlreadyCommitted();
}
}
KafkaCommitter
实现了此Committer
接口。
StatefulSink
顾名思义,即有状态的Sink。这个Sink支持创建和从checkpoint保存的writerState中恢复出StatefulSinkWriter
。StatefulSinkWriter
除了继承SinkWriter
的所有特性外,还要求必须实现snapshotState
逻辑,即必须要支持将自身状态存入checkpoint。接口主要内容和上面描述类似,这里不再贴出源代码。KafkaSink
同时实现了TwoPhaseCommittingSink
和StatefulSink
。
KafkaCommittable
在Kafka针对新Sink接口的实现中,KafkaCommittable
是Committer
接口中的可提交物(committable)。
KafkaCommittable
包含如下信息:
// kafkaProducer的ID
private final long producerId;
// kafka producer对应的年代信息
private final short epoch;
// 事务id,这个是最关键的
private final String transactionalId;
// 负责提交这个事务的FlinkKafkaInternalProducer
@Nullable private Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer;
其中producer id和epoch用于确保消息的幂等性。开启幂等性之后,kakfaProducer向broker发送数据之前会先确认自己是否已经获取到了producer id。每个producer都会分配唯一的producer id。epoch为producer年代信息,用来kafka producer fencing,即解决僵尸实例问题。Kafka broker会拒绝相同transactionalId但是epoch更旧的producer的事务性写入。
transactionalId用来保存kafka事务id。事务id是根据Flink内部checkpoint id生成的。每次checkpoint对应不同的事务id。生成逻辑后面代码中有分析。
最后是被Recyclable
类包装起来的FlinkKafkaInternalProducer
。Recyclable
意思为可回收利用的。实际上在后面的分析中不难发现Flink KafkaWriter
中维护了一个producer池。producer在使用完毕之后,会被Recyclable
回收再加入到producer池中。详细的分析请参见后面章节。
CommittableCollector
CommittableCollector
用于暂存commit进度。内部维护了两层结构。第一层为checkpoint
和CheckpointCommittableManagerImpl
的对应关系,每个CheckpointCommittableManagerImpl
中维护了subtask id和SubtaskCommittableManager
的对应关系,即第二层结构。SubtaskCommittableManager
中包含了CommitRequestImpl
。所以说,CommittableCollector
根据checkpoint id和subtask id,可以找出关联的CommitRequestImpl
,即关联的committable。
数据写入Kafka和提交流程
接下来我们开始分析数据写入流程和两阶段提交流程。
数据写入流程
数据写入的逻辑位于SinkOperator
的processElement
方法。代码如下所示:
@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
// context封装了当前处理的StreamRecord,watermark和获取StreamRecord timestamp的逻辑
context.element = element;
// 调用SinkWriter的write方法
sinkWriter.write(element.getValue(), context);
}
在使用KafkaSink的场景中,sinkWriter
为KafkaWriter
。我们查看下它的write
方法集体做了什么。
@Override
public void write(IN element, Context context) throws IOException {
// 将数据(element),kafkaSinkContext(包含subtask的id,subtask数量和kafkaProducer的配置信息)序列化为字节数组,构造成Kafka的ProducerRecord
// 这个序列化器是创建KafkaSink的时候配置的
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
// 发送数据到kafka
currentProducer.send(record, deliveryCallback);
// 已发送数据条数监控计数器加1
numRecordsSendCounter.inc();
}
两阶段提交——预提交
Kafka预提交的执行的时机为当前operator需要checkpoint,在向下游发送checkpoint barrier之前。这个时间点调用的方法为SinkWriterOperator::prepareSnapshotPreBarrier
。
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
if (!endOfInput) {
// 如果数据输入没有结束,先flush writer
sinkWriter.flush(false);
// 调用emitCommittables方法
emitCommittables(checkpointId);
}
// no records are expected to emit after endOfInput
}
继续分析emitCommittables
方法。
private void emitCommittables(Long checkpointId) throws IOException, InterruptedException {
// 是否向下游发送
// 如果sink是TwoPhaseCommittingSink类型,emitDownstream为true
// 这个if分支用来兼容老版本的sink接口
if (!emitDownstream) {
// To support SinkV1 topologies with only a writer we have to call prepareCommit
// although no committables are forwarded
if (sinkWriter instanceof PrecommittingSinkWriter) {
((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
}
return;
}
// 调用prepareCommit方法,两阶段提交的预提交方法
Collection<CommT> committables =
((PrecommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
// 获取StreamingRntimeContext,该对象包含了流作业运行时信息
StreamingRuntimeContext runtimeContext = getRuntimeContext();
// 如果此subTask的id
final int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
// 获取subTask的并行度
final int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
// Emit only committable summary if there are legacy committables
// 如果使用的是老的sink API,发送统计信息到下游
// legacyCommittables在状态恢复的时候被赋值
if (!legacyCommittables.isEmpty()) {
checkState(checkpointId > InitContext.INITIAL_CHECKPOINT_ID);
emit(
indexOfThisSubtask,
numberOfParallelSubtasks,
InitContext.INITIAL_CHECKPOINT_ID,
legacyCommittables);
legacyCommittables.clear();
}
// 发送这些committable内容到下游
emit(indexOfThisSubtask, numberOfParallelSubtasks, checkpointId, committables);
}
在使用Kafka的场景下,sinkWriter
为KafkaWriter
,它的prepareCommit
方法内容和分析如下:
@Override
public Collection<KafkaCommittable> prepareCommit() {
// 如果配置为精准一次投送,需要返回预提交阶段需要处理的committable
// 其他情况不需要两阶段提交,故这里返回空集合
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
final List<KafkaCommittable> committables =
Collections.singletonList(
KafkaCommittable.of(currentProducer, producerPool::add));
LOG.debug("Committing {} committables.", committables);
return committables;
}
return Collections.emptyList();
}
这里我们穿插进来一段KafkaWriter和KafkaProducer相关的代码分析。KafkaWriter
中有一个currentProducer
变量,字面意思是当前使用的producer,言外之意是producer会变化。在这里不好理解为什么这样设计。我们先分析代码,后面展开的时候会为大家解开疑惑。
currentProducer
的创建逻辑位于KafkaWriter
的构造方法。我们分析下有关片段:
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
// 如果是精准一次投送,终止还未提交的事务
abortLingeringTransactions(
checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1);
// 获取一个FlinkKafkaInternalProducer,将checkpoint id增加1
// EXACTLY_ONCE模式KafkaWriter需要维护一个producer池,这个池的作用后面分析
this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
// 开启事务
this.currentProducer.beginTransaction();
} else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
|| deliveryGuarantee == DeliveryGuarantee.NONE) {
// 如果是至少一次投送或者未配置(NONE),创建出一个新的FlinkKafkaInternalProducer,不指定transactionalId
this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
closer.register(this.currentProducer);
initKafkaMetrics(this.currentProducer);
} else {
throw new UnsupportedOperationException(
"Unsupported Kafka writer semantic " + this.deliveryGuarantee);
}
这里需要注意。大家已经看到上面分析在创建KafkaWriter的时候,如果配置了精准一次投送,KafkaWriter
会为Producer开启事务。在Commiter
最终提交的时候会提交事务。不可能只有创建KafkaWriter
的时候才开启事务,开启新事物的地方一定不止这一处。实际上currentProducer
在KafkaWriter
checkpoint的时候(snapshotState
方法中)会变更,它不在数据提交流程中,所以这里将它单独提出来分析。snapshotState
的时候从producerPool中获取一个新的producer。传入的checkpoint id自增1,意味着这个新producer的事务对应下一个checkpoint。总结一下,在精准一次投送场景中,KafkaWriter
使用了Producer池,从池中获取一个producer,指定它的transactionalId
(依据checkpoint id分配),开启事务后通过这个producer send数据。等到checkpoint的时候将这个producer和对应的checkpoint,transactionalId等信息保存起来(在KafkaCommittable
中保存,后面分析)。然后从池中拿一个新的producer,开启新的事务继续重复以上过程。上面被保存状态的producer等到所有operator都checkpoint成功,即CheckpointCoordinator
发送notifyCheckpointComplete
的时候提交事务(两阶段提交最终提交),然后回收这个producer到producer池,以便下个checkpoint的时候复用。
下面分析checkpoint的逻辑。snapshotState
方法印证了上面所述的部分内容:
@Override
public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
// 如果使用精准一次投送,从producer池中拿一个新的producer,checkpoint自增1
currentProducer = getTransactionalProducer(checkpointId + 1);
// 开启新事务
currentProducer.beginTransaction();
}
return ImmutableList.of(kafkaWriterState);
}
继续分析获取producer的getTransactionalProducer
方法。代码分析如下:
private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
checkState(
checkpointId > lastCheckpointId,
"Expected %s > %s",
checkpointId,
lastCheckpointId);
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
// in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
// this loop ensures that all gaps are filled with initialized (empty) transactions
// 如果前面的checkpoint被终止过,这里面checkpoint id可能和上一次的checkpoint id不连续
// 这个for循环的目的就是确保把把checkpoint id之间的空当使用空的事务填充了
for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
// 根据checkpoint id 构建transactionalId
// 构建的逻辑为:
// transactionalIdPrefix
// + TRANSACTIONAL_ID_DELIMITER
// + subtaskId
// + TRANSACTIONAL_ID_DELIMITER
// + checkpointOffset
String transactionalId =
TransactionalIdFactory.buildTransactionalId(
transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
// 获取或者创建一个producer
producer = getOrCreateTransactionalProducer(transactionalId);
}
// 更新lastCheckpointId
this.lastCheckpointId = checkpointId;
assert producer != null;
LOG.info("Created new transactional producer {}", producer.getTransactionalId());
// 返回这个producer
return producer;
}
继续分析获取或创建producer的方法:getOrCreateTransactionalProducer
。该方法从producer池(producerPool)中获取一个producer,如果池中没有可用的producer则创建一个新的。代码和分析如下所示:
private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(
String transactionalId) {
// 先从producer池中拿一个producer
FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
// 如果没拿到
if (producer == null) {
// 创建一个新的producer
producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
// 加入到closer中,当KafkaWriter关闭的时候可以统一关闭所有的producer
closer.register(producer);
// 初始化事务
// 设置producer的transactionalId之后必须要调用initTransactions方法
producer.initTransactions();
// 初始化监控信息
initKafkaMetrics(producer);
} else {
// 如果从池中拿到了producer
// 配置transactionId并且initTransactions
producer.initTransactionId(transactionalId);
}
// 返回producer
return producer;
}
KafkaWriter
维护的KafkaProducer
为FlinkKafkaInternalProducer
类型。它继承自KafkaProducer
,因此能够访问到KafkaProducer
的一些内部变量,从而增强了对Kafka事务的管理。它内部通过反射拿到org.apache.kafka.clients.producer.internals.TransactionManager
,能够访问修改它的私有变量producerIdAndEpoch
和transactionalId
等,增强了事务功能。此外FlinkKafkaInternalProducer
内部还有一个inTransaction
变量用来记录目前状态是否处于事务当中(调用了beginTransaction
但还未提交)。
最后我们看下Flink如何将Producer信息封装为Committer
的可提交物KafkaCommittable
的方法。这段逻辑位于KafkaCommittable::of
中。内容如下所示:
public static <K, V> KafkaCommittable of(
FlinkKafkaInternalProducer<K, V> producer,
Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
return new KafkaCommittable(
producer.getProducerId(),
producer.getEpoch(),
producer.getTransactionalId(),
new Recyclable<>(producer, recycler));
}
KafkaCommittable
保存了producer的producerID,epoch和transactionalId信息,还有这个producer本身。这个producer被Recyclable
(可循环利用的)类包装起来,一旦producer使用完了,会调用recycler将其回收。实际调用的时候(KafkaWriter
的prepareCommit
方法),recycler
为producerPool::add
,意味着producer使用完毕之后会添加会producer队列。印证了上面对producer的分析。
上面我们穿插进行了producer有关的代码分析。接下来回到SinkWriterOperator
的emitCommittables
方法。这个方法最终将committable相关信息发送到下游。
我们分析往下游发送committable相关信息的emit
方法。
private void emit(
int indexOfThisSubtask,
int numberOfParallelSubtasks,
long checkpointId,
Collection<CommT> committables) {
output.collect(
new StreamRecord<>(
new CommittableSummary<>(
indexOfThisSubtask,
numberOfParallelSubtasks,
checkpointId,
committables.size(),
committables.size(),
0)));
for (CommT committable : committables) {
output.collect(
new StreamRecord<>(
new CommittableWithLineage<>(
committable, checkpointId, indexOfThisSubtask)));
}
}
这个方法接收如下参数:
- 当前subtask的index
- subtask的并行度
- 当前checkpoint id
- committables集合
然后将信息封装为CommittableSummary
和CommittableWithLineage
发往下游。其中CommittableSummary
为committable汇总信息,用来描述提交的进度,包含当前subtask的id,subtask数量,checkpoint id,committable数量,待提交的committable数量和提交失败的数量。CommittableWithLineage
用于向Committer
传递committable元数据。
这里需要告诉大家CommitterOperator
是SinkWriterOperator
的下游(至于为什么,参见SinkTransformationTranslator::addCommittingTopology
方法)。
大家可能会问下游是如何处理这些信息的呢?接下来我们分析下游CommitterOperator
的processElement
方法。
@Override
public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
// committableCollector用来保管各个subtask和checkpoint的committable提交状态
// commit信息分为两种:CommittableSummary和CommittableWithLineage
committableCollector.addMessage(element.getValue());
// in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
// committables
// 获取checkpoint id
OptionalLong checkpointId = element.getValue().getCheckpointId();
// 如果获取到的checkpoint id小于等于最近一次的checkpoint id,说明这些committable的内容需要被最终提交
// 调用commitAndEmitCheckpoints方法
// 这个方法在最终提交的时候也用到了,放到下一节分析
if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) {
commitAndEmitCheckpoints();
}
}
CommittableCollector
的addMessage
方法将committable信息暂存起来,逻辑如下:
public void addMessage(CommittableMessage<CommT> message) {
if (message instanceof CommittableSummary) {
addSummary((CommittableSummary<CommT>) message);
} else if (message instanceof CommittableWithLineage) {
addCommittable((CommittableWithLineage<CommT>) message);
}
}
上面提到的CommittableSummary
和CommittableWithLineage
都是CommittableMessage
的接口的实现类。这个方法区分开这两种类型消息,分别调用addSummary
和addCommittable
方法。
private void addSummary(CommittableSummary<CommT> summary) {
checkpointCommittables
.computeIfAbsent(
summary.getCheckpointId().orElse(EOI),
key ->
new CheckpointCommittableManagerImpl<>(
subtaskId,
numberOfSubtasks,
summary.getCheckpointId().orElse(EOI)))
.upsertSummary(summary);
}
checkpointCommittables
维护了checkpoint id和CheckpointCommittableManagerImpl
的对应关系。CheckpointCommittableManagerImpl
是CheckpointCommittableManager
的实现类。CheckpointCommittableManager
又继承自CommittableManager
,它是一个包装类,用来封装提交committable的逻辑。CheckpointCommittableManager
多了一个可以返回checkpoint id的方法。在addSummary
方法中,创建出checkpoint id 和此次checkpoint需要提交的committable的管理器CheckpointCommittableManagerImpl
。CheckpointCommittableManagerImpl
内部保存一系列subtask id和SubtaskCommittableManager
的对应关系。简单来说committable信息的维护分为两个层级,先按照checkpoint id(即按照committable在哪一次checkpoint提交)分类,然后再按照committable从属的subtask id分类。上面的方法先找到或创建出当前checkpoint对应的CheckpointCommittableManagerImpl
,然后将CommittableSummary
信息存放在它所属的subtask id对应的SubtaskCommittableManager
中。
upsertSummary
方法正是这个逻辑,代码如下:
void upsertSummary(CommittableSummary<CommT> summary) {
SubtaskCommittableManager<CommT> existing =
subtasksCommittableManagers.putIfAbsent(
summary.getSubtaskId(),
new SubtaskCommittableManager<>(
summary.getNumberOfCommittables(),
subtaskId,
summary.getCheckpointId().isPresent()
? summary.getCheckpointId().getAsLong()
: null));
if (existing != null) {
throw new UnsupportedOperationException(
"Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
}
}
我们分析完了addSummary
方法,接下来继续addCommittable
方法。addCommittable
首先找出commitable对应的checkpoint id,然后将committable添加到这个checkpoint id对应的CheckpointCommittableManagerImpl
中。
private void addCommittable(CommittableWithLineage<CommT> committable) {
getCheckpointCommittables(committable).addCommittable(committable);
}
private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(
CommittableMessage<CommT> committable) {
CheckpointCommittableManagerImpl<CommT> committables =
this.checkpointCommittables.get(committable.getCheckpointId().orElse(EOI));
return checkNotNull(committables, "Unknown checkpoint for %s", committable);
}
我们看下CheckpointCommittableManagerImpl
的addCommittable
方法。它又获取这个committable所属subtask对应的SubtaskCommittableManager
,调用它的add
方法:
void addCommittable(CommittableWithLineage<CommT> committable) {
getSubtaskCommittableManager(committable.getSubtaskId()).add(committable);
}
终于我们一路跟踪代码到SubtaskCommittableManager
,它的add
方法内容如下:
void add(CommittableWithLineage<CommT> committable) {
add(committable.getCommittable());
}
void add(CommT committable) {
checkState(requests.size() < numExpectedCommittables, "Already received all committables.");
requests.add(new CommitRequestImpl<>(committable));
}
SubtaskCommittableManager
内部有一个CommitRequestImpl
组成的双向队列。CommitRequestImpl
实现了Committer.CommitRequest<CommT>
。前面分析过CommitRequest
是用于提交某个committable的请求。这些CommitRequest
用于在两阶段提交最终提交的时候被Committer
读取出来并最终提交上去。提交逻辑在下一节最终提交分析。
最终需要提交内容的元数据保存在了CommitRequest
。这个类有3个成员变量:
// 保存committable内容
private CommT committable;
// 记录重试次数
private int numRetries;
// 保存当前CommitRequest的状态
private CommitRequestState state;
CommitRequestState有如下4个值:
- RECEIVED: 表示刚接收到/创建出Request,Request还没有得到处理。新创建出的
CommitRequest
为RECEIVED状态。 - RETRY: 表示该Request稍后重试提交。
- FAILED: 表示Request提交失败。
- COMMITTED: Request提交成功。
两阶段提交——最终提交
当Flink各个operator都成功完成checkpoint之后,CheckpointCoordinator
接下来向各个operator发送checkpoint成功的信号,调用它们的notifyCheckpointComplete
方法。只有在这个时候才能确保所有operator都运行正常,数据没有丢失,因此可以执行两阶段提交的最终提交这一步,让数据在kafka集群中对外可见。
CommitterOperator
的notifyCheckpointComplete
方法分析如下:
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
if (endInput) {
// This is the final checkpoint, all committables should be committed
// 如果数据源输入结束,标记上一个完成的checkpoint id为Long.MAX_VALUE,意味着所有的committable都应该提交
lastCompletedCheckpointId = Long.MAX_VALUE;
} else {
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
}
// 调用commitAndEmitCheckpoints
commitAndEmitCheckpoints();
}
继续分析commitAndEmitCheckpoints
方法。
private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
do {
// 获取并遍历所有的lastCompletedCheckpointId之前的checkpoint id对应的CheckpointCommittableManager
// 需要将它们之中存放的CommitRequest提交
for (CheckpointCommittableManager<CommT> manager :
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
// wait for all committables of the current manager before submission
// 遍历到checkpoint id为最新的checkpoint id的manager的时候,认为committable已完全接收
// getCheckpointCommittablesUpTo返回的是NavigableMap,是一个有序的map,遍历到上面所述的时候说明更早的checkpoint对应的CheckpointCommittableManager已遍历完毕,所以可以认为committable已完全接收
boolean fullyReceived =
!endInput && manager.getCheckpointId() == lastCompletedCheckpointId;
// 提交这些committable
commitAndEmit(manager, fullyReceived);
}
// !committableCollector.isFinished() indicates that we should retry
// Retry should be done here if this is a final checkpoint (indicated by endInput)
// WARN: this is an endless retry, may make the job stuck while finishing
// 如果数据输入结束(endInput为true),一直循环直到所有的SubTaskCommittableManager中的CommitRequest都得到处理
// 如果endInput为false,上面的逻辑只执行一次
// SubTaskCommittableManager是否finished判断条件为numExpectedCommittables - (numDrained + numFailed)是否为0
// 即CommittableSummary中包含的所有committable数量等于已完成提交的committable数量加上提交失败的数量,会被认为是finished状态
} while (!committableCollector.isFinished() && endInput);
// 如果commitAndEmit之后CommittableCollector中还有没得到处理的CommitRequest,schedule一个定时器
// 1000ms之后重新执行commitAndEmitCheckpoints
if (!committableCollector.isFinished()) {
// if not endInput, we can schedule retrying later
retryWithDelay();
}
}
继续分析commitAndEmit
方法,内容如下:
private void commitAndEmit(CommittableManager<CommT> committableManager, boolean fullyReceived)
throws IOException, InterruptedException {
// 调用committableManager的commit方法,执行最终提交
Collection<CommittableWithLineage<CommT>> committed =
committableManager.commit(fullyReceived, committer);
// 如果sink为WithPostCommitTopology类型,emitDownstream为true
// 这个类型的sink允许高级用户在Committer之后实现一些自定义的执行计划
// 如果emitDownstream为true,下游可以接收到committableSummary和committable信息
if (emitDownstream && !committed.isEmpty()) {
output.collect(new StreamRecord<>(committableManager.getSummary()));
for (CommittableWithLineage<CommT> committable : committed) {
output.collect(new StreamRecord<>(committable));
}
}
}
提交的逻辑位于CheckpointCommittableManagerImpl
的commit
方法,分析如下:
@Override
public Collection<CommittableWithLineage<CommT>> commit(
boolean fullyReceived, Committer<CommT> committer)
throws IOException, InterruptedException {
// 过滤出所有的pendingRequest(状态不是committed和failed的request)
// 如果fullyReceived为true,过滤时候先过滤出已接收到所有CommitRequest的subtasksCommittableManagers
// (只提交这些subtasksCommittableManagers)
// (已接收到所有CommitRequest即requests.size() + numDrained + numFailed = numExpectedCommittables)
// 再拿出这些subtasksCommittableManagers中的pendingRequest
Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(fullyReceived);
// 将所有的request状态更改为RECEIVED
requests.forEach(CommitRequestImpl::setSelected);
// 使用committer提交这些pendingRequest
committer.commit(new ArrayList<>(requests));
// 将状态修改为committed
requests.forEach(CommitRequestImpl::setCommittedIfNoError);
// 返回所有成功提交的request,将这些request从SubtaskCommittableManager的requests队列中移除
return drainFinished();
}
最后,KafkaCommitter
的commit
方法将一系列(存放在集合内的)CommitRequest
提交。代码如下:
@Override
public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
throws IOException, InterruptedException {
// 遍历这些request
for (CommitRequest<KafkaCommittable> request : requests) {
// 获取committable
final KafkaCommittable committable = request.getCommittable();
// 获取committable中的事务ID
final String transactionalId = committable.getTransactionalId();
LOG.debug("Committing Kafka transaction {}", transactionalId);
// 获取FlinkKafkaInternalProducer
// 它被包装在了Recyclable中
// Recyclable是带有资源回收逻辑对象抽象出的接口,内部维护了对象本身还有资源回收逻辑recycler
// Recyclable接口实现了Closable接口,在资源对象关闭的时候调用recycler将其回收
// FlinkKafkaInternalProducer继承了KafkaProducer,拓展了Kafka对事务的支持
Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =
committable.getProducer();
FlinkKafkaInternalProducer<?, ?> producer;
try {
// 从recyclable中获取FlinkKafkaInternalProducer
producer =
recyclable
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
// 提交事务
producer.commitTransaction();
// 调用flush
producer.flush();
// 如果FlinkKafkaInternalProducer没有被回收,调用回收逻辑(将producer加入producer池中)
recyclable.ifPresent(Recyclable::close);
} catch (RetriableException e) {
// 遇到这个错误,表示重试有可能成功
LOG.warn(
"Encountered retriable exception while committing {}.", transactionalId, e);
// 稍后重试这个request
request.retryLater();
} catch (ProducerFencedException e) {
// 这个错误表示同一个transation ID被不同的KafkaProducer持有,或者是事务协调器等待producer事务状态更新超时(transaction.timeout.ms配置项)
// initTransaction has been called on this transaction before
LOG.error(
"Unable to commit transaction ({}) because its producer is already fenced."
+ " This means that you either have a different producer with the same '{}' (this is"
+ " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+ " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ " please consult the Flink documentation for more details.",
request,
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
KafkaSink.class.getSimpleName(),
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
e);
recyclable.ifPresent(Recyclable::close);
// 由于已知原因提交失败
request.signalFailedWithKnownReason(e);
} catch (InvalidTxnStateException e) {
// This exception only occurs when aborting after a commit or vice versa.
// It does not appear on double commits or double aborts.
// 该错误仅在提交后终止(abort)或者终止后提交发生,此时事务状态会发生异常
LOG.error(
"Unable to commit transaction ({}) because it's in an invalid state. "
+ "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
request,
e);
recyclable.ifPresent(Recyclable::close);
request.signalFailedWithKnownReason(e);
} catch (UnknownProducerIdException e) {
// KAFKA-9310 bug造成,建议升级kafka到2.5版本以上
LOG.error(
"Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
request,
e);
recyclable.ifPresent(Recyclable::close);
request.signalFailedWithKnownReason(e);
} catch (Exception e) {
// 其他类型错误统一调用signalFailedWithUnknownReason方法,表示错误原因未知
LOG.error(
"Transaction ({}) encountered error and data has been potentially lost.",
request,
e);
recyclable.ifPresent(Recyclable::close);
request.signalFailedWithUnknownReason(e);
}
}
}
到这里为止KafkaSink数据写入和两阶段提交的逻辑分析完毕。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论