Flink Source Code: Two-Phase Commit

Author: AlienPaul | Published 2019-12-11 11:38

    For the full series, see: Flink Source Code Analysis Series Index.

    The Two-Phase Commit Protocol

    The two-phase commit protocol applies to Flink sinks and requires that the downstream system support either transactions or idempotent writes. The two phases are:

    • preCommit: the pre-commit step, invoked when the sink takes a state snapshot.
    • commit: the actual commit. Once the checkpoints of all operators in the job have succeeded, the JobManager notifies each operator that the checkpoint is complete; at that point this method is invoked.
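
    To make the division of labor concrete, below is a minimal sketch of a custom two-phase-commit sink built on TwoPhaseCommitSinkFunction (analyzed in the next section). The class TransactionalFileSink, the /tmp scratch-file layout, and the rename-on-commit scheme are hypothetical illustrations rather than code from the Flink code base; note also that a production commit must be idempotent, because a recovered transaction may be committed more than once.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.UUID;
    
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    
    // Hypothetical sink: IN = String record, TXN = String scratch-file path, CONTEXT = Void
    public class TransactionalFileSink extends TwoPhaseCommitSinkFunction<String, String, Void> {
    
        public TransactionalFileSink() {
            super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        }
    
        // Create a scratch file; its path serves as the transaction handle
        @Override
        protected String beginTransaction() throws Exception {
            String txn = "/tmp/txn-" + UUID.randomUUID();
            Files.createFile(Paths.get(txn));
            return txn;
        }
    
        // Buffer each incoming record in the scratch file of the open transaction
        @Override
        protected void invoke(String txn, String value, Context context) throws Exception {
            Files.write(Paths.get(txn), (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
        }
    
        // preCommit: called from snapshotState; in this sketch the records are already on disk
        @Override
        protected void preCommit(String txn) throws Exception {
        }
    
        // commit: called after notifyCheckpointComplete; publish the file atomically
        @Override
        protected void commit(String txn) {
            try {
                Path src = Paths.get(txn);
                // tolerate a repeated commit of the same transaction after recovery
                if (Files.exists(src)) {
                    Files.move(src, Paths.get(txn + ".committed"));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        // abort: discard the scratch file so its records never become visible
        @Override
        protected void abort(String txn) {
            try {
                Files.deleteIfExists(Paths.get(txn));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }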

    TwoPhaseCommitSinkFunction

    This class is the base class of two-phase-commit sinks and encapsulates the core two-phase-commit logic.

    The initializeState method is defined in the CheckpointedFunction interface and is called when the function begins executing in the cluster; it initializes and restores the operator's state. Its main logic is:

    1. Obtain the state store variable state.
    2. Commit all transactions that have already gone through preCommit.
    3. Abort all transactions that have not yet reached preCommit.
    4. Begin a new transaction.

    The code is as follows:

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // when we are restoring state with pendingCommitTransactions, we don't really know whether the
        // transactions were already committed, or whether there was a failure between
        // completing the checkpoint on the master, and notifying the writer here.
    
        // (the common case is actually that it was already committed, the window
        // between the commit on the master and the notification here is very small)
    
        // it is possible to not have any transactions at all if there was a failure before
        // the first completed checkpoint, or in case of a scale-out event, where some of the
        // new tasks do not have any transactions assigned to check
    
        // we can have more than one transaction to check in case of a scale-in event, or
        // for the reasons discussed in the 'notifyCheckpointComplete()' method.
    
        // Obtain the state store
        state = context.getOperatorStateStore().getListState(stateDescriptor);
    
        boolean recoveredUserContext = false;
        // isRestored() returns true when state is being restored from a previous snapshot; it always returns false if the task does not support snapshots
        if (context.isRestored()) {
            LOG.info("{} - restoring state", name());
            for (State<TXN, CONTEXT> operatorState : state.get()) {
                userContext = operatorState.getContext();
                // Get the list of transactions pending commit
                // (after snapshotState calls preCommit, transactions are stored in this list)
                List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
                List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
                for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                    // If this fails to succeed eventually, there is actually data loss
                    // Recover and commit the transactions previously saved in state
                    recoverAndCommitInternal(recoveredTransaction);
                    handledTransactions.add(recoveredTransaction.handle);
                    LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
                }
    
                {
                    // Get the transaction that has not yet been preCommitted
                    TXN transaction = operatorState.getPendingTransaction().handle;
                    // Recover and abort this transaction
                    recoverAndAbort(transaction);
                    handledTransactions.add(transaction);
                    LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
                }
    
                if (userContext.isPresent()) {
                    finishRecoveringContext(handledTransactions);
                    recoveredUserContext = true;
                }
            }
        }
    
        // if in restore we didn't get any userContext or we are initializing from scratch
        if (!recoveredUserContext) {
            LOG.info("{} - no state to restore", name());
    
            userContext = initializeUserContext();
        }
        this.pendingCommitTransactions.clear();
    
        // Begin a new transaction
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    }
    

    When is preCommit called? In the sink's snapshotState method, which is invoked whenever the sink takes a snapshot.

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction
    
        // Sanity check: a transaction must already exist when taking a state snapshot
        checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
    
        long checkpointId = context.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
    
        // Call preCommit
        preCommit(currentTransactionHolder.handle);
        // Record the transaction in the pending-commit map (pendingCommitTransactions)
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
    
        // Begin a new transaction
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    
        // Clear the state, then record the current transaction and the pending-commit transactions
        state.clear();
        state.add(new State<>(
            this.currentTransactionHolder,
            new ArrayList<>(pendingCommitTransactions.values()),
            userContext));
    }
    

    When is commit called? In notifyCheckpointComplete: once all operators have checkpointed successfully, the JobManager notifies each operator that the checkpoint is complete, and this method is invoked.

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the following scenarios are possible here
        //
        //  (1) there is exactly one transaction from the latest checkpoint that
        //      was triggered and completed. That should be the common case.
        //      Simply commit that transaction in that case.
        //
        //  (2) there are multiple pending transactions because one previous
        //      checkpoint was skipped. That is a rare case, but can happen
        //      for example when:
        //
        //        - the master cannot persist the metadata of the last
        //          checkpoint (temporary outage in the storage system) but
        //          could persist a successive checkpoint (the one notified here)
        //
        //        - other tasks could not persist their status during
        //          the previous checkpoint, but did not trigger a failure because they
        //          could hold onto their state and could successfully persist it in
        //          a successive checkpoint (the one notified here)
        //
        //      In both cases, the prior checkpoint never reached a committed state, but
        //      this checkpoint is always expected to subsume the prior one and cover all
        //      changes since the last successful one. As a consequence, we need to commit
        //      all pending transactions.
        //
        //  (3) Multiple transactions are pending, but the checkpoint complete notification
        //      relates not to the latest. That is possible, because notification messages
        //      can be delayed (in an extreme case till they arrive after a succeeding checkpoint
        //      was triggered) and because there can be concurrent overlapping checkpoints
        //      (a new one is started before the previous fully finished).
        //
        // ==> There should never be a case where we have no pending transaction here
        //
        // Get all transactions pending commit
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;
    
        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            // Only commit transactions from checkpoints up to and including the notified checkpointId
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }
    
            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
    
            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                // Commit, one by one, each transaction that previously went through preCommit
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }
    
            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
    
            // Remove the committed transaction from the pending-commit map
            pendingTransactionIterator.remove();
        }
    
        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }
    

    FlinkKafkaInternalProducer

    FlinkKafkaInternalProducer is Flink's wrapper around the Kafka producer.
    It introduces the producerClosingLock variable, which guards transaction commit, transaction abort, and producer close operations. Kafka versions before 2.3.0 have a bug in which the thread closing the producer can deadlock with a thread committing or aborting a transaction. By taking this lock manually around those operations, FlinkKafkaInternalProducer avoids the problem.
    FlinkKafkaInternalProducer also holds a transactionalId, which is read from the ProducerConfig when the producer is created.
    The main code is shown below:

    @Override
    public void beginTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.beginTransaction();
        }
    }
    
    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.commitTransaction();
        }
    }
    
    @Override
    public void abortTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.abortTransaction();
        }
    }
    
    @Override
    public void close() {
        closed = true;
        synchronized (producerClosingLock) {
            kafkaProducer.close();
        }
    }
    

    Every transactional method (including close) acquires the lock before calling into the Kafka producer, which prevents the deadlock described above.

    FlinkKafkaProducer

    FlinkKafkaProducer extends TwoPhaseCommitSinkFunction and provides the concrete two-phase-commit logic for Kafka.
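
    Before looking at the individual methods, here is a usage sketch of an exactly-once Kafka sink, assuming the Flink 1.9-era universal Kafka connector; the topic name, bootstrap address, and timeout value are placeholders. Two-phase commit only takes effect when checkpointing is enabled, and transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms.

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ExactlyOnceKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // commit is only triggered by notifyCheckpointComplete, so checkpointing is required
            env.enableCheckpointing(60_000);
    
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
            // must be <= the broker-side transaction.max.timeout.ms
            props.setProperty("transaction.timeout.ms", "900000");
    
            KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("demo-topic", element.getBytes(StandardCharsets.UTF_8));
    
            env.fromElements("a", "b", "c")
                .addSink(new FlinkKafkaProducer<>(
                    "demo-topic", schema, props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
    
            env.execute("exactly-once kafka sink");
        }
    }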

    The beginTransaction method creates a new transaction.

    protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
        switch (semantic) {
            case EXACTLY_ONCE:
                // Obtain a transaction-capable Kafka producer of type FlinkKafkaInternalProducer
                FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
                // Start the Kafka producer's transaction
                producer.beginTransaction();
                // Return the transaction state, which carries the transactional id
                return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                // Get the current transaction
                final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
                // If a transaction is already in progress, return its Kafka producer
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
                }
                // Otherwise return a Kafka producer without transaction support
                return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }
    

    The preCommit method is shown below:

    @Override
    protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
        switch (semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                // EXACTLY_ONCE and AT_LEAST_ONCE both require a flush
                flush(transaction);
                break;
            // NONE performs no action at all
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        checkErroneous();
    }
    

    preCommit calls the Kafka producer's flush method to ensure that every message buffered in the producer has been sent to the Kafka brokers.
    The source of the flush method is:

    private void flush(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
        // Call the Kafka producer's flush method to drain the send buffer; flush blocks until it completes
        if (transaction.producer != null) {
            transaction.producer.flush();
        }
        // Get the number of records still pending. After flush the pending count must be zero; if not, throw an exception
        long pendingRecordsCount = pendingRecords.get();
        if (pendingRecordsCount != 0) {
            throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
        }
    
        // if the flushed requests has errors, we should propagate it also and fail the checkpoint
        checkErroneous();
    }
    

    Next is the commit method. It calls the Kafka producer's commitTransaction method, then recycles the transactionalId for later reuse and closes the Kafka producer.

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                // Commit the transaction
                transaction.producer.commitTransaction();
            } finally {
                // Recycle the producer's transactionalId (return it to availableTransactionalIds) and close the producer
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }
    

    The recoverAndCommit method re-creates the corresponding Kafka producer from the stored transactionalId and then commits the transaction.

    @Override
    protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try (
                // Try to re-create the producer from the existing transactionalId, then commit the transaction
                FlinkKafkaInternalProducer<byte[], byte[]> producer =
                    initTransactionalProducer(transaction.transactionalId, false)) {
                producer.resumeTransaction(transaction.producerId, transaction.epoch);
                producer.commitTransaction();
            } catch (InvalidTxnStateException | ProducerFencedException ex) {
                // That means we have committed this transaction before.
                LOG.warn("Encountered error {} while recovering transaction {}. " +
                        "Presumably this transaction has been already committed before",
                    ex,
                    transaction);
            }
        }
    }
    

    The abort method terminates the transaction: it abandons the commit, recycles the transactionalId, and closes the producer.

    @Override
    protected void abort(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            // Abort the transaction
            transaction.producer.abortTransaction();
            // Recycle the transactionalId and close the producer
            recycleTransactionalProducer(transaction.producer);
        }
    }
    

    Appendix

    SinkFunction

    SinkFunction is the interface that every piece of data-egress logic in Flink must implement.

    The code of the SinkFunction interface is shown below:

    @Public
    public interface SinkFunction<IN> extends Function, Serializable {
    
        /** @deprecated Use {@link #invoke(Object, Context)}. */
        @Deprecated
        default void invoke(IN value) throws Exception {}
    
        /**
         * Writes the given value to the sink. This function is called for every record.
         *
         * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
         * {@code default} method for backward compatibility with the old-style method only.
         *
         * @param value The input record.
         * @param context Additional context about the input record.
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the
         *     operation to fail and may trigger recovery.
         */
        default void invoke(IN value, Context context) throws Exception {
            invoke(value);
        }
    
        /**
         * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about an
         * input record.
         *
         * <p>The context is only valid for the duration of a {@link SinkFunction#invoke(Object,
         * Context)} call. Do not store the context and use afterwards!
         */
        @Public // Interface might be extended in the future with additional methods.
        interface Context {
    
            /** Returns the current processing time. */
            long currentProcessingTime();
    
            /** Returns the current event-time watermark. */
            long currentWatermark();
    
            /**
             * Returns the timestamp of the current input record or {@code null} if the element does not
             * have an assigned timestamp.
             */
            Long timestamp();
        }
    }
    

    The SinkFunction code is not complicated; it effectively contains a single method, invoke(IN value, Context context) (the other invoke overload is deprecated and not covered here). The first parameter is the element of the data stream arriving at the sink operator; the second is a context object that wraps metadata about the element:

    • currentProcessingTime: returns the current processing time.
    • currentWatermark: returns the timestamp of the current event-time watermark.
    • timestamp: returns the timestamp attached to the current element (extracted by a timestamp extractor or assigned by the source), or null if the element has no assigned timestamp.

    invoke is where the sink logic lives: to write data out of Flink, we only need to implement the SinkFunction interface and put the egress logic in invoke, as in the sketch below.
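
    As an illustration, here is a minimal sink; the class name StderrSink is made up for this example. It writes every record, together with the timestamp taken from the context, to standard error:

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    public class StderrSink<IN> implements SinkFunction<IN> {
        @Override
        public void invoke(IN value, Context context) throws Exception {
            // context.timestamp() may be null if the element carries no timestamp
            System.err.println("ts=" + context.timestamp() + " value=" + value);
        }
    }

    It is attached like any other sink: stream.addSink(new StderrSink<>()).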

    RichSinkFunction

    Every RichFunction in Flink is an enhanced version of the corresponding plain function: besides supporting custom startup and shutdown logic, a RichFunction can obtain the RuntimeContext from inside its methods. RichSinkFunction is no exception, as the sketch below shows.
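
    A short hypothetical sketch of what this looks like in practice (the class name ConnectionSink and its resource handling are illustrative only):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    public class ConnectionSink extends RichSinkFunction<String> {
        private transient int subtaskIndex;
    
        // open() runs once per parallel instance before any record arrives
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // the RuntimeContext is only available in rich functions
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            // open connections / allocate resources here
        }
    
        @Override
        public void invoke(String value, Context context) {
            System.out.println(subtaskIndex + "> " + value);
        }
    
        // close() runs on shutdown, including after failures
        @Override
        public void close() throws Exception {
            // release resources here
            super.close();
        }
    }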

    PrintSinkFunction

    Next we analyze the simplest sink implementation that ships with Flink, PrintSinkFunction. Its job is to print each element of the data stream.

    The important parts of PrintSinkFunction are shown below.

    private final PrintSinkOutputWriter<IN> writer;
    
    /** Instantiates a print sink function that prints to standard out. */
    public PrintSinkFunction() {
      writer = new PrintSinkOutputWriter<>(false);
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
      writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }
    
    @Override
    public void invoke(IN record) {
      writer.write(record);
    }
    

    As the code shows, PrintSinkFunction holds a PrintSinkOutputWriter, which prints elements to standard output or standard error. The writer is initialized in open, and invoke writes each element of the data stream to it.

    Let's take a brief look at the code of PrintSinkOutputWriter; the analysis is in the comments.

    // The boolean stdErr decides whether output goes to standard out or standard error
    public PrintSinkOutputWriter(final boolean stdErr) {
      this("", stdErr);
    }
    
    public void open(int subtaskIndex, int numParallelSubtasks) {
      // get the target stream
      stream = target == STD_OUT ? System.out : System.err;
    
      completedPrefix = sinkIdentifier;
    
      // If the parallelism is greater than 1, append the (1-based) subtask index to completedPrefix
      if (numParallelSubtasks > 1) {
        if (!completedPrefix.isEmpty()) {
          completedPrefix += ":";
        }
        completedPrefix += (subtaskIndex + 1);
      }
    
      if (!completedPrefix.isEmpty()) {
        completedPrefix += "> ";
      }
    }
    
    public void write(IN record) {
      // Print to the chosen output stream
      stream.println(completedPrefix + record.toString());
    }
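
    In everyday code PrintSinkFunction is rarely constructed by hand: DataStream#print() installs it for you, so the two lines below are effectively equivalent (stream is any DataStream):

    stream.addSink(new PrintSinkFunction<>()); // what stream.print() does under the hood
    stream.print();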
    
