美文网首页
java nio 源码分析2 IO

java nio 源码分析2 IO

作者: 不存在的里皮 | 来源:发表于2019-12-17 23:08 被阅读0次

    目的

    一直想知道

    • channel.write返回时, 到底这个数据是交给操作系统了, 还是说已经发出网卡了, 还是说已经发出去收到ACK了. (答案:只是说明它写入了内核的send_queue)
    • java nio是水平触发的, 而且缓冲区超过"低水位"就触发读事件, 不超过"高水位"就触发写事件, 那这个水位到底多高? 缓冲区只什么缓冲区? ByteBuffer还是内核管理的缓冲区?(没看native源码,但应该是socket的recv_queue和send_queue,反正我们只需要根据read和write的返回值、是否抛出异常来决定下一步行为即可。)
    • "低水位"一词来自<unix网络编程卷1>, 本文假设你已经读过.
    • read和write的返回值意义(写的返回值>=0, 读的返回值>= -1, 其中-1代表EOF)

    参考

    write

    getTemporaryDirectBufferSocketChannelImpl值得分析

    探究socketChannel原理提到了socket非直接内存的读写都是两次复制

    读写的原理
    上图便回答开文的第一个问题,当你写出成功返回时,只是说明它写入了内核的send_queue

    SocketChannelImpl::write:

    public int write(ByteBuffer buf) throws IOException {
            ...
            try {
                ...
                for (;;) {
                    n = IOUtil.write(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                ...
            }
        }
    }
    

    依赖于IOUtil::write

    static int write(FileDescriptor fd, ByteBuffer src, long position,
                         NativeDispatcher nd)
            throws IOException
    {
        if (src instanceof DirectBuffer)
            return writeFromNativeBuffer(fd, src, position, nd);
    
        // Substitute a native buffer
        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);  // 确定直接内存大小
        ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);  // 分配直接内存
        try {
            bb.put(src);
            bb.flip();
            // Do not update src until we see how many bytes were written
            src.position(pos);
    
            int n = writeFromNativeBuffer(fd, bb, position, nd);  // 直接内存往外写
            if (n > 0) {
                // now update src
                src.position(pos + n);
            }
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }
    

    两个调用很重要:

    1. 通过Util.getTemporaryDirectBuffer分配直接内存
    2. 通过writeFromNativeBuffer(fd, bb, position, nd);从直接内存写数据

    getTemporaryDirectBuffer

     public static ByteBuffer getTemporaryDirectBuffer(int size) {
        BufferCache cache = bufferCache.get(); // 获取线程私有变量
        ByteBuffer buf = cache.get(size);  // 关键,获取指定大小的内存
        if (buf != null) {
            return buf;
        } else {
            // 没有满足条件的直接内存
            // 把第一个内存给释放
            if (!cache.isEmpty()) {
                buf = cache.removeFirst();
                free(buf);
            }
            return ByteBuffer.allocateDirect(size);  // 关键,分配直接内存
        }
    }
    ...
     private static ThreadLocal<BufferCache> bufferCache =
            new ThreadLocal<BufferCache>()
    {
        @Override
        protected BufferCache initialValue() {
            return new BufferCache();
        }
    };
    

    这里我们易看出,线程私有变量bufferCache对直接内存进行了管理, 下面我们要分析该类

    BufferCache::getTemporaryDirectBuffer

    /**
     * A simple cache of direct buffers.
     */
    private static class BufferCache {
        // the array of buffers
        private ByteBuffer[] buffers;
    
        // the number of buffers in the cache
        private int count;
    
        // the index of the first valid buffer (undefined if count == 0)
        private int start;
    
        private int next(int i) {
            return (i + 1) % TEMP_BUF_POOL_SIZE;
        }
    
        BufferCache() {
            buffers = new ByteBuffer[TEMP_BUF_POOL_SIZE];
        }
    ...
    

    懒得分析了,挺简单的一个类,总之就是

    1. 用一个循环数组来管理一组空闲的直接内存,查找内存靠线性遍历查询
    2. next取余说明它是个循环数组
    3. 用start和count来记录有直接内存的元素界限。(buffer[start]是第一个有效的内存索引)
    4. get(int size)会找到第一个大小至少大于size的元素索引。由于将其返回后该位置会为空,就把第一个元素(buffer[start])置于此即可。
    5. getTemporaryDirectBuffer会尽量返回第一个

    思考:多读读代码会发现,如果临时要输出的内存大小突然变大,会导致突然分配一个很大的ByteBuffer元素。如果之后又用不上这么大的内存,就比较浪费, 所以

    《Hadoop技术内幕》提到了可将写入的数据分成固定大小(比如8KB)的chunk,并以chunk为单位写入DirectBuffer

    IOUtil::writeFromNativeBuffer

    没找到native源码,算了不分析了

    read和write的返回值意义

    我们看到继承链:

    image.png
    强烈建议阅读这两个类的注释
    public interface WritableByteChannel
        extends Channel
    {
    
        /**
         * @return The number of bytes written, possibly zero
         *
         * @throws  ...
         */
        public int write(ByteBuffer src) throws IOException;
    
    }
    
    public interface ReadableByteChannel extends Channel {
    
        /**
         *
         * @return  The number of bytes read, possibly zero, or <tt>-1</tt> if the
         *          channel has reached end-of-stream
         *
         * @throws  ...
         */
        public int read(ByteBuffer dst) throws IOException;
    
    }
    

    上面两个说明了,write会返回的值是0或正数,read会返回的值可能是0, 正数, 或-1(当EOF时)
    然后注释懒得贴上来翻译,我总结一下吧:

    1. 读是从socket的输入缓存(socket's input buffer)读,写是从socket的输出缓存写
    2. nio中,读不一定读满,写也不一定写满,根据返回值来决定下一步行为
    3. 根据Java NIO read() End of Stream read返回-1代表的EOF,应该是说明对方优雅关闭了连接(比如对方正常发出FIN并响应之类的)。如果不优雅关闭,应该会抛出异常。
    4. 引申地说,在nio编程时,如果ByteBuffer没有写完,不应当坚持循环等ByteBuffer写完。最好是注册写事件,等下一次写。

    相关文章

      网友评论

          本文标题:java nio 源码分析2 IO

          本文链接:https://www.haomeiwen.com/subject/iuhenctx.html