美文网首页
flink TaskManager 内存模型(二)

flink TaskManager 内存模型(二)

作者: 邵红晓 | 来源:发表于2020-04-30 18:48 被阅读0次
    • 抛开 JVM 内存模型,单从 TaskManager 内存的主要使用方式来看,TaskManager 的内存主要分为三个部分:
      Network Buffers:一定数量的 MemorySegment, 主要用于网络传输。在 TaskManager 启动时分配, 通过 NetworkEnvironmentNetworkBufferPool 进行管理
      Managed Memory:由 MemoryManager 管理的一组 MemorySegment 集合, 主要用于 Batch 模式下的 sorting, hashing, 和 cache 等。
      Remaining JVM heap:余下的堆内存留给 TaskManager 的数据结构以及用户代码处理数据时使用。TaskManager 自身的数据结构并不会占用太多内存,因而主要都是供用户代码使用,用户代码创建的对象通常生命周期都较短

    MemorySegment

    堆内相对地址 protected final byte[] heapMemory;
    堆外绝对内存地址 protected long address;
    MemorySegment的实现类有两个HeapMemorySegment(unused),HybridMemorySegment
    有两个实现类就无法使用 JIT 优化的性能,具体是调用可以通过去虚化(de-virtualized)和内联(inlined)来提升性能,性能提高2.7倍,所以flink采用了HybridMemorySegment一个实现类,可以同时操作堆外和堆内内存,这得益于sun.misc.Unsafe.getLong(Object reference, long offset),如果refer为空,则会访问内存绝对地址,否则访问相对地址

    • MemorySegment 管理
      flink Buffer同netty bytebuf,提供读写指针,引用计数功能
      image.png
      NetworkBuffer继承了netty的AbstractReferenceCountedByteBuf 并且实现了Buffer
      public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer
    优点

    1.不需要每次为读写做准备,直接设置读写指针进行读写操作,可以直接调用discardReadBytes,复用之前读取过的内存0<readerIndex<=writeIndex<capacity ,clear即清除缓冲区的指针状态,回复到初始值,readerIndex=writeIndex=0<capacity
    2.当refCnt引用计数=0,自动回收到池中,而不是真正释放
    由于Netty是一个NIO网络框架,因此对于Buffer的使用如果基于直接内存(DirectBuffer)实现的话,将会大大提高I/O操作的效率,然而DirectBuffer和HeapBuffer相比之下除了I/O操作效率高之外还有一个天生的缺点,即对于DirectBuffer的申请相比HeapBuffer效率更低,因此Netty结合引用计数实现了PolledBuffer,即池化的用法,当引用计数等于0的时候,Netty将Buffer回收致池中,在下一次申请Buffer的没某个时刻会被复用,flink用法同netty相似
    3.通过内置的composite buffer类型可以实现zero-copy

    AbstractReferenceCountedByteBuf .java

     private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
        static {
            AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                    PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
            if (updater == null) {
                updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
            }
            refCntUpdater = updater;
        }
     cas 使用自旋锁直到compareAndSet成功跳出
     private ByteBuf retain0(int increment) {
            for (;;) {
                int refCnt = this.refCnt;
                final int nextCnt = refCnt + increment;
    
                // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
                if (nextCnt <= increment) {
                    throw new IllegalReferenceCountException(refCnt, increment);
                }
                if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
                    break;
                }
            }
            return this;
        }
    

    flink 自己实现了release释放,当前引用refCnt - decrement,
    AbstractReferenceCountedByteBuf #protected abstract void deallocate();-> NetworkBuffer #recycler.recycle(memorySegment);

     private boolean release0(int decrement) {
            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt < decrement) {
                    throw new IllegalReferenceCountException(refCnt, -decrement);
                }
    
                if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                    if (refCnt == decrement) {
                        deallocate();
                        return true;
                    }
                    return false;
                }
            }
        }
        private void returnMemorySegment(MemorySegment segment) {
            assert Thread.holdsLock(availableMemorySegments);
            numberOfRequestedMemorySegments--;
            networkBufferPool.recycle(segment);
        }
    

    MemoryManager

    MemoryManager 是管理 Managed Memory 的类,这部分主要是在 Batch 模式下使用,在 Streaming 模式下这一块内存不会分配。MemoryManager 主要通过内部接口 MemoryPool 来管理所有的 MemorySegment。
    MemoryManagerBuilder # private final Map<MemoryType, Long> memoryPools = new EnumMap<>(MemoryType.class)
    MemoryManager#allocateManagedSegment如下

    private MemorySegment allocateManagedSegment(MemoryType memoryType, Object owner) {
            switch (memoryType) {
                case HEAP:
                    return allocateUnpooledSegment(getPageSize(), owner);
                case OFF_HEAP:
                    return allocateOffHeapUnsafeMemory(getPageSize(), owner);
                default:
                    throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
            }
        }
    

    总结:

    • JIT 即使编译器:在Java编程语言和环境中,即时编译器(JIT compiler,just-in-time compiler)是一个把Java的字节码(包括需要被解释的指令的程序)转换成可以直接发送给处理器的指令的程序

    • 去虚拟化
      是指在装载class文件后,进行类层次的分析,如果发现类中的方法只提供一个实现类,那么对于调用了此方法的代码,也可以进行方法内联,从而提升执行的性能。

    • 内联
      多个方法调用,执行时要经历多次参数传递,返回值传递及跳转等,jvm的Client Compiler 采用方法内联,把调用到的方法的指令直接植入当前方法中。-XX:+PringInlining来查看方法内联信息,-XX:MaxInlineSize=35控制编译后文件大小。

    • jvm 解释器
      当虚拟机启动时,解释器可以首先发挥作用,而不必等待编译器全部编译完成再执行,这样可以省去许多不必要的编译时间。并且随着程序运行时间的推移,编译器逐渐发挥作用,根据热点探测功能,将有价值的字节码编译为本地机器指令,以换取更高的程序执行效率。

    • zero-copy
      就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 少了一次内存的拷贝, 减少了cpu的执行,节省了内存带宽。
      在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.

    • Netty 的 Zero-copy 体现在如下几个方面:
      1.Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝.
      2.通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作.
      3.ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.
      4.通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题.

    • JDK之所以未选择在启动时即编译成机器码的原因如下:
      (1)静态编译并不能根据程序的运行状态来优化执行的代码,Server Compiler 这种方式是根据运行状态来进行动态编译的,例如分支判断、逃逸分析等,这些措施会对提升程序执行的性能起到很大的帮助,在静态编译的情况下是无法实现的,给Server Compiler收集运行数据越长的时间,编译出来的代码会越优。
      (2)解释执行比编译执行更节省内存
      (3)启动时解释执行的启动速度比编译再启动更快。

    参考:
    https://blog.jrwang.me/2019/flink-source-code-memory-management/#memorymanager
    https://www.jianshu.com/p/61a7916b37fd
    http://www.whitewood.me/2019/10/17/Flink-1-10-%E7%BB%86%E7%B2%92%E5%BA%A6%E8%B5%84%E6%BA%90%E7%AE%A1%E7%90%86%E8%A7%A3%E6%9E%90/

    相关文章

      网友评论

          本文标题:flink TaskManager 内存模型(二)

          本文链接:https://www.haomeiwen.com/subject/dxlzwhtx.html