The elasticity of a Flink job is built on state rescaling. This article explores the relationship between state rescaling and operator parallelism.
1. Job elasticity and state rescaling
1.1 Concepts
Flink job elasticity refers to scaling a job out or in, which in essence means increasing or decreasing operator parallelism. State rescaling refers to scaling the state up or down accordingly. The sections below focus on how state is redistributed when operator parallelism changes.
1.2 Scenario
After a Flink job has been running for a while, the main way for users to scale it out is to increase operator parallelism in order to improve job performance and throughput, as shown in the figure below.
[Figure: scaling a job out by increasing operator parallelism]
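A minimal sketch of what raising parallelism looks like at the API level (the numbers and the pipeline are illustrative, not from the original article):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for all operators of the job; scaling out means
        // redeploying the job with a larger value here, or raising it per operator
        // via someStream.map(...).setParallelism(4).
        env.setParallelism(2);

        // ... build the pipeline and call env.execute(...)
    }
}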
2. State persistence and redistribution on scale-out
2.1 State persistence
(1) Why state needs to be persisted
In real-time computing, data streams into the Flink system continuously, and every record triggers computation in the job.
Question 1: if a job performs an aggregate count, does each computation recompute all historical data from scratch, or does it build incrementally on the previous result?
Answer 1: Flink's aggregate count (its state) is computed incrementally on top of the previous result.
Question 2: where is the previous result cached?
Answer 2: if the cached data lived only in memory, all historical data would have to be recomputed whenever a node fails and recovers, which is unacceptable. To make the cache both reliable and fast, the cached data has to be persisted locally.
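A minimal sketch of such an incremental count, kept in Flink's keyed ValueState (the class and field names here are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts records per key; each record only updates the previous result that is kept in state.
public class CountPerKey extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long previous = countState.value();                      // previous result, null on first access
        long updated = (previous == null ? 0L : previous) + 1L;  // incremental update
        countState.update(updated);                              // persist the new result in state
        out.collect(Tuple2.of(ctx.getCurrentKey(), updated));
    }
}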
(2) How state persistence works
Take RocksDB + HDFS as an example. State is stored in two stages: cached data is first written to local RocksDB and then asynchronously persisted to the distributed file system HDFS. This overcomes the memory-size and reliability limits of HeapStateBackend, and the asynchronous upload also avoids the heavy network IO that FsStateBackend generates.
[Figure: two-stage state storage with local RocksDB and asynchronous HDFS persistence]
Flink ships with three state backends:
① Heap-based HeapStateBackend: state lives on the JVM heap; used in debug mode / local development.
② HDFS-based FsStateBackend: state snapshots are persisted to the distributed file system, so every checkpoint involves network IO to HDFS.
③ RocksDB-based RocksDBStateBackend: local RocksDB files plus asynchronous persistence to HDFS.
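A minimal configuration sketch for the RocksDB + HDFS combination (Flink 1.11-era API; the HDFS path is illustrative):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Working state is kept in local RocksDB; snapshots are uploaded asynchronously to HDFS.
        // The second argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... build the pipeline and call env.execute(...)
    }
}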
2.2 The state redistribution problem
Flink is a distributed, stateful stream processing system. A Flink job's DAG is optimized logically from StreamGraph to JobGraph and finally turned into the physically executed ExecutionGraph, which runs on the TaskManagers. Every node of the ExecutionGraph is an operator instance, and each operator instance can be seen as an independent task.
As shown in the figure below, the job DAG has network IO in the vertical direction, while there is no network communication between stateful operator instances in the horizontal direction. This model guarantees that each operator instance maintains its own slice of state, keeps it locally, and never needs network traffic to other operator instances for it.
[Figure: job DAG with network IO between stages and none between parallel stateful subtasks]
Flink has two kinds of state: operator state and keyed state. When a job is scaled out (parallelism increased), how is the state redistributed? For example, suppose the external MQ has 5 partitions, the source's parallelism is scaled from 1 to 2, and the stateful operator in the middle is scaled from 2 to 3.
[Figure: source scaled from 1 to 2, stateful operator scaled from 2 to 3]
3. Redistributing operator state on scale-out
Take the Kafka connector as a case study. As shown in the figure above, the Kafka broker has 5 partitions and the source's parallelism is scaled from 2 to 5. How is the state restored? The source code below is from Flink 1.11.
Let's look at FlinkKafkaConsumerBase.java. If restoredState was recovered from operator state, i.e. the job was restored from a savepoint or checkpoint, its entries are copied into subscribedPartitionsToStartOffsets. During this step every partition has to be reassigned to a subtask: KafkaTopicPartitionAssigner.assign recomputes which subtask each partition belongs to.
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;

/**
 * The offsets to restore to, if the consumer restores state from a checkpoint.
 *
 * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
 *
 * <p>Using a sorted map as the ordering is important when using restored state
 * to seed the partition discoverer.
 */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
// Operator initialization
@Override
public void open(Configuration configuration) throws Exception {
    // ... (omitted)
    subscribedPartitionsToStartOffsets = new HashMap<>();
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    // restoredState != null means we are restoring from operator state (savepoint/checkpoint)
    if (restoredState != null) {
        for (KafkaTopicPartition partition : allPartitions) {
            if (!restoredState.containsKey(partition)) {
                restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }

        for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
            if (!restoredFromOldState) {
                // seed the partition discoverer with the union state while filtering out
                // restored partitions that should not be subscribed by this subtask
                if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                        == getRuntimeContext().getIndexOfThisSubtask()) {
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            } else {
                // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                // in this case, just use the restored state as the subscribed partitions
                subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
            }
        }

        if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                    LOG.warn(
                        "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                        entry.getKey());
                    return true;
                }
                return false;
            });
        }

        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
    }
}
// Restore cached state from a checkpoint
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
    // ... (omitted)
    if (context.isRestored() && !restoredFromOldState) {
        restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

        // migrate from 1.2 state, if there is any
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
            restoredFromOldState = true;
            unionOffsetStates.add(kafkaOffset);
        }
        oldRoundRobinListState.clear();

        if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
            throw new IllegalArgumentException(
                "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
        }

        // populate actual holder for restored state
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
            restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
        }

        LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
    } else {
        LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
    }
}
Next, look at the subscribedPartitionsToStartOffsets field: its type is Map<KafkaTopicPartition, Long>, where KafkaTopicPartition holds the topic and partition, and the Long value is the offset.
public final class KafkaTopicPartition implements Serializable {

    /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
     * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS. */
    private static final long serialVersionUID = 722083576322742325L;

    // ------------------------------------------------------------------------

    private final String topic;
    private final int partition;
    private final int cachedHash;

    public KafkaTopicPartition(String topic, int partition) {
        this.topic = requireNonNull(topic);
        this.partition = partition;
        this.cachedHash = 31 * topic.hashCode() + partition;
    }
}
Finally, let's look at how KafkaTopicPartitionAssigner assigns partitions to subtasks. First a topic-specific start index is computed as ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; the bitwise AND with 0x7FFFFFFF simply clears the sign bit so the start index is non-negative. Then (startIndex + partition.getPartition()) % numParallelSubtasks distributes the partitions of the same topic round-robin from that start index, so partitions with ascending ids land on contiguous subtasks.
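For reference, the assign method in Flink 1.11 looks roughly like this:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    // clear the sign bit so the start index is non-negative, then pick a topic-specific start subtask
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // partition ids are assumed to ascend from 0, so the partition id can be used directly
    // as a round-robin offset from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}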
[Figure: operator state redistribution on scale-out]
4. Redistributing keyed state on scale-out
The idea: to minimize network IO, i.e. to keep cached data local, each operator subtask should read a contiguous range of keys, which is what motivated the KeyGroup design. Two questions need to be answered: ① how are KeyGroups assigned to subtasks? ② for each record, i.e. each key, how is its KeyGroup determined?
4.1 KeyGroup
public class KeyGroupRange implements KeyGroupsList, Serializable {
...
...
private final int startKeyGroup;
private final int endKeyGroup;
...
...
}
The two key fields of KeyGroupRange are startKeyGroup and endKeyGroup; once they are set, they define the subtask's range of KeyGroups (a contiguous block, and hence how many KeyGroups the subtask owns).
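A tiny illustration of what such a range represents (a sketch, assuming Flink's runtime classes are on the classpath):

// KeyGroups 0..4 owned by one subtask
KeyGroupRange range = new KeyGroupRange(0, 4);
int numKeyGroups = range.getNumberOfKeyGroups(); // 5
boolean ownsKg3 = range.contains(3);             // true
boolean ownsKg7 = range.contains(7);             // false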
4.2 Assigning a record (key) to a KeyGroup
In the KeyGroupRangeAssignment source, the assignToKeyGroup method uses a modulo operation to place a key into a specific KeyGroup.
/**
 * Assigns the given key to a key-group index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param keyHash the hash of the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}
So a key is assigned to a KeyGroup by taking the murmur hash of the key's hashCode and computing it modulo maxParallelism. The figure below shows the resulting mapping between keys and KeyGroups for parallelism = 2 and maxParallelism = 10.
[Figure: mapping between keys and KeyGroups]
4.3 Assigning KeyGroups to subtasks
The computeKeyGroupRangeForOperatorIndex method of KeyGroupRangeAssignment gives every operator subtask a contiguous range of KeyGroups, and the KeyGroup indices as a whole cover [0, maxParallelism).
/**
 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
 * parallelism.
 *
 * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
 * to go beyond this boundary, this method must perform arithmetic on long values.
 *
 * @param maxParallelism Maximal parallelism that the job was initially created with.
 * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
 * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism.
 * @return the computed key-group range for the operator.
 */
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {
    checkParallelismPreconditions(parallelism);
    checkParallelismPreconditions(maxParallelism);

    Preconditions.checkArgument(maxParallelism >= parallelism,
        "Maximum parallelism must not be smaller than parallelism.");

    int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
    int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    return new KeyGroupRange(start, end);
}
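A quick sketch (assuming Flink's KeyGroupRangeAssignment and KeyGroupRange classes are on the classpath) that calls this method for maxParallelism = 10 with the old and the new parallelism reproduces the ranges discussed below:

int maxParallelism = 10;

// before scaling, parallelism = 2:
// subtask 0 -> KeyGroups [0, 4], subtask 1 -> KeyGroups [5, 9]
for (int subtask = 0; subtask < 2; subtask++) {
    KeyGroupRange range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 2, subtask);
    System.out.println("parallelism 2, subtask " + subtask + " -> [" + range.getStartKeyGroup() + ", " + range.getEndKeyGroup() + "]");
}

// after scaling, parallelism = 3:
// subtask 0 -> [0, 3], subtask 1 -> [4, 6], subtask 2 -> [7, 9]
for (int subtask = 0; subtask < 3; subtask++) {
    KeyGroupRange range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 3, subtask);
    System.out.println("parallelism 3, subtask " + subtask + " -> [" + range.getStartKeyGroup() + ", " + range.getEndKeyGroup() + "]");
}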
As shown in the figure below, the stateful operator's maxParallelism = 10, i.e. there are 10 KeyGroups. The figure shows how they are assigned when the operator's parallelism is scaled from 2 to 3.
[Figure: keyed state redistribution when parallelism grows from 2 to 3]
From the figure above, most of the state remains local: Task 0 only gives away KG-4 and keeps the rest. If the job's maxParallelism changes, however, both the number of KeyGroups and the key-to-KeyGroup mapping change, which shuffles the entire KeyGroup assignment. Therefore, a job's maxParallelism should be chosen with future scale-out fully in mind; changing maxParallelism later directly breaks the restore of the cached state.
5. Setting maxParallelism properly
5.1 What maxParallelism means
maxParallelism is the maximum parallelism configured for an operator, not the job's current parallelism. In essence, maxParallelism is the total number of KeyGroups.
If an operator's parallelism is set higher than maxParallelism, some subtasks cannot be assigned any KeyGroup, and the Flink job cannot be restored from a checkpoint.
Valid values for maxParallelism range from 1 to Short.MAX_VALUE.
/**
 * Sets the maximum parallelism for the task.
 *
 * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE.
 */
public void setMaxParallelism(int maxParallelism) {
    this.maxParallelism = maxParallelism;
}
If maxParallelism is not set, a default is computed from the parallelism: the operator parallelism is multiplied by 1.5, rounded up to the next power of two, and then clamped between a lower and an upper bound. The lower bound DEFAULT_LOWER_BOUND_MAX_PARALLELISM is 2^7 = 128 and the upper bound UPPER_BOUND_MAX_PARALLELISM is 2^15 = 32768, so the maxParallelism that Flink generates automatically always lies between 128 and 32768. For example, for an operator parallelism of 100: 100 + 100/2 = 150, rounded up to 256, which already lies within [128, 32768].
/**
 * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not
 * explicitly configured a maximum parallelism to still allow a certain degree of scale-up.
 *
 * @param operatorParallelism the operator parallelism as basis for computation.
 * @return the computed default maximum parallelism.
 */
public static int computeDefaultMaxParallelism(int operatorParallelism) {
    checkParallelismPreconditions(operatorParallelism);

    return Math.min(
        Math.max(
            MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
            DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
        UPPER_BOUND_MAX_PARALLELISM);
}
5.2 In practice
In most cases a new Flink job starts with a small data volume, so the initial parallelism is also small, and nobody sets maxParallelism for the job or its operators. By the default rule above, Flink then auto-generates a maxParallelism of 128. Later, when business data volume surges and the job's parallelism is raised above 128, the job can no longer be restored from a checkpoint or savepoint, i.e. "the parallelism cannot be scaled up". For stateful Flink jobs this is a serious problem.
Therefore, set a sensible maxParallelism for every Flink job explicitly, based on the business scenario, to prevent the problem above.
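A minimal sketch of setting it explicitly (the value 1024 is only an illustration; pick one that covers your expected growth):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide maximum parallelism, i.e. the total number of KeyGroups.
        // Choose it with future scale-out in mind; changing it later breaks keyed-state restore.
        env.setMaxParallelism(1024);

        // Individual operators can override it, e.g. someStream.map(...).setMaxParallelism(2048).

        // ... build the pipeline and call env.execute(...)
    }
}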