Java NIO

作者: Teddy_b | 来源:发表于2018-08-21 21:29 被阅读0次

    # Java NIO #

    Java NIO属于非阻塞IO,这是与传统IO最本质的区别。传统IO包括socket和文件IO都是阻塞式的,即一个动作的执行,必须等待先前的动作完成;非阻塞的IO在线程执行一个动作,不用等待动作执行完,可以去做别的事情,这是因为NIO是基于Channel的,而不是基于流的。每个线程可以同时监听多个注册到Selector上的Channel,   NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写到缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

    ## Buffer 缓冲区 ##

    Java NIO中所有的缓冲区都继承于 Buffer这个抽象类。Buffer类似于一块可以被读写的区域,因此有读模式和写模式两种模式,在Buffer类中通过三个变量控制缓冲区的读和写position(下一个读或写的数组下标),limit(在读模式和写模式下有不同的含义),capacity(数组的容量):

    - 写模式:往数组中写入数据时候,将limit设置为capacity,表示最多可以写入capacity个元素,position就表示下一个写入的下标。通过Buffer的flip()方法可以从写模式切换到读模式。

            public final Buffer flip() {

                limit = position;

                position = 0;

                mark = -1;

                return this;

            }

    - 读模式:从数组中读取数据的时候,将limit设置为position,表示最多只有这么多个元素可以读,position设置为0,表示从头开始读数据。通过调用Buffer的clear()方法或者compact()方法可以从读模式切换到写模式。

            public final Buffer clear() {

                position = 0;

                limit = capacity;

                mark = -1;

                return this;

            }

    clear()方法是不保留数据的(即使有些数据还没有读完),因为直接将position设置为0,表示从头开始写入数据,limit重新设置capacity。

            public ByteBuffer compact() {

                System.arraycopy(hb, ix(position()), hb, ix(0), remaining());

                position(remaining());

                limit(capacity());

                discardMark();

                return this;

            }

    compact()方法不是Buffer类中的方式,是在Buffer的各个子类(ByteBuffer、CharBuffer等)中定义的抽象方法,并且在(ByteBuffer、CharBuffer等)的子类中提供了实现,compact()方法会保留还没读的数据,先将没读的数据拷贝到数组的最前面,然后设置position为下一个写入的下标,limit重写设置为capacity。

    ## ByteBuffer 字节缓冲区 ##

    ByteBuffer继承于Buffer,ByteBuffer仍然是个抽象类,我们只能通过它的allocate(int capacity)方法获取一个非直接字节缓冲区(缓冲区不是直接在内存中的),也可以通过warp(byte[] b)方法创建一个非直接字节缓冲区。如果想要创建一个直接字节缓冲区,可以使用allocateDirect(int capacity)方法,直接缓冲区不是在堆上分配的,因此不受GC的管理,其创建和释放过程比较耗时,但是直接缓冲区上数据的读写比较快。

        //在堆上创建一个字节缓冲区

        public static ByteBuffer allocate(int capacity) {

            if (capacity < 0)

                throw new IllegalArgumentException();

            return new HeapByteBuffer(capacity, capacity);

        }

        //创建一个直接字节缓冲区

        public static ByteBuffer allocateDirect(int capacity) {

            return new DirectByteBuffer(capacity);

        }

    **HeapByteBuffer:**HeapByteBuffer是 ByteBuffer的默认实现类,在堆上创建一个字节缓冲区.HeapByteBuffer主要有以下几个方面的功能:

    - 读数据:

            //获取position处的元素,并将position++

            public byte get() {

                return hb[ix(nextGetIndex())];

            }

            //获取指定下标处的元素,此时position是没有变的。

            public byte get(int i) {

                return hb[ix(checkIndex(i))];

            }

            //获取从position开始的length个元素,并拷贝到指定数组中,position会更新为position + length

            public ByteBuffer get(byte[] dst, int offset, int length) {

                checkBounds(offset, length, dst.length);

                if (length > remaining())

                    throw new BufferUnderflowException();

                System.arraycopy(hb, ix(position()), dst, offset, length);

                position(position() + length);

            return this;

        }

    - 写数据:

            //将指定元素插入到position处,并将position++(如果满了将会抛出异常)

            public ByteBuffer put(byte x) {

                    hb[ix(nextPutIndex())] = x;

                    return this;

            }

            //将指定元素插入到指定位置,position位置不会改变

             public ByteBuffer put(int i, byte x) {

                hb[ix(checkIndex(i))] = x;

                return this;

            }

            //从position处开始写入指定数组中的元素,position会被更新为(position + length)

            public ByteBuffer put(byte[] src, int offset, int length) {

                checkBounds(offset, length, src.length);

                if (length > remaining())

                    throw new BufferOverflowException();

                System.arraycopy(src, offset, hb, ix(position()), length);

                position(position() + length);

                return this;

            }

    - 读写char, int ,short, double等其他基本数据类型

            //获取一个字符,Bits类根据大端还是小端用不同的方式组合两个字节

            public char getChar() {

                return Bits.getChar(this, ix(nextGetIndex(2)), bigEndian);

            }

            //大端存储模式,高位字节保存在低字节部分,因此组合的时候低字节在前面,高字节在后面

            static char getCharB(ByteBuffer bb, int bi) {

                return makeChar(bb._get(bi    ),

                            bb._get(bi + 1));

            }

            //小端存储模式,高位字节保存在高字节部分,因此组合的时候高字节在前面,低字节在后面

            static char getCharL(ByteBuffer bb, int bi) {

                return makeChar(bb._get(bi + 1),

                            bb._get(bi    ));

            }

            //写入一个char字符的时候也是一样

            public ByteBuffer putChar(char x) {

                Bits.putChar(this, ix(nextPutIndex(2)), x, bigEndian);

                return this;

            }

    - 用ByteBuffer包装成其他基本数据类型的缓冲区(CharBuffer、IntBuffer等)

            //ByteBuffer的asCharBuffer()方法

            public CharBuffer asCharBuffer() {

                int size = this.remaining() >> 1;

                int off = offset + position();

                return (bigEndian

                        ? (CharBuffer)(new ByteBufferAsCharBufferB(this,

                                                                       -1,

                                                                       0,

                                                                       size,

                                                                       size,

                                                                       off))

                        : (CharBuffer)(new ByteBufferAsCharBufferL(this,

                                                                       -1,

                                                                       0,

                                                                       size,

                                                                       size,

                                                                       off)));

            }

    asCharBuffer()方法也会根据机器是大端模式还是小端模式,创建不同的对象。如果是大端模式,将会创建ByteBufferAsCharBufferB类对象,由于ByteBufferAsCharBufferB对象是由ByteBuffer对象包装而来的,虽然ByteBufferAsCharBufferB的父类是CharBuffer,但是ByteBufferAsCharBufferB类中操作的并不是字符数组而是字节数组,所以ByteBufferAsCharBufferB类对象在读写的时候仍然借助Bits类完成,即先通过字节组成字符再读写。根据ByteBuffer得到其他类型的缓冲区也是一样的实现原理。

    - 创建只读型缓冲区

            public ByteBuffer asReadOnlyBuffer() {

                return new HeapByteBufferR(hb,

                                             this.markValue(),

                                             this.position(),

                                             this.limit(),

                                             this.capacity(),

                                             offset);

            }

    asReadOnlyBuffer()会创建HeapByteBufferR类对象,每种缓冲区都可以创建对应的只读型缓冲区,只读型缓冲区就直接继承创建它的这个类,如HeapByteBufferR继承于HeapByteBuffer,在HeapByteBufferR类中重写所有的put方法,所有的put方法都直接抛出异常,而get方法仍然使用父类的get方法。

    **MappedByteBuffer**继承于ByteBuffer,是直接字节数组的抽象父类,在MappedByteBuffer类中只定义了三个和物理磁盘相关的方法:

    - isLoaded():用于判断这个字节数组是否存在物理磁盘上

    - load():将直接字节数组中的数据写到物理磁盘上

    - force():强制将直接字节数组中的数据写到物理磁盘上

    **DirectByteBuffer**继承于MappedByteBuffer,用于创建直接字节数组的类。由于直接字节数组不是在堆上分配内存,因此不受GC控制,创建和释放过程比较繁琐,通过通过Unsafe类的allocateMemory()方法分配内存空间,而且DirectByteBuffer的所有get()和put()方法都是通过Unsafe类完成

        //读取position处的元素

        public byte get() {

            return ((unsafe.getByte(ix(nextGetIndex()))));

        }

        //在position处写入元素

        public ByteBuffer put(byte x) {

            unsafe.putByte(ix(nextPutIndex()), ((x)));

            return this;

        }

    ## Channel ##

    关于同步、异步、阻塞和非阻塞的区别:同步和异步说的是消息的通知机制,阻塞非阻塞说的是线程的状态 。

    - 同步阻塞IO:client在调用read()方法时,stream里没有数据可读,线程停止向下执行,直至stream有数据。

    - 同步非阻塞IO:client在调用read()方法时,stream里没有数据可读,read()方法就返回了,线程可以去干别的事,但是需要有一个线程监听这stream中是否有数据准备好。

    - 异步非阻塞IO:服务端调用read()方法,若stream中无数据则返回,程序继续向下执行。当stream中有数据时,操作系统会负责把数据拷贝到用户空间,然后通知这个线程,这里的消息通知机制就是异步!

    NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写道缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

    **Channel**:是NIO中通道的接口类,只提供了两个抽象方法,isOpen()判断通道是否打开,close()关闭一个通道。

    **AbstractInterruptibleChannel**:是一个抽象类,实现了Channel接口。任何一个通道如果想要实现在中断时实现异步关闭通道,那么必须继承这个类,这主要体现在两个方面:

    - 当某个线程阻塞在channel上,而另一个线程调用了channel的close()方法,那么阻塞的线程会收到AsynchronousCloseException

    - 如果某个线程阻塞在channel上,另一个线程调用了阻塞线程的interrupt()方法,那么阻塞线程会收到ClosedByInterruptException,并且通道会被关闭。

    AbstractInterruptibleChannel抽象类中定义了一组协同方法begin()和end()方法来完成这两个功能,因此当线程执行一个可能阻塞的IO操作时,必须把这个IO操作放在begin()方法和end()方法之间,才能实现channel的异步关闭。

        protected final void begin() {

            if (interruptor == null) {

                interruptor = new Interruptible() {

                        public void interrupt(Thread target) {

                            synchronized (closeLock) {

                                if (!open)

                                    return;

                                open = false;

                                interrupted = target;

                                try {

                                    //收到中断请求后会回调AbstractInterruptibleChannel类的close()方法关闭通道

                                    AbstractInterruptibleChannel.this.implCloseChannel();

                                } catch (IOException x) { }

                            }

                        }};

            }

            blockedOn(interruptor);

            Thread me = Thread.currentThread();

            if (me.isInterrupted())

                interruptor.interrupt(me);

        }

        protected final void end(boolean completed)

            throws AsynchronousCloseException

        {

            blockedOn(null);

            Thread interrupted = this.interrupted;

            //中断触发器不为空,会抛出ClosedByInterruptException

            if (interrupted != null && interrupted == Thread.currentThread()) {

                interrupted = null;

                throw new ClosedByInterruptException();

            }

            //触发器为空,没有中断,但是在阻塞的过程中channel被关闭了,抛出AsynchronousCloseException

            if (!completed && !open)

                throw new AsynchronousCloseException();

        }

    **SelectableChannel**:是一个抽象类,继承于AbstractInterruptibleChannel。需要注册到Selector上的通道必须继承这个类

    - public abstract int validOps();获取这个通道支持的事件

    - public abstract boolean isRegistered();通道是否注册到了某个Selector上

    - public abstract SelectionKey keyFor(Selector sel);这个通道在指定Selector上注册的事件

    - public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;注册这个通道到指定的Selector上

    - public abstract SelectableChannel configureBlocking(boolean block) throws IOException;修改这个通道为非阻塞或阻塞

    - public abstract boolean isBlocking();如果这个通道是阻塞模式,返回true

    **AbstractSelectableChannel**:是一个抽象类,是SelectableChannel的基础实现类。在AbstractSelectableChannel类中定义了一个SelectionKey数组,记录这个channel注册到了哪些Selector上,定义了一个keyCount记录这个channel注册的次数。并且channel的最初模式设置为阻塞模式。

    - 判断这个channel是否注册了:keyCount不为0就表示注册了

            public final boolean isRegistered() {

                synchronized (keyLock) {

                    return keyCount != 0;

                }

            }

    - 获取这个通道在指定Selector上的注册事件,给定Selector,在SelectionKey[]数组中查找Selector与指定Selector相等的SelectionKey

            public final SelectionKey keyFor(Selector sel) {

                return findKey(sel);

            }

    - 将这个通道注册到指定Selector上,如果给定的事件ops不是这个通道支持的事件validOps()将会抛出异常,而且阻塞的channel无法注册到Selector上,具体在注册的时候如果channel已经注册到了这个Selector上,那么更新ops和附加信息,如果这个channel还没有注册到这个Selector上,将会调用Selector 类的register(SelectableChannel ch, int ops, Object o)方法完成注册

            public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{

                synchronized (regLock) {

                    if (!isOpen())

                        throw new ClosedChannelException();

                    if ((ops & ~validOps()) != 0)

                        throw new IllegalArgumentException();

                    if (blocking)

                        throw new IllegalBlockingModeException();

                    SelectionKey k = findKey(sel);

                    if (k != null) {

                        k.interestOps(ops);

                        k.attach(att);

                    }

                    if (k == null) {

                        // New registration

                        synchronized (keyLock) {

                            if (!isOpen())

                                throw new ClosedChannelException();

                            k = ((AbstractSelector)sel).register(this, ops, att);

                            addKey(k);

                        }

                    }

                    return k;

                }

            }

    - 关闭通道除了需要关闭通道之外,还需要把SelectionKey[]数组上的SelectionKey取消,而SelectionKey类中的cancel()方法又会调用AbstractSelector中的cancel()方法,AbstractSelector类中的cancel()方法将这个SelectionKey加入到cancelledKeys集合中。

            protected final void implCloseChannel() throws IOException {

                implCloseSelectableChannel();

                synchronized (keyLock) {

                    int count = (keys == null) ? 0 : keys.length;

                    for (int i = 0; i < count; i++) {

                        SelectionKey k = keys[i];

                        if (k != null)

                            k.cancel();

                    }

                }

            }

    - 判断channel是否是阻塞模式,只有非阻塞channel才能注册到Selector上

            public final boolean isBlocking() {

                synchronized (regLock) {

                    return blocking;

                }

            }

    - 修改channel的阻塞模式

            public final SelectableChannel configureBlocking(boolean block) throws IOException {

                synchronized (regLock) {

                    if (!isOpen())

                        throw new ClosedChannelException();

                    if (blocking == block)

                        return this;

                    if (block && haveValidKeys())

                        throw new IllegalBlockingModeException();

                    implConfigureBlocking(block);

                    blocking = block;

                }

                return this;

            }

    **SelectionKey**:每次channel注册到一个Selector都会返回一个SelectionKey对象,因此SelectionKey描述的是一次注册事件中channel和Selector之间的映射关系。

    - public abstract SelectableChannel channel();获取SelectionKey对应的channel

    - public abstract Selector selector();获取对应SelectionKey对应的channel

    - public abstract boolean isValid();这个SelectionKey是否有效

    - public abstract void cancel();把这个SelectionKey置为无效

    - public abstract int interestOps();获取这个SelectionKey关注的事件

    - public abstract SelectionKey interestOps(int ops);设置这个SelectionKey关注的事件

    - public abstract int readyOps();这个SelectionKey关注的事件中就绪了的事件

    - public static final int OP_READ = 1 << 0;读事件

    - public static final int OP_WRITE = 1 << 2;写事件

    - public static final int OP_CONNECT = 1 << 3;channel连接到服务器的连接事件

    - public static final int OP_ACCEPT = 1 << 4;服务器准备好了接受连接事件

    - channel上有读事件就绪、写事件就绪(isWritable)、连接事件就绪(isConnectable)、接受连接事件就绪(isAcceptable)

        public final boolean isReadable() {

                return (readyOps() & OP_READ) != 0;

            }

    **AbstractSelectionKey**:是一个抽象类,是SelectionKey的基础实现类。一个channel注册到一个Selector上返回一个SelectionKey,这个SelectionKey初始就是有效的。

    - 取消一个SelectionKey,先将SelectionKey置为无效,然后调用Selector的cancel(SelectionKey key)方法完成具体的取消操作

            public final void cancel() {

                synchronized (this) {

                    if (valid) {

                        valid = false;

                        ((AbstractSelector)selector()).cancel(this);

                    }

                }

            }

    **SelectionKeyImpl**:SelectionKey的最终实现类,继承于AbstractSelectionKey。在SelectionKeyImpl类中定义了int类型interestOps、int类型的readyOps,实现了SelectionKey抽象类中的interestOps()、readyOps()、interestOps(int ops)这三个方法。

    ## Selector ##

    Selector是NIO得以实现的核心模块之一,NIO属于同步非阻塞IO,同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;

    **Selector**:是一个抽象类。提供了静态方法open()创建一个Selector对象,open()方法是使用SelectorProvider类完成的。

    - public abstract boolean isOpen();这个Selector是否打开了

    - public abstract SelectorProvider provider();获取创建这个Selector的SelectorProvider

    - public abstract Set keys();获取这个Selector上所有的注册的channel对应的SelectionKey

    - public abstract Set selectedKeys();获取这个Selector上所有就绪了的channel对应的SelectionKey

    - public abstract int selectNow() throws IOException;非阻塞的执行一次选择操作,没有就绪的channel就立即返回0,否则返回有多少个channel就绪了。

    - public abstract int select(long timeout) throws IOException;执行一次选择操作,并且阻塞一段时间,在这段时间里,如果有channel就绪了,将会返回有多少个channel就绪了,如果到达了指定时间还没有channel就绪,就返回0.

    - public abstract int select() throws IOException;阻塞执行选择操作,一直阻塞到有channel就绪,然后返回有多少个channel就绪了

    - public abstract Selector wakeup();使阻塞在select()方法上的线程立即返回。

            Selector selector = Selector.open();

            channel.configureBlocking(false);

            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

            while(true) {

              int readyChannels = selector.select();

              if(readyChannels == 0) continue;

              Set selectedKeys = selector.selectedKeys();

              Iterator keyIterator = selectedKeys.iterator();

              while(keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if(key.isAcceptable()) {

                    // a connection was accepted by a ServerSocketChannel.

                } else if (key.isConnectable()) {

                    // a connection was established with a remote server.

                } else if (key.isReadable()) {

                    // a channel is ready for reading

                } else if (key.isWritable()) {

                    // a channel is ready for writing

                }

                keyIterator.remove();

              }

            }

    **AbstractSelector**:是一个抽象类,是Selector的基础实现类.

    - 定义了一个boolean类型的open变量,初始化为true,表示一个Selector创建时就为打开状态。

    - 定义了一个cancelledKeys集合,表示已经取消的SelectionKey的集合。SelectionKey中的cancel()方法会调用AbstractSelector类的cancel()方法,cancel()方法将SelectionKey加入到cancelledKeys集合中。

            void cancel(SelectionKey k) {                       // package-private

                synchronized (cancelledKeys) {

                    cancelledKeys.add(k);

                }

            }

    - 定义了一个SelectorProvider对象,用于实现Selector类中的provider()方法。

    - 定义了一个反注册方法deregister(SelectionKey key),调用的是channel的方法,将key从SelectionKey[] keys数组中移除

            protected final void deregister(AbstractSelectionKey key) {

                ((AbstractSelectableChannel)key.channel()).removeKey(key);

            }

    - 定义了一组协同方法begin()和end(),与AbstractInterruptibleChannel类中协同方法类似,在一个可能阻塞的IO操作前使用begin()方法,在IO操作之后使用end()方法,但是这里的协同方法是为了在产生中断之后使select()方法立即返回。

            protected final void begin() {

                if (interruptor == null) {

                    interruptor = new Interruptible() {

                            public void interrupt(Thread ignore) {

                                //产生中断之后,调用wakeup()方法唤醒select()方法

                                AbstractSelector.this.wakeup();

                            }};

                }

                AbstractInterruptibleChannel.blockedOn(interruptor);

                Thread me = Thread.currentThread();

                if (me.isInterrupted())

                    interruptor.interrupt(me);

            }

    **SelectorImpl**:仍然是一个抽象类,继承于AbstractSelector类,进一步实现了Selector类。

    - 定义了一个keys集合,用于实现Selector类的keys()方法,直接返回这个集合。

    - 定义了一个selectedKeys集合,用于实现Selector类的selectedKeys()方法,直接返回这个集合。

    - 将三个select()方法均委托给一个抽象方法,待子类进一步实现。

            //委托给lockAndDoSelect()方法

            public int select(long timeout) throws IOException{

                      if (timeout < 0)

                          throw new IllegalArgumentException("Negative timeout");

                      return lockAndDoSelect((timeout == 0) ? -1 : timeout);

            }

            //调用上一个方法

            public int select() throws IOException {

                 return select(0);

            }

            //委托给lockAndDoSelect()方法

            public int selectNow() throws IOException {

                 return lockAndDoSelect(0);

            }

            //同步锁,进一步委托给doSelect(long time)这个抽象方法。

            private int lockAndDoSelect(long timeout) throws IOException {

                  synchronized (this) {

                      if (!isOpen())

                          throw new ClosedSelectorException();

                      synchronized (publicKeys) {

                          synchronized (publicSelectedKeys) {

                              return doSelect(timeout);

                          }

                      }

                  }

              }

    - 进一步实现了close()方法,但是没有完全实现,还是委托给一个抽象方法

    //先调用wakeup()方法使select()方法立即返回,然后同步锁,调用抽象方法implClose()

            public void implCloseSelector() throws IOException {

                 wakeup();

                 synchronized (this) {

                     synchronized (publicKeys) {

                         synchronized (publicSelectedKeys) {

                             implClose();

                         }

                     }

                 }

             }

    - 初步实现了register(channel, int ops, Object att)方法,委托给抽象方法implRegister(SelectionKey k)方法

            protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment){

                 if (!(ch instanceof SelChImpl))

                     throw new IllegalSelectorException();

                //新建一个SelectionKey对象

                 SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);

                //设置附加信息

                 k.attach(attachment);

                 synchronized (publicKeys) {

                     implRegister(k);

                 }

                //设置感兴趣事件

                 k.interestOps(ops);

                 return k;

             }

    - 定义了一个方法处理cancelledKeys集合,委托给抽象方法implDereg(SelectionKey k)方法

               void processDeregisterQueue() throws IOException {

                     // Precondition: Synchronized on this, keys, and selectedKeys

                    //获取cancelledKeys集合

                     Set cks = cancelledKeys();

                     synchronized (cks) {

                         if (!cks.isEmpty()) {

                             Iterator i = cks.iterator();

                             while (i.hasNext()) {

                                 SelectionKeyImpl ski = (SelectionKeyImpl)i.next();

                                 try {

                                     implDereg(ski);

                                 } catch (SocketException se) {

                                     IOException ioe = new IOException("Error deregistering key");

                                     ioe.initCause(se);

                                     throw ioe;

                                 } finally {

                                     i.remove();

                                 }

                             }

                         }

                     }

                 }

    ## SelectorProvider ##

    SelectorProvider为Selector、DatagramChannel、SocketChannel、ServerSocketChannel、Pipe这些Selector和channel提供打开方法。

    **SelectorProvider**:是一个抽象类

    -     public abstract DatagramChannel openDatagramChannel() throws IOException;打开UDP通信channel

    -     public abstract Pipe openPipe() throws IOException;打开一个管道

    -     public abstract AbstractSelector openSelector() throws IOException;打开一个Selector

    - public abstract ServerSocketChannel openServerSocketChannel() throws IOException;打开一个服务器socket channel

    -     public abstract SocketChannel openSocketChannel() throws IOException;打开一个TCP通信channel

    **SelectorProviderImpl**:是一个抽象类,是SelectorProvider的基础实现类。

    - 打开UDP通信channel,返回的是UDP channel的实现类对象。

             public DatagramChannel openDatagramChannel() throws IOException   {  

                return new DatagramChannelImpl(this);  

            }  

    - 打开TCP通信channel,返回的是Socket channel的实现类对象。

            public SocketChannel openSocketChannel() throws IOException  {  

                return new SocketChannelImpl(this);  

            }  

    - 打开TCP通信服务器端的channel,返回的是ServerSocket channel的实现类对象。

            public ServerSocketChannel openServerSocketChannel() throws IOException  {  

                return new ServerSocketChannelImpl(this);  

            } 

    - 打开一个管道,返回的是pipe实现类对象。

            public Pipe openPipe() throws IOException  {  

                return new PipeImpl(this);  

            }  

    - 打开Selector的方法没有实现,待子类实现。

    **WindowsSelectorProvider **:SelectorProvider的最终实现类,继承于SelectorProviderImpl。实现了Selector的打开方法

    - 打开Selector。调用的是Selector的最终实现类WindowsSelectorImpl,通过WindowsSelectorImpl的构造函数返回一个Selector

            public AbstractSelector openSelector() throws IOException {

                  return new WindowsSelectorImpl(this);

            }

    ## 再看 channel ##

    **FileChannel**:是一个抽象类,继承于AbstractInterruptibleChannel类,没有继承SelectableChannel,即FileChannel无法注册到Selector上。FileChannel也无法以非阻塞模式读写。通过阻塞方式对文件读写。

    - 打开一个FileChannel,一般通过传统IO流获取FileChannel。但是FileChannel类中也定义了open()函数打开一个FileChannel,getChannel()方法底层就是调用的FileChannel的open()方法

            FileInputStream fis = new FileInputStream("C:\\mycode\\hello.txt");

            FileChannel inChannel = fis.getChannel();

    - 从FileChannel读取数据:public abstract int read(ByteBuffer dst) throws IOException;将通道中的数据读出来,并写道指定ByteBuffer中.FileChannel也支持分散写,即写到多个ByteBuffer中.

            public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

    - 向FileChannel写数据: public abstract int write(ByteBuffer src) throws IOException;将ByteBuffer缓冲区中的数据写入到channel中.FileChannel也支持聚集写,即多个ByteBuffer中的数据写到channel中.

            public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

    - 在FileChannel的某个特定位置进行数据的读/写操作,改变文件position的位置.

            //获取文件position的位置

            public abstract long position() throws IOException;

            //设置文件position

            public abstract FileChannel position(long newPosition) throws IOException;

    - 将channel中的数据写入到另一个channel,或将另一个channel中的数据写入到这个channel中

            //将本channel中的数据写入到另一个“可写入”的channel,从另一个channel的position处开始写入

            public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

            //将另一个channel中的数据写入到本channel中,从另一个channel的position处开始

            public abstract long transferFrom(ReadableByteChannel src, long position, long count) throws IOException;

    - 获取channel关联的文件的大小

            public abstract long size() throws IOException;

    - 截取一个指定大小的文件,截断channel关联的文件为size大小,size之后的数据会被丢弃

            public abstract FileChannel truncate(long size) throws IOException;

    **DatagramChannel**:是一个抽象类,真正的实现类是其子类DatagramChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。UDP通信的channel。

    - 创建一个DatagramChannel。调用DatagramChannel的静态方法open(),通过SelectorProvider去创建DatagramChannelImpl的实例

            public static DatagramChannel open() throws IOException {

                return SelectorProvider.provider().openDatagramChannel();

            }

    - DatagramChannel支持的事件:读和写

            public final int validOps() {

                return (SelectionKey.OP_READ | SelectionKey.OP_WRITE);

            }

    - 报文流本来是无连接的,在没有连接到一个指定地址时,channel可以同时发送数据报到多个远程地址、也可以同时从多个远程地址接收数据报。通过DatagramChannel的send(ByteBuffer src, SocketAddress add)方法发送数据报到指定远程地址,通过DatagramChannel的receive(ByteBuffer dst)方法从任意远程地址接收数据报,receive()方法会返回一个SocketAddress对象用以标识数据报来自哪个远程地址。在没有建立连接的时候,每一次调用send()方法发送数据报或者调用receive()方法接收数据报时都会接收安全检查。

            //发送数据报到指定远程地址

            public abstract int send(ByteBuffer src, SocketAddress target) throws IOException;

            //从任意远程地址接收数据报,并返回数据来自哪个地址

            public abstract SocketAddress receive(ByteBuffer dst) throws IOException;

    - 报文流也可以建立连接。建立连接后,channel将只能从指定的远程地址接收数据报、同时也只能发送数据报到指定的远程地址。由于已经连接到了指定的远程地址,因此在发送或者接收数据报的时候可以调用write()方法已经read()方法。write(ByteBuffer src)方法将数据报发送到指定远程地址、read(ByteBuffer dst)方法从指定远程地址接收数据报。指定连接到指定远程地址的channel才能调用write()方法和read()方法,每次调用write()方法和read()方法时不需要接收安全检查。**将DatagramChannel置于已连接的状态可以使除了它所“连接”到的地址之外的任何其他源地址的数据报被忽略。这是很有帮助的,因为不想要的包都已经被网络层丢弃了,从而避免了使用代码来接收、检查然后丢弃包的麻烦。**

            //将channel连接到指定远程地址

            public abstract DatagramChannel connect(SocketAddress remote) throws IOException;

            //断开channel与远程地址间的连接

            public abstract DatagramChannel disconnect() throws IOException;

            //channel是否连接到了某个远程地址

            public abstract boolean isConnected();

            //发送数据报到指定远程地址,支持聚集写

            public abstract int write(ByteBuffer src) throws IOException;

            public final long write(ByteBuffer[] srcs) throws IOException {

                return write(srcs, 0, srcs.length);

            }

            public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

            //从指定远程地址接收数据报,支持分散读

            public abstract int read(ByteBuffer dst) throws IOException;

            public final long read(ByteBuffer[] dsts) throws IOException {

                return read(dsts, 0, dsts.length);

            }

            public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

    - 如果channel处于阻塞模式:调用send()方法或者write()方法,调用线程可能会休眠直到数据报被加入传输队列。如果channel是非阻塞的:send()方法或者write()方法、read()返回值要么是字节缓冲区的字节数,要么是“0”,receive()方法的返回值要么是远程地址对象要么是null。

    - 报文流可以绑定也可以不绑定。如果channel负责监听,那么必须绑定到一个指定端口,channel将会一直监听这个端口。当channel没有绑定的时候,仍然能够接收数据报,使用的是动态分配的端口号。已经绑定的channel将从指定端口接收或者发送数据报。

    - 报文流是不可靠传输。1)假如receive()方法提供的ByteBuffer没有足够的剩余空间来存放您正在接收的数据包,没有被填充的字节都会被悄悄地丢弃。2)如果send()方法给定的ByteBuffer传输队列没有足够空间来承载整个数据报,那么什么内容都不会被发送。3)传输过程中的协议可能将数据报分解成碎片。例如,以太网不能传输超过1,500个字节左右的包。如果您的数据报比较大,那么就会存在被分解成碎片的风险,成倍地增加了传输过程中包丢失的几率。被分解的数据报在目的地会被重新组合起来,接收者将看不到碎片。但是,如果有一个碎片不能按时到达,那么整个数据报将被丢弃。

    **SocketChannel**:是一个抽象类,真正的实现类是其子类SocketChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。客户端的TCP通信的channel。

    - 创建一个SocketChannel对象,通过SocketChannel的静态方法open委托给SelectorProvider类创建SocketChannelImpl类对象。

            public static SocketChannel open() throws IOException {

                return SelectorProvider.provider().openSocketChannel();

            }

    - SocketChannel支持的事件:读、写、发起连接

        public final int validOps() {

            return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);

        }

    - channel必须在使用之前连接到远程地址,在阻塞模式下,连接操作会一直阻塞直到连接成功或者失败;在非阻塞模式下,连接操作即使没有连接成功也会立刻返回,因此需要通过finishConnect()方法判断是否连接成功并一直尝试连接。

            //channel是否成连接到一个远程地址

            public abstract boolean isConnected();

            //channel是否正处于连接中

            public abstract boolean isConnectionPending();

            //channel连接一个远程地址

            public abstract boolean connect(SocketAddress remote) throws IOException;

            //channel是否完成了连接

            public abstract boolean finishConnect() throws IOException;

            while(!channel.finishConnect()){

                //由于必须在连接成功之后才能进行IO操作,必须等待连接成功

                doSomethingElse();

            }

    - 往channel中写数据,支持聚集写。写完之后,Selector的select()方法会检测到这个channel的WRITE事件就绪了

            public abstract int write(ByteBuffer src) throws IOException;

            public final long write(ByteBuffer[] srcs) throws IOException {

                return write(srcs, 0, srcs.length);

            }

            public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

    - 从channel中读取数据,支持分散读。Selector的select()方法会检测到这个channel的READ事件就绪了

            public abstract int read(ByteBuffer dst) throws IOException;

            public final long read(ByteBuffer[] dsts) throws IOException {

                return read(dsts, 0, dsts.length);

            }    

            public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

    SocketChannel的一个实例:

            public class MyClient {

                private static Selector selector = null;

                private volatile static boolean stop = false;

                private static SocketChannel channel = null;

                public static void main(String[] args) {

                    selector = Selector.open();

                    try {

                        channel = SocketChannel.open();

                        channel.configureBlocking(false);

                        channel.connect(new InetSocketAddress("127.0.0.1", 7777));

                        //注册一个连接请求事件

                        channel.register(selector, SelectionKey.OP_CONNECT);

                        try {

                            while (!stop) {

                                selector.select();

                                Set selectedKeys = selector.selectedKeys();

                                Iterator iterator = selectedKeys.iterator();

                                while (iterator.hasNext()) {

                                    SelectionKey key = iterator.next();

                                    //连接就绪

                                    if (key.isConnectable()) {

                                        //服务器接收了连接请求

                                        SocketChannel sc = (SocketChannel) key.channel();

                                        if (sc.finishConnect()) {

                                            // 将关注的事件变成read

                                            sc.register(selector, SelectionKey.OP_READ);

                                            doWrite(sc, "dddddd");

                                        }

                                    }

                                    // 读就绪

                                    if (key.isReadable()) {

                                        //服务器有数据过来了,处理数据,再发送数据

                                    }

                                    iterator.remove();

                                }

                            }

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    } catch (ClosedChannelException e) {

                        System.out.println("client: 失去主机连接");

                        e.printStackTrace();

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

    **ServerSocketChannel**:是一个抽象类,真正的实现类是其子类ServerSocketChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。服务器端的TCP通信的channel。

    - 创建一个ServerSocketChannel,通过ServerSocketChannel的静态方法open委托给SelectorProvider类创建ServerSocketChannelImpl类对象。

            public static ServerSocketChannel open() throws IOException {

                return SelectorProvider.provider().openServerSocketChannel();

            }

    - ServerSocketChannel支持的事件:接收连接

            public final int validOps() {

                return SelectionKey.OP_ACCEPT;

            }

    - ServerSocketChannel必须先绑定到一个端口上,一直监听这个端口。

            public abstract ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException;

    - 获取与这个ServerSocketChannel关联的SocketChannel,即发起连接请求的SocketChannel

            public abstract SocketChannel accept() throws IOException;

    ServerSocketChannel的一个实例:

        public class MyService {

            public static Selector selector = null;

            public static void main(String[] args) {

                selector = Selector.open();// 打开selector

                ServerSocketChannel server = ServerSocketChannel.open();

                server.socket().bind(new InetSocketAddress(7777), 1024);

                server.configureBlocking(false);

                //服务器开始监听等待连接,注册ACCEPT事件

                server.register(selector, SelectionKey.OP_ACCEPT);

                while (true) {

                    try {

                        selector.select(1000); // 阻塞selector

                        // ================如果有新连接

                        Set selectedKeys = selector.selectedKeys();// 获得事件集合;

                        // ================遍历selectedKeys

                        Iterator iterator = selectedKeys.iterator();

                        SelectionKey key = null;

                        while (iterator.hasNext()) {

                            key = iterator.next();// 获得到当前的事件

                            // ===============处理事件

                             // 连接就绪,有客户端请求连接,注册的事件发生

                            if (key.isAcceptable()) {

                                // 获得对应的ServerSocketChannel

                                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                                // 得到对应的SocketChannel 

                                SocketChannel channel = ssc.accept();

                                // 处理socketChannel

                                channel.configureBlocking(false); 

                                channel.register(selector, SelectionKey.OP_READ); 

                            }

                            // 读就绪

                            if (key.isReadable()) {

                                //客户端有数据过来了,之前注册的READ事件来了

                            }

                            // ===============

                            iterator.remove(); // 移除事件

                        }

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

    相关文章

      网友评论

          本文标题:Java NIO

          本文链接:https://www.haomeiwen.com/subject/lveyyftx.html