[Flink Selections] Job Elasticity Based on State Rescaling


Author: 熊本极客 | Published: 2020-09-14 22:46

Flink job elasticity is built on state rescaling. This article examines the relationship between state rescaling and operator parallelism.


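1. Job elasticity and state rescaling: concepts

1.1 Concepts

Flink job elasticity means scaling a job up or down, which in practice means increasing or decreasing operator parallelism. State rescaling means scaling the state itself up or down. The discussion below focuses mainly on the scale-up case.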

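1.2 Scenario

After a Flink job has been running for some time, the main way for a user to scale it up is to increase operator parallelism, which improves the job's performance and throughput, as shown in the figure below.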

image

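2. State persistence and redistribution on scale-up

2.1 State persistence

(1) Background

In real-time computing, data streams flow into Flink continuously, and every record triggers a computation in the job.

Question 1: If a job computes an aggregate such as count, does each computation reprocess all historical data, or does it update the previous result incrementally?

Answer 1: Flink computes the count aggregate incrementally on top of the previous result, and that previous result is the state.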
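
The incremental approach in answer 1 can be illustrated with a minimal sketch. It is not Flink code: the `HashMap` is only a hypothetical stand-in for keyed state, and the class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class IncrementalCountSketch {
    // Hypothetical stand-in for keyed state: the latest count per key.
    private final Map<String, Long> countState = new HashMap<>();

    // Each record updates the previous result instead of rescanning history.
    long onRecord(String key) {
        return countState.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        IncrementalCountSketch counter = new IncrementalCountSketch();
        for (String key : new String[] {"a", "b", "a", "a"}) {
            System.out.println(key + " -> " + counter.onRecord(key));
        }
    }
}
```

The cost per record stays constant no matter how much history has been processed, which is why the cached result has to survive failures.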

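Question 2: Where is the previous result cached?

Answer 2: If the cached data lived only in memory, recovering from a node failure would require recomputing all historical data, which is unacceptable. To make the cache both reliable and fast, it must be persisted locally.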

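(2) How state persistence works

Taking RocksDB + HDFS as an example, state is stored in two stages: working state is first written to the local RocksDB instance, and snapshots are then saved asynchronously to the distributed file system HDFS. This avoids the memory-size and reliability limits of keeping state on the JVM heap, while reads and writes stay local, so only checkpoint uploads generate network IO.

image

Flink (as of 1.11) provides three state backends:

① Heap-based MemoryStateBackend: intended for debugging.

② FsStateBackend: working state stays on the TaskManager heap, and checkpoints are written to a file system such as HDFS, so state size is bounded by memory.

③ RocksDB-based RocksDBStateBackend: local files plus asynchronous persistence to HDFS.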

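2.2 The state redistribution problem on scale-up

Flink is a distributed, stateful stream processing system. A job's DAG starts as the logical StreamGraph, is optimized into the JobGraph, and is finally translated into the physical ExecutionGraph that runs on the TaskManagers. Each node of the ExecutionGraph is an operator instance, and each operator instance can be regarded as an independent task.

As the figure below shows, the DAG has network IO in the vertical direction, while in the horizontal direction the instances of a stateful operator do not communicate with each other. This model ensures that each operator instance maintains its own state and stores it locally, so no network traffic arises between operator instances.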

image

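Flink has two kinds of state: operator state and keyed state. When a job is scaled up (its parallelism is increased), how is the state redistributed? For example, suppose an external MQ has 5 partitions, the source parallelism is scaled from 1 to 2, and the parallelism of the stateful operation in the middle is scaled from 2 to 3.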

image

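3. Redistributing operator state on scale-up

Take the Kafka connector as an example. As shown in the figure above, the Kafka broker has 5 partitions and the source parallelism is scaled from 2 to 5. How is the state restored? The source code below is from version 1.11.

Consider FlinkKafkaConsumerBase.java. If restoredState was restored from operator state, that is, from a savepoint or checkpoint, its entries are copied into subscribedPartitionsToStartOffsets. During this step each partition has to be reassigned to a subtask: the assign method of KafkaTopicPartitionAssigner recomputes which subtask the partition belongs to.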

    /** The set of topic partitions that the source will read, with their initial offsets to start reading from. */

    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
    /**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
    private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

    /** Accessor for state in the operator state backend. */
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    // Operator initialization
    @Override
    public void open(Configuration configuration) throws Exception {
        // omitted ...

        subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        // restoredState is non-null when restoring from operator state
        if (restoredState != null) {
            for (KafkaTopicPartition partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }

            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                if (!restoredFromOldState) {
                    // seed the partition discoverer with the union state while filtering out
                    // restored partitions that should not be subscribed by this subtask
                    if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                            == getRuntimeContext().getIndexOfThisSubtask()){
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                } else {
                    // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                    // in this case, just use the restored state as the subscribed partitions
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }

            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                    if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                        LOG.warn(
                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                            entry.getKey());
                        return true;
                    }
                    return false;
                });
            }

            LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
        }
    }

    // Restore the cached data from a checkpoint
    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {
        // omitted ...

        if (context.isRestored() && !restoredFromOldState) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // migrate from 1.2 state, if there is any
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                restoredFromOldState = true;
                unionOffsetStates.add(kafkaOffset);
            }
            oldRoundRobinListState.clear();

            if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                throw new IllegalArgumentException(
                    "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }

            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
        } else {
            LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
        }
    }

Next, look at the subscribedPartitionsToStartOffsets field. Its type is Map<KafkaTopicPartition, Long>, where KafkaTopicPartition holds the topic and partition and the Long is the offset.

public final class KafkaTopicPartition implements Serializable {

    /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
     * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS. */
    private static final long serialVersionUID = 722083576322742325L;

    // ------------------------------------------------------------------------

    private final String topic;
    private final int partition;
    private final int cachedHash;

    public KafkaTopicPartition(String topic, int partition) {
        this.topic = requireNonNull(topic);
        this.partition = partition;
        this.cachedHash = 31 * topic.hashCode() + partition;
    }
}
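
Finally, let's see how KafkaTopicPartitionAssigner assigns partitions to subtasks. It first computes a start index from the topic as ((hash * 31) & 0x7FFFFFFF) % numParallelSubtasks; the mask clears the sign bit so that the result is never negative (it does not make the result prime). It then returns (startIndex + partition.getPartition()) % numParallelSubtasks, which spreads the partitions of one topic round-robin over the subtasks, starting at the topic's start index.

[Figure: operator state scale-up]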
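
The assignment rule described above can be tried out with a small sketch. It follows the two formulas from the text but is simplified to take the topic name and partition id directly rather than a KafkaTopicPartition; the class name and the topic name "orders" are hypothetical.

```java
final class PartitionAssignSketch {
    // Simplified sketch of the assignment rule, not the connector's actual class.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Clear the sign bit so the start index is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids are assumed to ascend from 0, so they act as a round-robin offset.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "orders"; // hypothetical topic name
        for (int parallelism : new int[] {2, 5}) {
            for (int p = 0; p < 5; p++) {
                System.out.println("parallelism=" + parallelism
                        + " partition " + p + " -> subtask " + assign(topic, p, parallelism));
            }
        }
    }
}
```

Because every subtask restores the full union list of offsets and then keeps only the partitions that this function maps to its own index, no coordination between subtasks is needed after rescaling. With 5 partitions and a parallelism of 5, each subtask ends up with exactly one partition.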

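4. Redistributing keyed state on scale-up

Idea: to reduce network IO, that is, to keep cached data local, each operator subtask should own a contiguous range of keys, which is what the KeyGroup design provides. Two questions need answering: ① How are KeyGroups assigned to subtasks? ② How is each record, i.e. each key, mapped to a KeyGroup?

4.1 KeyGroup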

public class KeyGroupRange implements KeyGroupsList, Serializable {
    ...
    private final int startKeyGroup;
    private final int endKeyGroup;
    ...
}

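The two important fields of KeyGroupRange are startKeyGroup and endKeyGroup. Together they define the contiguous range of KeyGroups that a subtask owns.

4.2 Assigning data (keys) to KeyGroups

In KeyGroupRangeAssignment, the assignToKeyGroup method uses a modulo operation to place each key in a specific KeyGroup.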

    /**
     * Assigns the given key to a key-group index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * Assigns the given key to a key-group index.
     *
     * @param keyHash the hash of the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }
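
A key is assigned to a KeyGroup by applying murmurHash to the key's hashCode and taking the result modulo maxParallelism. The figure below shows the mapping between keys and KeyGroups for parallelism=2 and maxParallelism=10.

[Figure: relationship between keys and KeyGroups]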
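
The key-to-KeyGroup step can be sketched in isolation. This is an illustration under stated assumptions: `mix` is a hypothetical stand-in for `MathUtils.murmurHash` (any deterministic, non-negative mixing function serves the illustration), so the KeyGroup numbers it prints will not match what Flink computes.

```java
final class KeyGroupSketch {
    // Hypothetical stand-in for MathUtils.murmurHash: deterministic and non-negative.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & 0x7FFFFFFF;
    }

    // Same shape as assignToKeyGroup: hash the key, then take it modulo maxParallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        for (String key : new String[] {"user-1", "user-2", "user-3", "user-4"}) {
            System.out.println(key + " -> KG-" + assignToKeyGroup(key, maxParallelism));
        }
    }
}
```

Note that the current parallelism does not appear anywhere in this computation. A key's KeyGroup depends only on the key and on maxParallelism, which is what allows whole KeyGroups to be moved between subtasks when the parallelism changes.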

4.3 Assigning KeyGroups to subtasks

In KeyGroupRangeAssignment, the computeKeyGroupRangeForOperatorIndex method gives each operator subtask a contiguous range of KeyGroups. KeyGroup indices lie in [0, maxParallelism - 1].

    /**
     * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
     * parallelism.
     *
     * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
     * to go beyond this boundary, this method must perform arithmetic on long values.
     *
     * @param maxParallelism Maximal parallelism that the job was initially created with.
     * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
     * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
     * @return the computed key-group range for the operator.
     */
    public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");

        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

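In the figure below, the stateful operation has maxParallelism=10, i.e. 10 KeyGroups. The figure shows how the KeyGroups are assigned when the operator's parallelism is scaled from 2 to 3.

[Figure: keyed state scale-up]

The figure shows that most state stays local: Task0, for example, hands off only KG-4 and keeps the rest. If the job's maxParallelism changes, however, the number of KeyGroups and the key-to-KeyGroup mapping change with it, which scrambles the whole KeyGroup assignment. The maxParallelism of a job should therefore be chosen with future scale-up in mind, because changing it later directly affects whether the cached data can be restored.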
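
The scale-up from 2 to 3 can be reproduced with the same arithmetic as computeKeyGroupRangeForOperatorIndex from the source above. This is a minimal sketch: the class name is hypothetical, and it returns a plain `int[]` holding the inclusive start and end instead of a KeyGroupRange.

```java
final class KeyGroupRangeSketch {
    // Same formulas as computeKeyGroupRangeForOperatorIndex; returns {start, end}, both inclusive.
    static int[] range(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        for (int parallelism : new int[] {2, 3}) {
            for (int i = 0; i < parallelism; i++) {
                int[] r = range(maxParallelism, parallelism, i);
                System.out.println("parallelism=" + parallelism
                        + " Task" + i + " -> KG-" + r[0] + "..KG-" + r[1]);
            }
        }
    }
}
```

With parallelism 2 the ranges are KG-0..KG-4 and KG-5..KG-9; with parallelism 3 they are KG-0..KG-3, KG-4..KG-6 and KG-7..KG-9. Task0 therefore gives up only KG-4, while Task1 takes over KG-4 and passes KG-7..KG-9 to the new Task2.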

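5. Setting maxParallelism sensibly

5.1 The concept of maximum parallelism

maxParallelism is the maximum parallelism configured for the current operator, not the parallelism of the Flink job. In essence, maxParallelism is the total number of KeyGroups.

When an operator's parallelism is set higher than maxParallelism, some subtasks receive no KeyGroup, and the Flink job cannot be restored from a checkpoint.
Valid values of maxParallelism lie between 1 and Short.MAX_VALUE.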

    /**
     * Sets the maximum parallelism for the task.
     *
     * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE.
     */
    public void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

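If maxParallelism is not set, a default is derived from the operator's parallelism: operatorParallelism + operatorParallelism / 2 (roughly 1.5 times the parallelism, using integer division) is rounded up to the next power of two, and the result is clamped between a lower and an upper bound. The lower bound DEFAULT_LOWER_BOUND_MAX_PARALLELISM is 2^7 = 128 and the upper bound UPPER_BOUND_MAX_PARALLELISM is 2^15 = 32768, so the maxParallelism that Flink generates automatically always lies between 128 and 32768.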

    /**
     * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not
     * explicitly configured a maximum parallelism to still allow a certain degree of scale-up.
     *
     * @param operatorParallelism the operator parallelism as basis for computation.
     * @return the computed default maximum parallelism.
     */
    public static int computeDefaultMaxParallelism(int operatorParallelism) {

        checkParallelismPreconditions(operatorParallelism);

        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }
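
To see what the default works out to for concrete parallelism values, the rule can be re-implemented in a few lines. This is a sketch, not Flink's class: `roundUpToPowerOfTwo` is written here by hand as a stand-in for `MathUtils.roundUpToPowerOfTwo`, and the two bounds are the values quoted in the text.

```java
final class DefaultMaxParallelismSketch {
    static final int LOWER_BOUND = 1 << 7;  // 128, as DEFAULT_LOWER_BOUND_MAX_PARALLELISM
    static final int UPPER_BOUND = 1 << 15; // 32768, as UPPER_BOUND_MAX_PARALLELISM

    // Hand-written stand-in: smallest power of two >= x, valid for 1 <= x <= 2^30.
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    static int computeDefaultMaxParallelism(int operatorParallelism) {
        // Integer division: 85 + 85 / 2 = 127, not 127.5.
        int scaled = operatorParallelism + operatorParallelism / 2;
        return Math.min(Math.max(roundUpToPowerOfTwo(scaled), LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 85, 86, 128, 30000}) {
            System.out.println("parallelism=" + p
                    + " -> default maxParallelism=" + computeDefaultMaxParallelism(p));
        }
    }
}
```

Any parallelism up to 85 yields the lower bound 128, 86 through 170 yield 256, and very large values such as 30000 are capped at 32768.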

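5.2 In practice

A new Flink job typically starts with a small data volume and a small parallelism, and maxParallelism may not have been set for the job or for its operators. Under the default rule above, a small parallelism yields an automatically generated maxParallelism of 128. Later, when the data volume surges and the job needs a parallelism above 128, the job turns out to be unable to restore from its checkpoint or savepoint, i.e. "the parallelism cannot be raised". For stateful Flink jobs this is a serious problem.

Therefore, set a sensible maxParallelism for every Flink job up front, based on the business scenario, to avoid the problem above.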

