- 抛开 JVM 内存模型,单从 TaskManager 内存的主要使用方式来看,TaskManager 的内存主要分为三个部分:
Network Buffers:一定数量的MemorySegment
, 主要用于网络传输。在TaskManager
启动时分配, 通过NetworkEnvironment
和NetworkBufferPool
进行管理
Managed Memory
:由MemoryManager
管理的一组MemorySegment
集合, 主要用于 Batch 模式下的 sorting, hashing, 和 cache 等。
Remaining JVM heap
:余下的堆内存留给TaskManager
的数据结构以及用户代码处理数据时使用。TaskManager 自身的数据结构并不会占用太多内存,因而主要都是供用户代码使用,用户代码创建的对象通常生命周期都较短
MemorySegment
堆内相对地址 protected final byte[] heapMemory;
堆外绝对内存地址 protected long address;
MemorySegment的实现类有两个HeapMemorySegment(unused),HybridMemorySegment
有两个实现类就无法使用 JIT 优化的性能,具体是调用可以通过去虚化(de-virtualized)和内联(inlined)来提升性能,性能提高2.7倍,所以flink采用了HybridMemorySegment一个实现类,可以同时操作堆外和堆内内存,这得益于sun.misc.Unsafe.getLong(Object reference, long offset)
,如果refer为空,则会访问内存绝对地址,否则访问相对地址
- MemorySegment 管理
flink Buffer
同netty bytebuf,提供读写指针,引用计数功能
image.png
NetworkBuffer继承了netty的AbstractReferenceCountedByteBuf 并且实现了Buffer
public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer
优点
1.不需要每次为读写做准备,直接设置读写指针进行读写操作,可以直接调用discardReadBytes,复用之前读取过的内存0<readerIndex<=writeIndex<capacity
,clear即清除缓冲区的指针状态,回复到初始值,readerIndex=writeIndex=0<capacity
2.当refCnt引用计数=0,自动回收到池中,而不是真正释放
由于Netty是一个NIO网络框架,因此对于Buffer的使用如果基于直接内存(DirectBuffer)实现的话,将会大大提高I/O操作的效率,然而DirectBuffer和HeapBuffer相比之下除了I/O操作效率高之外还有一个天生的缺点,即对于DirectBuffer的申请相比HeapBuffer效率更低,因此Netty结合引用计数实现了PolledBuffer,即池化的用法,当引用计数等于0的时候,Netty将Buffer回收致池中,在下一次申请Buffer的没某个时刻会被复用,flink用法同netty相似
3.通过内置的composite buffer类型可以实现zero-copy
AbstractReferenceCountedByteBuf .java
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
cas 使用自旋锁直到compareAndSet成功跳出
private ByteBuf retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
}
flink 自己实现了release释放,当前引用refCnt - decrement,
AbstractReferenceCountedByteBuf #protected abstract void deallocate();-> NetworkBuffer #recycler.recycle(memorySegment);
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);
numberOfRequestedMemorySegments--;
networkBufferPool.recycle(segment);
}
MemoryManager
MemoryManager 是管理 Managed Memory 的类,这部分主要是在 Batch 模式下使用,在 Streaming 模式下这一块内存不会分配。MemoryManager 主要通过内部接口 MemoryPool 来管理所有的 MemorySegment。
MemoryManagerBuilder # private final Map<MemoryType, Long> memoryPools = new EnumMap<>(MemoryType.class)
MemoryManager#allocateManagedSegment
如下
private MemorySegment allocateManagedSegment(MemoryType memoryType, Object owner) {
switch (memoryType) {
case HEAP:
return allocateUnpooledSegment(getPageSize(), owner);
case OFF_HEAP:
return allocateOffHeapUnsafeMemory(getPageSize(), owner);
default:
throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
}
}
总结:
-
JIT 即使编译器:在Java编程语言和环境中,即时编译器(JIT compiler,just-in-time compiler)是一个把Java的字节码(包括需要被解释的指令的程序)转换成可以直接发送给处理器的指令的程序
-
去虚拟化
是指在装载class文件后,进行类层次的分析,如果发现类中的方法只提供一个实现类,那么对于调用了此方法的代码,也可以进行方法内联,从而提升执行的性能。 -
内联
多个方法调用,执行时要经历多次参数传递,返回值传递及跳转等,jvm的Client Compiler 采用方法内联,把调用到的方法的指令直接植入当前方法中。-XX:+PringInlining来查看方法内联信息,-XX:MaxInlineSize=35控制编译后文件大小。 -
jvm 解释器
当虚拟机启动时,解释器可以首先发挥作用,而不必等待编译器全部编译完成再执行,这样可以省去许多不必要的编译时间。并且随着程序运行时间的推移,编译器逐渐发挥作用,根据热点探测功能,将有价值的字节码编译为本地机器指令,以换取更高的程序执行效率。 -
zero-copy
就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 少了一次内存的拷贝, 减少了cpu的执行,节省了内存带宽。
在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率. -
Netty 的 Zero-copy 体现在如下几个方面:
1.Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝.
2.通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作.
3.ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.
4.通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题. -
JDK之所以未选择在启动时即编译成机器码的原因如下:
(1)静态编译并不能根据程序的运行状态来优化执行的代码,Server Compiler 这种方式是根据运行状态来进行动态编译的,例如分支判断、逃逸分析等,这些措施会对提升程序执行的性能起到很大的帮助,在静态编译的情况下是无法实现的,给Server Compiler收集运行数据越长的时间,编译出来的代码会越优。
(2)解释执行比编译执行更节省内存
(3)启动时解释执行的启动速度比编译再启动更快。
参考:
https://blog.jrwang.me/2019/flink-source-code-memory-management/#memorymanager
https://www.jianshu.com/p/61a7916b37fd
http://www.whitewood.me/2019/10/17/Flink-1-10-%E7%BB%86%E7%B2%92%E5%BA%A6%E8%B5%84%E6%BA%90%E7%AE%A1%E7%90%86%E8%A7%A3%E6%9E%90/
网友评论