美文网首页
JAVA IO专题二:java NIO读取文件并通过socket

JAVA IO专题二:java NIO读取文件并通过socket

作者: 挡不住的柳Willow | 来源:发表于2021-01-14 18:10 被阅读0次

    相关IO专题

    JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝
    JAVA IO专题二:java NIO读取文件并通过socket发送,最少拷贝了几次?堆外内存和所谓的零拷贝到底是什么关系
    JAVA IO专题三:java的内存映射和应用场景
    JAVA IO专题四:java顺序IO原理以及对应的应用场景

    内核的零拷贝

    内核的零拷贝,指的是不需要消耗CPU资源,完全交给DMA来处理,内核空间的数据没有多余的拷贝。主要经历了这么几个发展历程:

    一、传统的read + send

    1、先调用操作系统的read函数,由DMA将文件拷贝到内核,然后CPU把内核数据拷贝到用户缓冲区(堆外内存)
    2、调用操作系统的send函数,由CPU把用户缓冲区的数据拷贝到socket缓冲区,最后DMA把socket缓冲区数据拷贝到网卡进行发送。

    这个过程中内核数据拷贝到用户空间,用户空间又拷贝回内存,有两次多余的拷贝。

    二、sendfile初始版本

    直接调用sendfile来发送文件,流程如下:
    1、首先通过 DMA将数据从磁盘读取到内核
    2、然后通过 CPU将数据从内核拷贝到socket缓冲区
    3、最终通过 DMA将socket缓冲区数据拷贝到网卡发送

    sendfile 与 read + send 方式相比,少了一次 CPU的拷贝。但是从上述过程中也可以发现从内核缓冲区拷贝到socket缓冲区是没必要的。

    三、sendfile改进版本,真正的零拷贝

    内核为2.4或者以上版本的linux系统上,改进后的处理过程如下:
    1、DMA 将磁盘数据拷贝到内核缓冲区,向socket缓冲区中追加当前要发送的数据在内核缓冲区中的位置和偏移量
    2、DMA gather copy 根据 socket缓冲区中的位置和偏移量,直接将内核缓冲区中的数据拷贝到网卡上。
    经过上述过程,数据只经过了 2 次 copy 就从磁盘传送出去了。并且没有CPU的参与。

    java的零拷贝

    一、利用directBuffer

    在上一篇文章JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝中,我们提到了基于BIO读取文件发送消息,一共涉及六次拷贝,其中堆外和堆内内存的拷贝是多余的,我们可以利用directBuffer来减少这两次拷贝:

    //打开文件通道
    FileChannel fileChannel = FileChannel.open(Paths.get("/test.txt"));
    //申请堆外内存
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    //读取到堆外内存
    fileChannel.read(byteBuffer);
    //打开socket通道
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9099));
    //堆外内存写入socket通道
    socketChannel.write(byteBuffer);
    

    每一行代码都有清楚的注释,我们主要来看一下fileChannel.read、socketChannel.write做了什么:

    • fileChannel.read 分析
    //FileChannelImpl
    public int read(ByteBuffer dst) throws IOException {
        ... 忽略了一堆不重要代码
            synchronized (positionLock) {
                int n = 0;
                int ti = -1;
                try {
                    do {
                       // 调用IOUtil,根据文件描述符fd读取数据到直接缓冲区dst中
                        n = IOUtil.read(fd, dst, -1, nd);
                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
                    return IOStatus.normalize(n);
                } finally {
                    threads.remove(ti);
                    end(n > 0);
                    assert IOStatus.check(n);
                }
            }
        }
    //IOUtil
    static int read(FileDescriptor fd, ByteBuffer dst, long position,
                        NativeDispatcher nd)
            throws IOException
        {
            ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
            try {
                int n = readIntoNativeBuffer(fd, bb, position, nd);
                bb.flip();
                if (n > 0)
                    dst.put(bb);
                return n;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(bb);
            }
        }
    
    private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                                long position, NativeDispatcher nd)
            throws IOException
        {
            int pos = bb.position();
            int lim = bb.limit();
            assert (pos <= lim);
            int rem = (pos <= lim ? lim - pos : 0);
    
            if (rem == 0)
                return 0;
            int n = 0;
            if (position != -1) {
                n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                             rem, position);
            } else {
               //第一次读取会走到这里,否则走上面的分支
                n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
            }
            if (n > 0)
                bb.position(pos + n);
            return n;
        }
    //FileDispatcherImpl
     int read(FileDescriptor fd, long address, int len) throws IOException {
            return read0(fd, address, len);
        }
    

    这里的调用链比较深,我们一步一步梳理:

    1. 调用fileChannel.read实际是走到了FileChannelImpl.read方法,然后走到n = IOUtil.read(fd, dst, -1, nd);调用IOUtil的read,传入了文件描述符、directBuffer
    2. IOUtil 调用自己的readIntoNativeBuffer方法,字面意思是讲数据读取到native缓存,即堆外内存
    3. IOUtil 的 readIntoNativeBuffer 方法调用n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);,即NativeDispatcher 的read方法,传入文件描述符,堆外内存地址以及要读取的长度
    4. 这里的 NativeDispatcher 实现类为 FileDispatcherImpl,实际调用的是native方法read0,并传入了文件描述符、堆外内存地址和读取长度

    我们简单看一下native的read0方法做了什么:

    // 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileDispatcherImpl.c
    
    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                                 jobject fdo, jlong address, jint len)
    {
        //拿到文件描述符
        jint fd = fdval(env, fdo);
        //根据地址拿到堆外内存的指针
        void *buf = (void *)jlong_to_ptr(address);
      //直接调用系统函数read把文件描述符读取到buf中
        return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
    }
    

    可以看到native的read0方法是直接调用系统函数read,根据jvm传过来的堆外内存地址,将文件数据读取到堆外内存中(read方法的作用在内核零拷贝小节里已经提到了)。即直接操作堆外内存,而不使用DirectByteBuffer的时候,还需要将堆外内存拷贝到堆内进行读写JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝),因此使用堆外内存+channel的方式,可以避免堆内外内存拷贝,一定程度上也能提高效率。

    • socketChannel.write 分析
    //SocketChannelImpl.java
      public int write(ByteBuffer buf) throws IOException {
            synchronized (writeLock) {
                  ... 忽略不重要代码
                int n = 0;
                try {
                    for (;;) {
                        //调用IOUtil.write写数据
                        n = IOUtil.write(fd, buf, -1, nd);
                        if ((n == IOStatus.INTERRUPTED) && isOpen())
                            continue;
                        return IOStatus.normalize(n);
                    }
                } finally {
                    writerCleanup();
                }
            }
        }
    //IOUtil.java
    static int write(FileDescriptor fd, ByteBuffer src, long position,
                         NativeDispatcher nd)
            throws IOException
        {
            if (src instanceof DirectBuffer)
                 //directBuffer直接走这里
                return writeFromNativeBuffer(fd, src, position, nd);
        }
    
      private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                                 long position, NativeDispatcher nd) throws IOException
        {
            int pos = bb.position();
            int lim = bb.limit();
            assert (pos <= lim);
            int rem = (pos <= lim ? lim - pos : 0);
    
            int written = 0;
            if (rem == 0)
                return 0;
            if (position != -1) {
                written = nd.pwrite(fd,
                                    ((DirectBuffer)bb).address() + pos,
                                    rem, position);
            } else {
                //调用SocketDispatcher写数据
                written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
            }
            if (written > 0)
                bb.position(pos + written);
            return written;
        }
    
    //SocketDispatcher.java
    int write(FileDescriptor fd, long address, int len) throws IOException {
            //直接调用了FileDispatcherImpl的native方法write0
            return FileDispatcherImpl.write0(fd, address, len);
        }
    

    在看native方法之前还是先做简单的梳理:

    1. socketChannel.write 实际调用了SocketChannelImpl.write,然后调用IOUtil.write(fd, buf, -1, nd); 传入文件描述符和堆外内存引用
    2. IOUtil.write调用自己的私有方法 writeFromNativeBuffer,内部调用了written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);,将文件描述符、堆外内存地址交给了NativeDispatcher
    3. 此处的NativeDispatcher实际是 SocketDispatcher,里面直接调用了FileDispatcherImpl.write0(fd, address, len);native方法

    接着跟踪FileDispatcherImpl.write0(fd, address, len);这个native方法:

    // 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileDispatcherImpl.c
    
    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_FileDispatcherImpl_write0(JNIEnv *env, jclass clazz,
                                  jobject fdo, jlong address, jint len)
    {
        //转换文件描述符
        jint fd = fdval(env, fdo);
        //转换为堆外内存指针
        void *buf = (void *)jlong_to_ptr(address);
        //直接调用系统函数write将堆外内存数据发送出去
        return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
    }
    

    可以看到native的write0方法是直接调用系统函数write将堆外内存数据发送出去(write方法的作用在内核零拷贝小节里已经提到了)。

    • 小结
      fileChannel和socketChannel配合directBuffer,本质上区别不大,都是配合系统函数write和read对文件描述符,直接操作堆外内存。因此相比较于BIO可以省去两次拷贝。
    二、channel.transferTo

    java中的零拷贝就是依赖操作系统的sendfile函数来实现的,提供了channel.transferTo方法,允许将一个channel的数据直接发送到另一个channel,接下来我们通过示例代码和具体的源码来分析和验证前面的说法。
    示例代码如下:

    //打开socketChannel
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9099));
    //
    FileChannel fileChannel = FileChannel.open(Paths.get("/test.txt"));
    fileChannel.transferTo(0, fileChannel.size(), socketChannel);
    

    只用了一行代码fileChannel.transferTo(0, fileChannel.size(), socketChannel);就把文件数据写到了socket,继续看源码:

    //FileChannelImpl.java
    public long transferTo(long position, long count,
                               WritableByteChannel target)
            throws IOException
        {
             ... 忽略不重要代码
            long sz = size();
            if (position > sz)
                return 0;
            int icount = (int)Math.min(count, Integer.MAX_VALUE);
            if ((sz - position) < icount)
                icount = (int)(sz - position);
    
            long n;
              //先尝试直接tranfer,如果内核支持的话
            if ((n = transferToDirectly(position, icount, target)) >= 0)
                return n;
            //尝试mappedTransfer,只适用于受信任的channel类型
            if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
                return n;
              //channel不受信任的话,会走最慢的方式
            return transferToArbitraryChannel(position, icount, target);
        }
    
    // FileChannelimpl.java
    private long transferToDirectly(long position, int icount,
                                        WritableByteChannel target)
            throws IOException
        {
            if (!transferSupported)
                //系统不支持就直接返回
                return IOStatus.UNSUPPORTED;
    
            FileDescriptor targetFD = null;
            if (target instanceof FileChannelImpl) { //如果目标是fileChannel则走这里
                if (!fileSupported)
                    return IOStatus.UNSUPPORTED_CASE;
                targetFD = ((FileChannelImpl)target).fd;
            } else if (target instanceof SelChImpl) { 
                //SocketChannel实现了SelChImpl接口,因此会走这里
                if ((target instanceof SinkChannelImpl) && !pipeSupported)
                    return IOStatus.UNSUPPORTED_CASE;
                //给targetFD赋值
                targetFD = ((SelChImpl)target).getFD();
            }
            if (targetFD == null)
                return IOStatus.UNSUPPORTED;
            //将fileChannel和socketChannel对应的fd转换为具体的值
            int thisFDVal = IOUtil.fdVal(fd);
            int targetFDVal = IOUtil.fdVal(targetFD);
            //不支持自己给自己传输
            if (thisFDVal == targetFDVal) 
                return IOStatus.UNSUPPORTED;
    
            long n = -1;
            int ti = -1;
            try {
                begin();
                ti = threads.add();
                if (!isOpen())
                    return -1;
                do { 
                    //调用native方法transferTo0
                    n = transferTo0(thisFDVal, position, icount, targetFDVal);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
                if (n == IOStatus.UNSUPPORTED_CASE) {
                    if (target instanceof SinkChannelImpl)
                        pipeSupported = false;
                    if (target instanceof FileChannelImpl)
                        fileSupported = false;
                    return IOStatus.UNSUPPORTED_CASE;
                }
                if (n == IOStatus.UNSUPPORTED) {
                    // Don't bother trying again
                    transferSupported = false;
                    return IOStatus.UNSUPPORTED;
                }
                return IOStatus.normalize(n);
            } finally {
                threads.remove(ti);
                end (n > -1);
            }
        }
    

    代码有点长:

    1. 调用FileChannelImpl的transferTo,会尝试三种情况,如果系统支持零拷贝,则走 transferToDirectly
    2. transferToDirectly 方法前面做了各种判断,其实可以理解为直接调用了n = transferTo0(thisFDVal, position, icount, targetFDVal);native方法

    再来跟踪transferTo0:

    // 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileChannelImpl.c
    JNIEXPORT jlong JNICALL
    Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                                jint srcFD,
                                                jlong position, jlong count,
                                                jint dstFD)
    {
    #if defined(__linux__)
        off64_t offset = (off64_t)position;
        //直接调用sendfile
        jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
        if (n < 0) {
            if (errno == EAGAIN)
                return IOS_UNAVAILABLE;
            if ((errno == EINVAL) && ((ssize_t)count >= 0))
                return IOS_UNSUPPORTED_CASE;
            if (errno == EINTR) {
                return IOS_INTERRUPTED;
            }
            JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
            return IOS_THROWN;
        }
        return n;
    }
    

    这个方法里其实有linux、solaris、APPLE等多个平台的实现,这里只截取linux下的实现,可以看到是直接调用了系统函数sendfile来实现的数据发送,具体的拷贝次数则要看linux内核的版本了。

    总结

    • NIO读取文件并通过socket发送,最少拷贝几次?
      直接调用channel.transferTo,同时linux内核版本大于等于2.4,则可以将拷贝次数降低到2次,并且CPU不参与拷贝。
    • 堆外内存和所谓的零拷贝到底是什么关系
      笔者理解网上说的零拷贝,可以理解为内核层面的零拷贝和java层面的零拷贝,所谓的0并不是一次拷贝都没有,而是在不同的场景下尽可能减少拷贝次数。

    参考文章

    Java 堆外内存、零拷贝、直接内存以及针对于NIO中的FileChannel的思考
    JavaIO原理剖析之 网络IO

    相关文章

      网友评论

          本文标题:JAVA IO专题二:java NIO读取文件并通过socket

          本文链接:https://www.haomeiwen.com/subject/hgcuaktx.html