启动脚本
flink run -d -m yarn-cluster -p 1 -yjm 512M -ytm 1024M -c com.bigdata.FlinkRun xxx.jar
报错信息
JobManager memory configuration failed: The configured Total Flink Memory (64.000mb (67108864 bytes)) is less than the configured Off-heap Memory (128.000mb (134217728 bytes)).
分析
从报错信息来看,flink的总内存小于堆外内存,可是脚本中-jym
中jobManager分配的内存是512m,为何这里只剩下了64m呢? 话不多说上源码
根据报错关键字找到对应的代码
JobManagerFlinkMemoryUtils.deriveFromTotalFlinkMemory
public JobManagerFlinkMemory deriveFromTotalFlinkMemory(
Configuration config, MemorySize totalFlinkMemorySize) {
MemorySize offHeapMemorySize =
ProcessMemoryUtils.getMemorySizeFromConfig(
config, JobManagerOptions.OFF_HEAP_MEMORY);
if (totalFlinkMemorySize.compareTo(offHeapMemorySize) < 1) {
throw new IllegalConfigurationException(
"The configured Total Flink Memory (%s) is less than the configured Off-heap Memory (%s).",
totalFlinkMemorySize.toHumanReadableString(),
offHeapMemorySize.toHumanReadableString());
}
MemorySize derivedJvmHeapMemorySize = totalFlinkMemorySize.subtract(offHeapMemorySize);
return createJobManagerFlinkMemory(derivedJvmHeapMemorySize, offHeapMemorySize);
}
运行源码中的测试用例
FlinkYarnSessionCliTest.testMemoryPropertyWithoutUnit
, 通过debug可得到如下的信息
这里做一个猜想, Flink JobManager进程总内存 = 堆内存 + 堆外内存 + Metaspace + Overhead
报错重现
将测试用例中的jobManager的内存改为512, 报错即可重现
通过计算结果符合预期的报错,套用上面的猜想和结合jvmOptions
的默认配置, 即可计算出堆内存为64m = 512m - 192m(jvm-overhead.min) - 256(jvm-metaspace.size)
, 小于默认的堆外内存所以校验不通过.
计算中的Overhead内存会根据参数jvm-overhead.fraction
动态调整, 默认为0.1
堆外内存默认为128M
在资源充足的情况下建议无需将默认的配置下调, 如果要调整可以从上述的参数进行调整, 在脚本中加入运行参数, 如
-yD jobmanager.memory.off-heap.size=1 -yD jobmanager.memory.jvm-overhead.min=64 等等
同理如果taskManager的内存配置小了, 也会报错, 分析定位的方法大概一致
Flink内存模型
官方的文档有详细的说明, Flink 内存模型, 在Flink的Dashboard上可以看到JobManager和TaskManager对应的内存模型和JVM的监控信息
TaskManager
网友评论