Flink 源码之 KafkaSink

作者: AlienPaul | 来源:发表于2022-11-03 15:08 被阅读0次

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

本篇接上篇Flink 源码之 KafkaSource,详细分析新Sink架构,以及新架构下KafkaSink的实现逻辑。

KafkaSink创建和使用

我们先从KafkaSink的使用开始分析。新的KafkaSink使用起来不是很复杂。参见官网给出的示例代码如下:

DataStream<String> stream = ...;

// 使用builder方式创建
KafkaSink<String> sink = KafkaSink.<String>builder()
// 配置Kafka broker
        .setBootstrapServers(brokers)
// 配置RecordSerializer,定义如何转换Flink内部数据类型到Kafka的ProducerRecord类型
// 需要指定topic名称和key/value的序列化器
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
// 配置数据投送语义保证,至少一次,精准一次投送等
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// 指定数据流的sink为KafkaSink
stream.sinkTo(sink);

如上面例子所述,sink主要的配置为Kafka的bootstrap server,序列化器和DeliveryGuarantee(数据投送保障)。

其中DeliveryGuarantee具有三个选项。分别为:

  • EXACTLY_ONCE: 精准一次投送。这是最严格,最理想的数据投送保证。数据不丢失不重复。
  • AT_LEAST_ONCE: 至少一次投送。数据保证不丢失,但可能会重复。
  • NONE: 无任何额外机制保证。数据有可能丢失或者重复。

几个重要的接口/角色

接下来开始源代码分析章节。新Sink架构中有几个比较重要的接口。下面分别介绍下这些接口和KafkaSink中的相关实现。

TwoPhaseCommittingSink

Two Phase Commit即两阶段提交,用于精准一次投送语义。两阶段提交分为预提交和最终提交两步。其中预提交在sink进行checkoint的时候时候进行,这时候数据虽然已经发送到外部系统,但是没有最终提交,这部分数据对于外部系统是不可见的。最终提交在Flink所有的算子checkpoint都成功之后调用,将预提交的数据真正提交,在这之后外部系统能够看到预提交的数据。整个提交过程完成。通过这正机制,Flink可以保证数据不丢失不重复。本人分析过老版本Flink两阶段提交,具体内容参见 Flink 源码之两阶段提交

TwoPhaseCommittingSink包含支持预提交的PrecommittingSinkWriter和支持最终提交的Committer。接口代码和分析如下所示:

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    /**
     * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    // 创建PrecommittingSinkWriter。这个writer实现了部分两阶段提交的功能,实现类要求必须实现预提交(prepareCommit)方法
    // 接口定义在下面
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    // 创建Committer。Committer用来正式提交数据。正式提交之后这些数据在Kafka中可见
    Committer<CommT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    // 返回提交物(管理Kafka提交有关的元数据,即KafkaCommittable)的序列化器
    // 这个序列化器中包含版本号,随着序列化的数据一起存储,这样在序列化器迭代升级之后可以通过版本号得知该数据是通过哪个版本的序列化器序列化的
    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    // 用来定义两阶段提交第一步预提交(prepareCommit)方法
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        /**
         * Prepares for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the two-phase commit protocol.
         * @throws IOException if fail to prepare for a commit.
         */
        // 这个方法需要在SinkWriter的flush方法之后和snapshotState(生成快照)之前调用
        // flush确保所有的数据都已输出,不丢失数据。snapshotState的时候会KafkaWriter会获取新的producer开启新的事务
        // 所以需要在这个时候执行预提交
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}

KafkaSink实现了TwoPhaseCommittingSink。Kafka负责预提交和最终提交的对象分别为KafkaWriterKafkaCommitter。后面分析提交流程的时候再介绍有关的方法。

Committer

CommitterTwoPhaseCommittingSink接口中用来做最终提交的接口。它将PrecommittingSinkWriter中预提交的内容真正的提交上去。

Committer接口内容如下:

@PublicEvolving
public interface Committer<CommT> extends AutoCloseable {
    /**
     * Commit the given list of {@link CommT}.
     *
     * @param committables A list of commit requests staged by the sink writer.
     * @throws IOException for reasons that may yield a complete restart of the job.
     */
    // 将可提交物(committables,对应泛型CommT)提交
    // 这里的提交为两阶段提交的最终提交
    void commit(Collection<CommitRequest<CommT>> committables)
            throws IOException, InterruptedException;

    /**
     * A request to commit a specific committable.
     *
     * @param <CommT>
     */
    // 用来提交某个committable的请求
    @PublicEvolving
    interface CommitRequest<CommT> {

        /** Returns the committable. */
        // 获取request中携带的committable
        CommT getCommittable();

        /**
         * Returns how many times this particular committable has been retried. Starts at 0 for the
         * first attempt.
         */
        // 返回committable提交操作已重试次数
        int getNumberOfRetries();

        /**
         * The commit failed for known reason and should not be retried.
         *
         * <p>Currently calling this method only logs the error, discards the comittable and
         * continues. In the future the behaviour might be configurable.
         */
        // 提交失败原因已知的时候调用
        void signalFailedWithKnownReason(Throwable t);

        /**
         * The commit failed for unknown reason and should not be retried.
         *
         * <p>Currently calling this method fails the job. In the future the behaviour might be
         * configurable.
         */
        // 提交失败原因未知的时候调用
        void signalFailedWithUnknownReason(Throwable t);

        /**
         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
         * permanently fail after reaching that maximum. Else the committable will be retried as
         * long as this method is invoked after each attempt.
         */
        // 尝试稍后重新提交。如果设置有重试次数上限,在达到重试次数上限之后会一直失败
        void retryLater();

        /**
         * Updates the underlying committable and retries later (see {@link #retryLater()} for a
         * description). This method can be used if a committable partially succeeded.
         */
        // 和上面方法基本相同,区别是再次尝试提交之前会更新committable
        void updateAndRetryLater(CommT committable);

        /**
         * Signals that a committable is skipped as it was committed already in a previous run.
         * Using this method is optional but eases bookkeeping and debugging. It also serves as a
         * code documentation for the branches dealing with recovery.
         */
        // 表示committable已经成功提交过了
        void signalAlreadyCommitted();
    }
}

KafkaCommitter实现了此Committer接口。

StatefulSink

顾名思义,即有状态的Sink。这个Sink支持创建和从checkpoint保存的writerState中恢复出StatefulSinkWriterStatefulSinkWriter除了继承SinkWriter的所有特性外,还要求必须实现snapshotState逻辑,即必须要支持将自身状态存入checkpoint。接口主要内容和上面描述类似,这里不再贴出源代码。KafkaSink同时实现了TwoPhaseCommittingSinkStatefulSink

KafkaCommittable

在Kafka针对新Sink接口的实现中,KafkaCommittableCommitter接口中的可提交物(committable)。

KafkaCommittable包含如下信息:

// kafkaProducer的ID
private final long producerId;
// kafka producer对应的年代信息
private final short epoch;
// 事务id,这个是最关键的
private final String transactionalId;
// 负责提交这个事务的FlinkKafkaInternalProducer
@Nullable private Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer;

其中producer id和epoch用于确保消息的幂等性。开启幂等性之后,kakfaProducer向broker发送数据之前会先确认自己是否已经获取到了producer id。每个producer都会分配唯一的producer id。epoch为producer年代信息,用来kafka producer fencing,即解决僵尸实例问题。Kafka broker会拒绝相同transactionalId但是epoch更旧的producer的事务性写入。

transactionalId用来保存kafka事务id。事务id是根据Flink内部checkpoint id生成的。每次checkpoint对应不同的事务id。生成逻辑后面代码中有分析。

最后是被Recyclable类包装起来的FlinkKafkaInternalProducerRecyclable意思为可回收利用的。实际上在后面的分析中不难发现Flink KafkaWriter中维护了一个producer池。producer在使用完毕之后,会被Recyclable回收再加入到producer池中。详细的分析请参见后面章节。

CommittableCollector

CommittableCollector用于暂存commit进度。内部维护了两层结构。第一层为checkpointCheckpointCommittableManagerImpl的对应关系,每个CheckpointCommittableManagerImpl中维护了subtask id和SubtaskCommittableManager的对应关系,即第二层结构。SubtaskCommittableManager中包含了CommitRequestImpl。所以说,CommittableCollector根据checkpoint id和subtask id,可以找出关联的CommitRequestImpl,即关联的committable。

数据写入Kafka和提交流程

接下来我们开始分析数据写入流程和两阶段提交流程。

数据写入流程

数据写入的逻辑位于SinkOperatorprocessElement方法。代码如下所示:

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
    // context封装了当前处理的StreamRecord,watermark和获取StreamRecord timestamp的逻辑
    context.element = element;
    // 调用SinkWriter的write方法
    sinkWriter.write(element.getValue(), context);
}

在使用KafkaSink的场景中,sinkWriterKafkaWriter。我们查看下它的write方法集体做了什么。

@Override
public void write(IN element, Context context) throws IOException {
    // 将数据(element),kafkaSinkContext(包含subtask的id,subtask数量和kafkaProducer的配置信息)序列化为字节数组,构造成Kafka的ProducerRecord
    // 这个序列化器是创建KafkaSink的时候配置的
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    // 发送数据到kafka
    currentProducer.send(record, deliveryCallback);
    // 已发送数据条数监控计数器加1
    numRecordsSendCounter.inc();
}

两阶段提交——预提交

Kafka预提交的执行的时机为当前operator需要checkpoint,在向下游发送checkpoint barrier之前。这个时间点调用的方法为SinkWriterOperator::prepareSnapshotPreBarrier

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    super.prepareSnapshotPreBarrier(checkpointId);
    if (!endOfInput) {
        // 如果数据输入没有结束,先flush writer
        sinkWriter.flush(false);
        // 调用emitCommittables方法
        emitCommittables(checkpointId);
    }
    // no records are expected to emit after endOfInput
}

继续分析emitCommittables方法。

private void emitCommittables(Long checkpointId) throws IOException, InterruptedException {
    // 是否向下游发送
    // 如果sink是TwoPhaseCommittingSink类型,emitDownstream为true
    // 这个if分支用来兼容老版本的sink接口
    if (!emitDownstream) {
        // To support SinkV1 topologies with only a writer we have to call prepareCommit
        // although no committables are forwarded
        if (sinkWriter instanceof PrecommittingSinkWriter) {
            ((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
        }
        return;
    }
    // 调用prepareCommit方法,两阶段提交的预提交方法
    Collection<CommT> committables =
            ((PrecommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
    // 获取StreamingRntimeContext,该对象包含了流作业运行时信息
    StreamingRuntimeContext runtimeContext = getRuntimeContext();
    // 如果此subTask的id
    final int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
    // 获取subTask的并行度
    final int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();

    // Emit only committable summary if there are legacy committables
    // 如果使用的是老的sink API,发送统计信息到下游
    // legacyCommittables在状态恢复的时候被赋值
    if (!legacyCommittables.isEmpty()) {
        checkState(checkpointId > InitContext.INITIAL_CHECKPOINT_ID);
        emit(
                indexOfThisSubtask,
                numberOfParallelSubtasks,
                InitContext.INITIAL_CHECKPOINT_ID,
                legacyCommittables);
        legacyCommittables.clear();
    }
    // 发送这些committable内容到下游
    emit(indexOfThisSubtask, numberOfParallelSubtasks, checkpointId, committables);
}

在使用Kafka的场景下,sinkWriterKafkaWriter,它的prepareCommit方法内容和分析如下:

@Override
public Collection<KafkaCommittable> prepareCommit() {
    // 如果配置为精准一次投送,需要返回预提交阶段需要处理的committable
    // 其他情况不需要两阶段提交,故这里返回空集合
    if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
        final List<KafkaCommittable> committables =
                Collections.singletonList(
                        KafkaCommittable.of(currentProducer, producerPool::add));
        LOG.debug("Committing {} committables.", committables);
        return committables;
    }
    return Collections.emptyList();
}

这里我们穿插进来一段KafkaWriter和KafkaProducer相关的代码分析。KafkaWriter中有一个currentProducer变量,字面意思是当前使用的producer,言外之意是producer会变化。在这里不好理解为什么这样设计。我们先分析代码,后面展开的时候会为大家解开疑惑。

currentProducer的创建逻辑位于KafkaWriter的构造方法。我们分析下有关片段:

if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
    // 如果是精准一次投送,终止还未提交的事务
    abortLingeringTransactions(
            checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1);
    // 获取一个FlinkKafkaInternalProducer,将checkpoint id增加1
    // EXACTLY_ONCE模式KafkaWriter需要维护一个producer池,这个池的作用后面分析
    this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
    // 开启事务
    this.currentProducer.beginTransaction();
} else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
        || deliveryGuarantee == DeliveryGuarantee.NONE) {
    // 如果是至少一次投送或者未配置(NONE),创建出一个新的FlinkKafkaInternalProducer,不指定transactionalId
    this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
    closer.register(this.currentProducer);
    initKafkaMetrics(this.currentProducer);
} else {
    throw new UnsupportedOperationException(
            "Unsupported Kafka writer semantic " + this.deliveryGuarantee);
}

这里需要注意。大家已经看到上面分析在创建KafkaWriter的时候,如果配置了精准一次投送,KafkaWriter会为Producer开启事务。在Commiter最终提交的时候会提交事务。不可能只有创建KafkaWriter的时候才开启事务,开启新事物的地方一定不止这一处。实际上currentProducerKafkaWriter checkpoint的时候(snapshotState方法中)会变更,它不在数据提交流程中,所以这里将它单独提出来分析。snapshotState的时候从producerPool中获取一个新的producer。传入的checkpoint id自增1,意味着这个新producer的事务对应下一个checkpoint。总结一下,在精准一次投送场景中,KafkaWriter使用了Producer池,从池中获取一个producer,指定它的transactionalId(依据checkpoint id分配),开启事务后通过这个producer send数据。等到checkpoint的时候将这个producer和对应的checkpoint,transactionalId等信息保存起来(在KafkaCommittable中保存,后面分析)。然后从池中拿一个新的producer,开启新的事务继续重复以上过程。上面被保存状态的producer等到所有operator都checkpoint成功,即CheckpointCoordinator发送notifyCheckpointComplete的时候提交事务(两阶段提交最终提交),然后回收这个producer到producer池,以便下个checkpoint的时候复用。

下面分析checkpoint的逻辑。snapshotState方法印证了上面所述的部分内容:

@Override
public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
   if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
       // 如果使用精准一次投送,从producer池中拿一个新的producer,checkpoint自增1
       currentProducer = getTransactionalProducer(checkpointId + 1);
       // 开启新事务
       currentProducer.beginTransaction();
   }
   return ImmutableList.of(kafkaWriterState);
}

继续分析获取producer的getTransactionalProducer方法。代码分析如下:

private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    // 如果前面的checkpoint被终止过,这里面checkpoint id可能和上一次的checkpoint id不连续
    // 这个for循环的目的就是确保把把checkpoint id之间的空当使用空的事务填充了
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        // 根据checkpoint id 构建transactionalId
        // 构建的逻辑为:
        //        transactionalIdPrefix
        //        + TRANSACTIONAL_ID_DELIMITER
        //        + subtaskId
        //        + TRANSACTIONAL_ID_DELIMITER
        //        + checkpointOffset
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        // 获取或者创建一个producer
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    // 更新lastCheckpointId
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    // 返回这个producer
    return producer;
}

继续分析获取或创建producer的方法:getOrCreateTransactionalProducer。该方法从producer池(producerPool)中获取一个producer,如果池中没有可用的producer则创建一个新的。代码和分析如下所示:

private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(
        String transactionalId) {
    // 先从producer池中拿一个producer
    FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
    // 如果没拿到
    if (producer == null) {
        // 创建一个新的producer
        producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
        // 加入到closer中,当KafkaWriter关闭的时候可以统一关闭所有的producer
        closer.register(producer);
        // 初始化事务
        // 设置producer的transactionalId之后必须要调用initTransactions方法
        producer.initTransactions();
        // 初始化监控信息
        initKafkaMetrics(producer);
    } else {
        // 如果从池中拿到了producer
        // 配置transactionId并且initTransactions
        producer.initTransactionId(transactionalId);
    }
    // 返回producer
    return producer;
}

KafkaWriter维护的KafkaProducerFlinkKafkaInternalProducer类型。它继承自KafkaProducer,因此能够访问到KafkaProducer的一些内部变量,从而增强了对Kafka事务的管理。它内部通过反射拿到org.apache.kafka.clients.producer.internals.TransactionManager,能够访问修改它的私有变量producerIdAndEpochtransactionalId等,增强了事务功能。此外FlinkKafkaInternalProducer内部还有一个inTransaction变量用来记录目前状态是否处于事务当中(调用了beginTransaction但还未提交)。

最后我们看下Flink如何将Producer信息封装为Committer的可提交物KafkaCommittable的方法。这段逻辑位于KafkaCommittable::of中。内容如下所示:

public static <K, V> KafkaCommittable of(
        FlinkKafkaInternalProducer<K, V> producer,
        Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
    return new KafkaCommittable(
            producer.getProducerId(),
            producer.getEpoch(),
            producer.getTransactionalId(),
            new Recyclable<>(producer, recycler));
}

KafkaCommittable保存了producer的producerID,epoch和transactionalId信息,还有这个producer本身。这个producer被Recyclable(可循环利用的)类包装起来,一旦producer使用完了,会调用recycler将其回收。实际调用的时候(KafkaWriterprepareCommit方法),recyclerproducerPool::add,意味着producer使用完毕之后会添加会producer队列。印证了上面对producer的分析。


上面我们穿插进行了producer有关的代码分析。接下来回到SinkWriterOperatoremitCommittables方法。这个方法最终将committable相关信息发送到下游。

我们分析往下游发送committable相关信息的emit方法。

private void emit(
        int indexOfThisSubtask,
        int numberOfParallelSubtasks,
        long checkpointId,
        Collection<CommT> committables) {
    output.collect(
            new StreamRecord<>(
                    new CommittableSummary<>(
                            indexOfThisSubtask,
                            numberOfParallelSubtasks,
                            checkpointId,
                            committables.size(),
                            committables.size(),
                            0)));
    for (CommT committable : committables) {
        output.collect(
                new StreamRecord<>(
                        new CommittableWithLineage<>(
                                committable, checkpointId, indexOfThisSubtask)));
    }
}

这个方法接收如下参数:

  • 当前subtask的index
  • subtask的并行度
  • 当前checkpoint id
  • committables集合

然后将信息封装为CommittableSummaryCommittableWithLineage发往下游。其中CommittableSummary为committable汇总信息,用来描述提交的进度,包含当前subtask的id,subtask数量,checkpoint id,committable数量,待提交的committable数量和提交失败的数量。CommittableWithLineage用于向Committer传递committable元数据。

这里需要告诉大家CommitterOperatorSinkWriterOperator的下游(至于为什么,参见SinkTransformationTranslator::addCommittingTopology方法)。

大家可能会问下游是如何处理这些信息的呢?接下来我们分析下游CommitterOperatorprocessElement方法。

@Override
public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
    // committableCollector用来保管各个subtask和checkpoint的committable提交状态
    // commit信息分为两种:CommittableSummary和CommittableWithLineage
    committableCollector.addMessage(element.getValue());

    // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
    // committables
    // 获取checkpoint id
    OptionalLong checkpointId = element.getValue().getCheckpointId();
    // 如果获取到的checkpoint id小于等于最近一次的checkpoint id,说明这些committable的内容需要被最终提交
    // 调用commitAndEmitCheckpoints方法
    // 这个方法在最终提交的时候也用到了,放到下一节分析
    if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) {
        commitAndEmitCheckpoints();
    }
}

CommittableCollectoraddMessage方法将committable信息暂存起来,逻辑如下:

public void addMessage(CommittableMessage<CommT> message) {
    if (message instanceof CommittableSummary) {
        addSummary((CommittableSummary<CommT>) message);
    } else if (message instanceof CommittableWithLineage) {
        addCommittable((CommittableWithLineage<CommT>) message);
    }
}

上面提到的CommittableSummaryCommittableWithLineage都是CommittableMessage的接口的实现类。这个方法区分开这两种类型消息,分别调用addSummaryaddCommittable方法。

private void addSummary(CommittableSummary<CommT> summary) {
    checkpointCommittables
            .computeIfAbsent(
                    summary.getCheckpointId().orElse(EOI),
                    key ->
                            new CheckpointCommittableManagerImpl<>(
                                    subtaskId,
                                    numberOfSubtasks,
                                    summary.getCheckpointId().orElse(EOI)))
            .upsertSummary(summary);
}

checkpointCommittables维护了checkpoint id和CheckpointCommittableManagerImpl的对应关系。CheckpointCommittableManagerImplCheckpointCommittableManager的实现类。CheckpointCommittableManager又继承自CommittableManager,它是一个包装类,用来封装提交committable的逻辑。CheckpointCommittableManager多了一个可以返回checkpoint id的方法。在addSummary方法中,创建出checkpoint id 和此次checkpoint需要提交的committable的管理器CheckpointCommittableManagerImplCheckpointCommittableManagerImpl内部保存一系列subtask id和SubtaskCommittableManager的对应关系。简单来说committable信息的维护分为两个层级,先按照checkpoint id(即按照committable在哪一次checkpoint提交)分类,然后再按照committable从属的subtask id分类。上面的方法先找到或创建出当前checkpoint对应的CheckpointCommittableManagerImpl,然后将CommittableSummary信息存放在它所属的subtask id对应的SubtaskCommittableManager中。

upsertSummary方法正是这个逻辑,代码如下:

void upsertSummary(CommittableSummary<CommT> summary) {
    SubtaskCommittableManager<CommT> existing =
            subtasksCommittableManagers.putIfAbsent(
                    summary.getSubtaskId(),
                    new SubtaskCommittableManager<>(
                            summary.getNumberOfCommittables(),
                            subtaskId,
                            summary.getCheckpointId().isPresent()
                                    ? summary.getCheckpointId().getAsLong()
                                    : null));
    if (existing != null) {
        throw new UnsupportedOperationException(
                "Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
    }
}

我们分析完了addSummary方法,接下来继续addCommittable方法。addCommittable首先找出commitable对应的checkpoint id,然后将committable添加到这个checkpoint id对应的CheckpointCommittableManagerImpl中。

private void addCommittable(CommittableWithLineage<CommT> committable) {
    getCheckpointCommittables(committable).addCommittable(committable);
}

private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(
        CommittableMessage<CommT> committable) {
    CheckpointCommittableManagerImpl<CommT> committables =
            this.checkpointCommittables.get(committable.getCheckpointId().orElse(EOI));
    return checkNotNull(committables, "Unknown checkpoint for %s", committable);
}

我们看下CheckpointCommittableManagerImpladdCommittable方法。它又获取这个committable所属subtask对应的SubtaskCommittableManager,调用它的add方法:

void addCommittable(CommittableWithLineage<CommT> committable) {
    getSubtaskCommittableManager(committable.getSubtaskId()).add(committable);
}

终于我们一路跟踪代码到SubtaskCommittableManager,它的add方法内容如下:

void add(CommittableWithLineage<CommT> committable) {
    add(committable.getCommittable());
}

void add(CommT committable) {
    checkState(requests.size() < numExpectedCommittables, "Already received all committables.");
    requests.add(new CommitRequestImpl<>(committable));
}

SubtaskCommittableManager内部有一个CommitRequestImpl组成的双向队列。CommitRequestImpl实现了Committer.CommitRequest<CommT>。前面分析过CommitRequest是用于提交某个committable的请求。这些CommitRequest用于在两阶段提交最终提交的时候被Committer读取出来并最终提交上去。提交逻辑在下一节最终提交分析。

最终需要提交内容的元数据保存在了CommitRequest。这个类有3个成员变量:

// 保存committable内容
private CommT committable;
// 记录重试次数
private int numRetries;
// 保存当前CommitRequest的状态
private CommitRequestState state;

CommitRequestState有如下4个值:

  • RECEIVED: 表示刚接收到/创建出Request,Request还没有得到处理。新创建出的CommitRequest为RECEIVED状态。
  • RETRY: 表示该Request稍后重试提交。
  • FAILED: 表示Request提交失败。
  • COMMITTED: Request提交成功。

两阶段提交——最终提交

当Flink各个operator都成功完成checkpoint之后,CheckpointCoordinator接下来向各个operator发送checkpoint成功的信号,调用它们的notifyCheckpointComplete方法。只有在这个时候才能确保所有operator都运行正常,数据没有丢失,因此可以执行两阶段提交的最终提交这一步,让数据在kafka集群中对外可见。

CommitterOperatornotifyCheckpointComplete方法分析如下:

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    if (endInput) {
        // This is the final checkpoint, all committables should be committed
        // 如果数据源输入结束,标记上一个完成的checkpoint id为Long.MAX_VALUE,意味着所有的committable都应该提交
        lastCompletedCheckpointId = Long.MAX_VALUE;
    } else {
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
    }
    // 调用commitAndEmitCheckpoints
    commitAndEmitCheckpoints();
}

继续分析commitAndEmitCheckpoints方法。

private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
    do {
        // 获取并遍历所有的lastCompletedCheckpointId之前的checkpoint id对应的CheckpointCommittableManager
        // 需要将它们之中存放的CommitRequest提交
        for (CheckpointCommittableManager<CommT> manager :
                committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
            // wait for all committables of the current manager before submission
            // 遍历到checkpoint id为最新的checkpoint id的manager的时候,认为committable已完全接收
            // getCheckpointCommittablesUpTo返回的是NavigableMap,是一个有序的map,遍历到上面所述的时候说明更早的checkpoint对应的CheckpointCommittableManager已遍历完毕,所以可以认为committable已完全接收
            boolean fullyReceived =
                    !endInput && manager.getCheckpointId() == lastCompletedCheckpointId;
            // 提交这些committable
            commitAndEmit(manager, fullyReceived);
        }
        // !committableCollector.isFinished() indicates that we should retry
        // Retry should be done here if this is a final checkpoint (indicated by endInput)
        // WARN: this is an endless retry, may make the job stuck while finishing
        // 如果数据输入结束(endInput为true),一直循环直到所有的SubTaskCommittableManager中的CommitRequest都得到处理
        // 如果endInput为false,上面的逻辑只执行一次
        // SubTaskCommittableManager是否finished判断条件为numExpectedCommittables - (numDrained + numFailed)是否为0
        // 即CommittableSummary中包含的所有committable数量等于已完成提交的committable数量加上提交失败的数量,会被认为是finished状态
    } while (!committableCollector.isFinished() && endInput);

    // 如果commitAndEmit之后CommittableCollector中还有没得到处理的CommitRequest,schedule一个定时器
    // 1000ms之后重新执行commitAndEmitCheckpoints
    if (!committableCollector.isFinished()) {
        // if not endInput, we can schedule retrying later
        retryWithDelay();
    }
}

继续分析commitAndEmit方法,内容如下:

private void commitAndEmit(CommittableManager<CommT> committableManager, boolean fullyReceived)
        throws IOException, InterruptedException {
    // 调用committableManager的commit方法,执行最终提交
    Collection<CommittableWithLineage<CommT>> committed =
            committableManager.commit(fullyReceived, committer);
    // 如果sink为WithPostCommitTopology类型,emitDownstream为true
    // 这个类型的sink允许高级用户在Committer之后实现一些自定义的执行计划
    // 如果emitDownstream为true,下游可以接收到committableSummary和committable信息
    if (emitDownstream && !committed.isEmpty()) {
        output.collect(new StreamRecord<>(committableManager.getSummary()));
        for (CommittableWithLineage<CommT> committable : committed) {
            output.collect(new StreamRecord<>(committable));
        }
    }
}

提交的逻辑位于CheckpointCommittableManagerImplcommit方法,分析如下:

@Override
public Collection<CommittableWithLineage<CommT>> commit(
        boolean fullyReceived, Committer<CommT> committer)
        throws IOException, InterruptedException {
    // 过滤出所有的pendingRequest(状态不是committed和failed的request)
    // 如果fullyReceived为true,过滤时候先过滤出已接收到所有CommitRequest的subtasksCommittableManagers
    // (只提交这些subtasksCommittableManagers)
    // (已接收到所有CommitRequest即requests.size() + numDrained + numFailed = numExpectedCommittables)
    // 再拿出这些subtasksCommittableManagers中的pendingRequest
    Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(fullyReceived);
    // 将所有的request状态更改为RECEIVED
    requests.forEach(CommitRequestImpl::setSelected);
    // 使用committer提交这些pendingRequest
    committer.commit(new ArrayList<>(requests));
    // 将状态修改为committed
    requests.forEach(CommitRequestImpl::setCommittedIfNoError);
    // 返回所有成功提交的request,将这些request从SubtaskCommittableManager的requests队列中移除
    return drainFinished();
}

最后,KafkaCommittercommit方法将一系列(存放在集合内的)CommitRequest提交。代码如下:

@Override
public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
        throws IOException, InterruptedException {
    // 遍历这些request
    for (CommitRequest<KafkaCommittable> request : requests) {
        // 获取committable
        final KafkaCommittable committable = request.getCommittable();
        // 获取committable中的事务ID
        final String transactionalId = committable.getTransactionalId();
        LOG.debug("Committing Kafka transaction {}", transactionalId);
        // 获取FlinkKafkaInternalProducer
        // 它被包装在了Recyclable中
        // Recyclable是带有资源回收逻辑对象抽象出的接口,内部维护了对象本身还有资源回收逻辑recycler
        // Recyclable接口实现了Closable接口,在资源对象关闭的时候调用recycler将其回收
        // FlinkKafkaInternalProducer继承了KafkaProducer,拓展了Kafka对事务的支持
        Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =
                committable.getProducer();
        FlinkKafkaInternalProducer<?, ?> producer;
        try {
            // 从recyclable中获取FlinkKafkaInternalProducer
            producer =
                    recyclable
                            .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                            .orElseGet(() -> getRecoveryProducer(committable));
            // 提交事务
            producer.commitTransaction();
            // 调用flush
            producer.flush();
            // 如果FlinkKafkaInternalProducer没有被回收,调用回收逻辑(将producer加入producer池中)
            recyclable.ifPresent(Recyclable::close);
        } catch (RetriableException e) {
            // 遇到这个错误,表示重试有可能成功
            LOG.warn(
                    "Encountered retriable exception while committing {}.", transactionalId, e);
            // 稍后重试这个request
            request.retryLater();
        } catch (ProducerFencedException e) {
            // 这个错误表示同一个transation ID被不同的KafkaProducer持有,或者是事务协调器等待producer事务状态更新超时(transaction.timeout.ms配置项)
            // initTransaction has been called on this transaction before
            LOG.error(
                    "Unable to commit transaction ({}) because its producer is already fenced."
                            + " This means that you either have a different producer with the same '{}' (this is"
                            + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                            + " please consult the Flink documentation for more details.",
                    request,
                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    KafkaSink.class.getSimpleName(),
                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                    kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                    e);
            recyclable.ifPresent(Recyclable::close);
            // 由于已知原因提交失败
            request.signalFailedWithKnownReason(e);
        } catch (InvalidTxnStateException e) {
            // This exception only occurs when aborting after a commit or vice versa.
            // It does not appear on double commits or double aborts.
            // 该错误仅在提交后终止(abort)或者终止后提交发生,此时事务状态会发生异常
            LOG.error(
                    "Unable to commit transaction ({}) because it's in an invalid state. "
                            + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                    request,
                    e);
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithKnownReason(e);
        } catch (UnknownProducerIdException e) {
            // KAFKA-9310 bug造成,建议升级kafka到2.5版本以上
            LOG.error(
                    "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                    request,
                    e);
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithKnownReason(e);
        } catch (Exception e) {
            // 其他类型错误统一调用signalFailedWithUnknownReason方法,表示错误原因未知
            LOG.error(
                    "Transaction ({}) encountered error and data has been potentially lost.",
                    request,
                    e);
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithUnknownReason(e);
        }
    }
}

到这里为止KafkaSink数据写入和两阶段提交的逻辑分析完毕。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

相关文章

网友评论

    本文标题:Flink 源码之 KafkaSink

    本文链接:https://www.haomeiwen.com/subject/ngletdtx.html