Flink state source code analysis

Author: 邵红晓 | Published: 2020-04-21 16:27
  • Parameters for creating the keyed state backend, explained
StateBackend.java
Factory methods declared on the interface
createOperatorStateBackend
createKeyedStateBackend
Implementations
FsStateBackend
MemoryStateBackend
RocksDBStateBackend
Usage
StreamTaskStateInitializerImpl.java
BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =
   new BackendRestorerProcedure<>(
      () -> stateBackend.createKeyedStateBackend(
         environment,
         environment.getJobID(),
         operatorIdentifierText,
         keySerializer,
         taskInfo.getMaxNumberOfParallelSubtasks(),
         keyGroupRange,
         environment.getTaskKvStateRegistry(),
         TtlTimeProvider.DEFAULT,
         metricGroup),
      backendCloseableRegistry,
      logDescription);

<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
   Environment env,
   JobID jobID,
   String operatorIdentifier,
   TypeSerializer<K> keySerializer,
   int numberOfKeyGroups,
   KeyGroupRange keyGroupRange,
   TaskKvStateRegistry kvStateRegistry,
   TtlTimeProvider ttlTimeProvider,
   MetricGroup metricGroup) throws Exception;
  • numberOfKeyGroups = the maximum number of parallel subtasks (max parallelism), set with operator.setMaxParallelism(int maxParallelism)
final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
   taskInfo.getMaxNumberOfParallelSubtasks(),
   taskInfo.getNumberOfParallelSubtasks(),
   taskInfo.getIndexOfThisSubtask());

StreamTransformation.java--
public void setMaxParallelism(int maxParallelism) {
   Preconditions.checkArgument(maxParallelism > 0
               && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
         "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
               + ". Found: " + maxParallelism);
   this.maxParallelism = maxParallelism;
}

ExecutionJobVertex.java--
final int configuredMaxParallelism = jobVertex.getMaxParallelism();
// VALUE_NOT_SET is -1
this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

// If no max parallelism was configured, compute the default
setMaxParallelismInternal(maxParallelismConfigured ?
      configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));
// computeDefaultMaxParallelism clamps the result to at least 128 and at most 32768
return Math.min(
      Math.max(
            MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
            DEFAULT_LOWER_BOUND_MAX_PARALLELISM /* 128 */),
      UPPER_BOUND_MAX_PARALLELISM /* 32768 */);
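
To make the default concrete, here is a minimal, self-contained sketch of the same arithmetic. It does not call Flink: the class name DefaultMaxParallelismSketch is made up for this example, roundUpToPowerOfTwo is a stand-in that is assumed to behave like MathUtils.roundUpToPowerOfTwo, and the two bounds are the values 128 and 32768 quoted in the excerpt above.

```java
// Standalone sketch; does not depend on Flink.
final class DefaultMaxParallelismSketch {
    static final int LOWER_BOUND = 128;   // value of DEFAULT_LOWER_BOUND_MAX_PARALLELISM
    static final int UPPER_BOUND = 32768; // value of UPPER_BOUND_MAX_PARALLELISM

    // Stand-in for MathUtils.roundUpToPowerOfTwo (assumption): smallest power of two >= x.
    static int roundUpToPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        return Integer.highestOneBit(x - 1) << 1;
    }

    // Round 1.5 * parallelism up to a power of two, then clamp to [128, 32768].
    static int computeDefaultMaxParallelism(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 85, 86, 100, 1000, 30000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
    }
}
```

With these inputs, a parallelism of 85 or less stays at the floor of 128, 86 jumps to 256 because 86 + 43 = 129, and 30000 is capped at 32768.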
  • keyGroupRange
KeyGroupRangeAssignment.java 
final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
   taskInfo.getMaxNumberOfParallelSubtasks(),
   taskInfo.getNumberOfParallelSubtasks(),
   taskInfo.getIndexOfThisSubtask()); // subtask index, in [0, parallelism - 1]

Compute which key group a single key belongs to (Javadoc: "Assigns the given key to a key-group index.")
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
   return MathUtils.murmurHash(keyHash) % maxParallelism;
}
Compute the keyGroupRange boundaries for a single subtask
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
   int maxParallelism,
   int parallelism,
   int operatorIndex) {
            
   int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
   int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;

   return new KeyGroupRange(start, end);
}
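
As a worked example of the range formula, the sketch below copies the two lines that compute start and end into a standalone class and prints the ranges for a max parallelism of 128 and a parallelism of 3. The class name KeyGroupRangeSketch and the int[] return type are choices made for this example only; Flink itself returns a KeyGroupRange.

```java
// Standalone sketch; does not depend on Flink.
final class KeyGroupRangeSketch {
    // Same arithmetic as computeKeyGroupRangeForOperatorIndex; returns {start, end}, both inclusive.
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " -> [" + r[0] + ", " + r[1] + "]");
        }
        // subtask 0 -> [0, 42]
        // subtask 1 -> [43, 85]
        // subtask 2 -> [86, 127]
    }
}
```

The ranges are contiguous and together cover all 128 key groups, and their sizes differ by at most one (43, 43, 42).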
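What is the difference between parallelism and max parallelism?
parallelism: the number of subtasks an operator actually runs with; it falls back to the system default parallelism when not set explicitly
maxParallelism: set per operator with operator.setMaxParallelism(int maxParallelism); it determines the number of key groups
SingleOutputStreamOperator.java (a subclass of DataStream) setMaxParallelism()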
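
One way to see the difference is to rescale a job: the key to key-group mapping depends only on maxParallelism, while the key-group to subtask mapping also depends on parallelism. The sketch below illustrates this under stated assumptions. The class RescaleSketch is made up for this example, scramble is a hypothetical stand-in for MathUtils.murmurHash (it is not Flink's actual hash), and operatorIndexFor uses keyGroup * parallelism / maxParallelism, which is assumed here to match KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.

```java
// Standalone sketch; does not depend on Flink.
final class RescaleSketch {
    // Hypothetical stand-in for MathUtils.murmurHash: any non-negative int mix works for the illustration.
    static int scramble(int keyHash) {
        int h = keyHash * 0x9E3779B9;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // Key -> key group: depends only on maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    // Key group -> subtask index: depends on parallelism as well.
    static int operatorIndexFor(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            System.out.println(key + ": keyGroup=" + keyGroup
                    + ", subtask at parallelism 2 -> " + operatorIndexFor(maxParallelism, 2, keyGroup)
                    + ", subtask at parallelism 4 -> " + operatorIndexFor(maxParallelism, 4, keyGroup));
        }
    }
}
```

The key group printed for each key is the same at both parallelism settings; only the owning subtask changes. Changing maxParallelism, by contrast, would change the key groups themselves.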

References
RocksDB concepts
https://cloud.tencent.com/developer/article/1403939
namespace = operator uid (StateTable concept)
State best practices
https://www.cnblogs.com/rossiXYZ/p/12594315.html
State Processor API: namespace -> operator.setUid(), i.e. the operator uid
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html
