Java NIO

作者: Teddy_b | 来源:发表于2018-08-21 21:29 被阅读0次

# Java NIO #

Java NIO属于非阻塞IO,这是与传统IO最本质的区别。传统IO包括socket和文件IO都是阻塞式的,即一个动作的执行,必须等待先前的动作完成;非阻塞的IO在线程执行一个动作,不用等待动作执行完,可以去做别的事情,这是因为NIO是基于Channel的,而不是基于流的。每个线程可以同时监听多个注册到Selector上的Channel,   NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写到缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

## Buffer 缓冲区 ##

Java NIO中所有的缓冲区都继承于 Buffer这个抽象类。Buffer类似于一块可以被读写的区域,因此有读模式和写模式两种模式,在Buffer类中通过三个变量控制缓冲区的读和写position(下一个读或写的数组下标),limit(在读模式和写模式下有不同的含义),capacity(数组的容量):

- 写模式:往数组中写入数据时候,将limit设置为capacity,表示最多可以写入capacity个元素,position就表示下一个写入的下标。通过Buffer的flip()方法可以从写模式切换到读模式。

        public final Buffer flip() {

            limit = position;

            position = 0;

            mark = -1;

            return this;

        }

- 读模式:从数组中读取数据的时候,将limit设置为position,表示最多只有这么多个元素可以读,position设置为0,表示从头开始读数据。通过调用Buffer的clear()方法或者compact()方法可以从读模式切换到写模式。

        public final Buffer clear() {

            position = 0;

            limit = capacity;

            mark = -1;

            return this;

        }

clear()方法是不保留数据的(即使有些数据还没有读完),因为直接将position设置为0,表示从头开始写入数据,limit重新设置capacity。

        public ByteBuffer compact() {

            System.arraycopy(hb, ix(position()), hb, ix(0), remaining());

            position(remaining());

            limit(capacity());

            discardMark();

            return this;

        }

compact()方法不是Buffer类中的方式,是在Buffer的各个子类(ByteBuffer、CharBuffer等)中定义的抽象方法,并且在(ByteBuffer、CharBuffer等)的子类中提供了实现,compact()方法会保留还没读的数据,先将没读的数据拷贝到数组的最前面,然后设置position为下一个写入的下标,limit重写设置为capacity。

## ByteBuffer 字节缓冲区 ##

ByteBuffer继承于Buffer,ByteBuffer仍然是个抽象类,我们只能通过它的allocate(int capacity)方法获取一个非直接字节缓冲区(缓冲区不是直接在内存中的),也可以通过warp(byte[] b)方法创建一个非直接字节缓冲区。如果想要创建一个直接字节缓冲区,可以使用allocateDirect(int capacity)方法,直接缓冲区不是在堆上分配的,因此不受GC的管理,其创建和释放过程比较耗时,但是直接缓冲区上数据的读写比较快。

    //在堆上创建一个字节缓冲区

    public static ByteBuffer allocate(int capacity) {

        if (capacity < 0)

            throw new IllegalArgumentException();

        return new HeapByteBuffer(capacity, capacity);

    }

    //创建一个直接字节缓冲区

    public static ByteBuffer allocateDirect(int capacity) {

        return new DirectByteBuffer(capacity);

    }

**HeapByteBuffer:**HeapByteBuffer是 ByteBuffer的默认实现类,在堆上创建一个字节缓冲区.HeapByteBuffer主要有以下几个方面的功能:

- 读数据:

        //获取position处的元素,并将position++

        public byte get() {

            return hb[ix(nextGetIndex())];

        }

        //获取指定下标处的元素,此时position是没有变的。

        public byte get(int i) {

            return hb[ix(checkIndex(i))];

        }

        //获取从position开始的length个元素,并拷贝到指定数组中,position会更新为position + length

        public ByteBuffer get(byte[] dst, int offset, int length) {

            checkBounds(offset, length, dst.length);

            if (length > remaining())

                throw new BufferUnderflowException();

            System.arraycopy(hb, ix(position()), dst, offset, length);

            position(position() + length);

        return this;

    }

- 写数据:

        //将指定元素插入到position处,并将position++(如果满了将会抛出异常)

        public ByteBuffer put(byte x) {

                hb[ix(nextPutIndex())] = x;

                return this;

        }

        //将指定元素插入到指定位置,position位置不会改变

         public ByteBuffer put(int i, byte x) {

            hb[ix(checkIndex(i))] = x;

            return this;

        }

        //从position处开始写入指定数组中的元素,position会被更新为(position + length)

        public ByteBuffer put(byte[] src, int offset, int length) {

            checkBounds(offset, length, src.length);

            if (length > remaining())

                throw new BufferOverflowException();

            System.arraycopy(src, offset, hb, ix(position()), length);

            position(position() + length);

            return this;

        }

- 读写char, int ,short, double等其他基本数据类型

        //获取一个字符,Bits类根据大端还是小端用不同的方式组合两个字节

        public char getChar() {

            return Bits.getChar(this, ix(nextGetIndex(2)), bigEndian);

        }

        //大端存储模式,高位字节保存在低字节部分,因此组合的时候低字节在前面,高字节在后面

        static char getCharB(ByteBuffer bb, int bi) {

            return makeChar(bb._get(bi    ),

                        bb._get(bi + 1));

        }

        //小端存储模式,高位字节保存在高字节部分,因此组合的时候高字节在前面,低字节在后面

        static char getCharL(ByteBuffer bb, int bi) {

            return makeChar(bb._get(bi + 1),

                        bb._get(bi    ));

        }

        //写入一个char字符的时候也是一样

        public ByteBuffer putChar(char x) {

            Bits.putChar(this, ix(nextPutIndex(2)), x, bigEndian);

            return this;

        }

- 用ByteBuffer包装成其他基本数据类型的缓冲区(CharBuffer、IntBuffer等)

        //ByteBuffer的asCharBuffer()方法

        public CharBuffer asCharBuffer() {

            int size = this.remaining() >> 1;

            int off = offset + position();

            return (bigEndian

                    ? (CharBuffer)(new ByteBufferAsCharBufferB(this,

                                                                   -1,

                                                                   0,

                                                                   size,

                                                                   size,

                                                                   off))

                    : (CharBuffer)(new ByteBufferAsCharBufferL(this,

                                                                   -1,

                                                                   0,

                                                                   size,

                                                                   size,

                                                                   off)));

        }

asCharBuffer()方法也会根据机器是大端模式还是小端模式,创建不同的对象。如果是大端模式,将会创建ByteBufferAsCharBufferB类对象,由于ByteBufferAsCharBufferB对象是由ByteBuffer对象包装而来的,虽然ByteBufferAsCharBufferB的父类是CharBuffer,但是ByteBufferAsCharBufferB类中操作的并不是字符数组而是字节数组,所以ByteBufferAsCharBufferB类对象在读写的时候仍然借助Bits类完成,即先通过字节组成字符再读写。根据ByteBuffer得到其他类型的缓冲区也是一样的实现原理。

- 创建只读型缓冲区

        public ByteBuffer asReadOnlyBuffer() {

            return new HeapByteBufferR(hb,

                                         this.markValue(),

                                         this.position(),

                                         this.limit(),

                                         this.capacity(),

                                         offset);

        }

asReadOnlyBuffer()会创建HeapByteBufferR类对象,每种缓冲区都可以创建对应的只读型缓冲区,只读型缓冲区就直接继承创建它的这个类,如HeapByteBufferR继承于HeapByteBuffer,在HeapByteBufferR类中重写所有的put方法,所有的put方法都直接抛出异常,而get方法仍然使用父类的get方法。

**MappedByteBuffer**继承于ByteBuffer,是直接字节数组的抽象父类,在MappedByteBuffer类中只定义了三个和物理磁盘相关的方法:

- isLoaded():用于判断这个字节数组是否存在物理磁盘上

- load():将直接字节数组中的数据写到物理磁盘上

- force():强制将直接字节数组中的数据写到物理磁盘上

**DirectByteBuffer**继承于MappedByteBuffer,用于创建直接字节数组的类。由于直接字节数组不是在堆上分配内存,因此不受GC控制,创建和释放过程比较繁琐,通过通过Unsafe类的allocateMemory()方法分配内存空间,而且DirectByteBuffer的所有get()和put()方法都是通过Unsafe类完成

    //读取position处的元素

    public byte get() {

        return ((unsafe.getByte(ix(nextGetIndex()))));

    }

    //在position处写入元素

    public ByteBuffer put(byte x) {

        unsafe.putByte(ix(nextPutIndex()), ((x)));

        return this;

    }

## Channel ##

关于同步、异步、阻塞和非阻塞的区别:同步和异步说的是消息的通知机制,阻塞非阻塞说的是线程的状态 。

- 同步阻塞IO:client在调用read()方法时,stream里没有数据可读,线程停止向下执行,直至stream有数据。

- 同步非阻塞IO:client在调用read()方法时,stream里没有数据可读,read()方法就返回了,线程可以去干别的事,但是需要有一个线程监听这stream中是否有数据准备好。

- 异步非阻塞IO:服务端调用read()方法,若stream中无数据则返回,程序继续向下执行。当stream中有数据时,操作系统会负责把数据拷贝到用户空间,然后通知这个线程,这里的消息通知机制就是异步!

NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写道缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

**Channel**:是NIO中通道的接口类,只提供了两个抽象方法,isOpen()判断通道是否打开,close()关闭一个通道。

**AbstractInterruptibleChannel**:是一个抽象类,实现了Channel接口。任何一个通道如果想要实现在中断时实现异步关闭通道,那么必须继承这个类,这主要体现在两个方面:

- 当某个线程阻塞在channel上,而另一个线程调用了channel的close()方法,那么阻塞的线程会收到AsynchronousCloseException

- 如果某个线程阻塞在channel上,另一个线程调用了阻塞线程的interrupt()方法,那么阻塞线程会收到ClosedByInterruptException,并且通道会被关闭。

AbstractInterruptibleChannel抽象类中定义了一组协同方法begin()和end()方法来完成这两个功能,因此当线程执行一个可能阻塞的IO操作时,必须把这个IO操作放在begin()方法和end()方法之间,才能实现channel的异步关闭。

    protected final void begin() {

        if (interruptor == null) {

            interruptor = new Interruptible() {

                    public void interrupt(Thread target) {

                        synchronized (closeLock) {

                            if (!open)

                                return;

                            open = false;

                            interrupted = target;

                            try {

                                //收到中断请求后会回调AbstractInterruptibleChannel类的close()方法关闭通道

                                AbstractInterruptibleChannel.this.implCloseChannel();

                            } catch (IOException x) { }

                        }

                    }};

        }

        blockedOn(interruptor);

        Thread me = Thread.currentThread();

        if (me.isInterrupted())

            interruptor.interrupt(me);

    }

    protected final void end(boolean completed)

        throws AsynchronousCloseException

    {

        blockedOn(null);

        Thread interrupted = this.interrupted;

        //中断触发器不为空,会抛出ClosedByInterruptException

        if (interrupted != null && interrupted == Thread.currentThread()) {

            interrupted = null;

            throw new ClosedByInterruptException();

        }

        //触发器为空,没有中断,但是在阻塞的过程中channel被关闭了,抛出AsynchronousCloseException

        if (!completed && !open)

            throw new AsynchronousCloseException();

    }

**SelectableChannel**:是一个抽象类,继承于AbstractInterruptibleChannel。需要注册到Selector上的通道必须继承这个类

- public abstract int validOps();获取这个通道支持的事件

- public abstract boolean isRegistered();通道是否注册到了某个Selector上

- public abstract SelectionKey keyFor(Selector sel);这个通道在指定Selector上注册的事件

- public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;注册这个通道到指定的Selector上

- public abstract SelectableChannel configureBlocking(boolean block) throws IOException;修改这个通道为非阻塞或阻塞

- public abstract boolean isBlocking();如果这个通道是阻塞模式,返回true

**AbstractSelectableChannel**:是一个抽象类,是SelectableChannel的基础实现类。在AbstractSelectableChannel类中定义了一个SelectionKey数组,记录这个channel注册到了哪些Selector上,定义了一个keyCount记录这个channel注册的次数。并且channel的最初模式设置为阻塞模式。

- 判断这个channel是否注册了:keyCount不为0就表示注册了

        public final boolean isRegistered() {

            synchronized (keyLock) {

                return keyCount != 0;

            }

        }

- 获取这个通道在指定Selector上的注册事件,给定Selector,在SelectionKey[]数组中查找Selector与指定Selector相等的SelectionKey

        public final SelectionKey keyFor(Selector sel) {

            return findKey(sel);

        }

- 将这个通道注册到指定Selector上,如果给定的事件ops不是这个通道支持的事件validOps()将会抛出异常,而且阻塞的channel无法注册到Selector上,具体在注册的时候如果channel已经注册到了这个Selector上,那么更新ops和附加信息,如果这个channel还没有注册到这个Selector上,将会调用Selector 类的register(SelectableChannel ch, int ops, Object o)方法完成注册

        public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{

            synchronized (regLock) {

                if (!isOpen())

                    throw new ClosedChannelException();

                if ((ops & ~validOps()) != 0)

                    throw new IllegalArgumentException();

                if (blocking)

                    throw new IllegalBlockingModeException();

                SelectionKey k = findKey(sel);

                if (k != null) {

                    k.interestOps(ops);

                    k.attach(att);

                }

                if (k == null) {

                    // New registration

                    synchronized (keyLock) {

                        if (!isOpen())

                            throw new ClosedChannelException();

                        k = ((AbstractSelector)sel).register(this, ops, att);

                        addKey(k);

                    }

                }

                return k;

            }

        }

- 关闭通道除了需要关闭通道之外,还需要把SelectionKey[]数组上的SelectionKey取消,而SelectionKey类中的cancel()方法又会调用AbstractSelector中的cancel()方法,AbstractSelector类中的cancel()方法将这个SelectionKey加入到cancelledKeys集合中。

        protected final void implCloseChannel() throws IOException {

            implCloseSelectableChannel();

            synchronized (keyLock) {

                int count = (keys == null) ? 0 : keys.length;

                for (int i = 0; i < count; i++) {

                    SelectionKey k = keys[i];

                    if (k != null)

                        k.cancel();

                }

            }

        }

- 判断channel是否是阻塞模式,只有非阻塞channel才能注册到Selector上

        public final boolean isBlocking() {

            synchronized (regLock) {

                return blocking;

            }

        }

- 修改channel的阻塞模式

        public final SelectableChannel configureBlocking(boolean block) throws IOException {

            synchronized (regLock) {

                if (!isOpen())

                    throw new ClosedChannelException();

                if (blocking == block)

                    return this;

                if (block && haveValidKeys())

                    throw new IllegalBlockingModeException();

                implConfigureBlocking(block);

                blocking = block;

            }

            return this;

        }

**SelectionKey**:每次channel注册到一个Selector都会返回一个SelectionKey对象,因此SelectionKey描述的是一次注册事件中channel和Selector之间的映射关系。

- public abstract SelectableChannel channel();获取SelectionKey对应的channel

- public abstract Selector selector();获取对应SelectionKey对应的channel

- public abstract boolean isValid();这个SelectionKey是否有效

- public abstract void cancel();把这个SelectionKey置为无效

- public abstract int interestOps();获取这个SelectionKey关注的事件

- public abstract SelectionKey interestOps(int ops);设置这个SelectionKey关注的事件

- public abstract int readyOps();这个SelectionKey关注的事件中就绪了的事件

- public static final int OP_READ = 1 << 0;读事件

- public static final int OP_WRITE = 1 << 2;写事件

- public static final int OP_CONNECT = 1 << 3;channel连接到服务器的连接事件

- public static final int OP_ACCEPT = 1 << 4;服务器准备好了接受连接事件

- channel上有读事件就绪、写事件就绪(isWritable)、连接事件就绪(isConnectable)、接受连接事件就绪(isAcceptable)

    public final boolean isReadable() {

            return (readyOps() & OP_READ) != 0;

        }

**AbstractSelectionKey**:是一个抽象类,是SelectionKey的基础实现类。一个channel注册到一个Selector上返回一个SelectionKey,这个SelectionKey初始就是有效的。

- 取消一个SelectionKey,先将SelectionKey置为无效,然后调用Selector的cancel(SelectionKey key)方法完成具体的取消操作

        public final void cancel() {

            synchronized (this) {

                if (valid) {

                    valid = false;

                    ((AbstractSelector)selector()).cancel(this);

                }

            }

        }

**SelectionKeyImpl**:SelectionKey的最终实现类,继承于AbstractSelectionKey。在SelectionKeyImpl类中定义了int类型interestOps、int类型的readyOps,实现了SelectionKey抽象类中的interestOps()、readyOps()、interestOps(int ops)这三个方法。

## Selector ##

Selector是NIO得以实现的核心模块之一,NIO属于同步非阻塞IO,同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;

**Selector**:是一个抽象类。提供了静态方法open()创建一个Selector对象,open()方法是使用SelectorProvider类完成的。

- public abstract boolean isOpen();这个Selector是否打开了

- public abstract SelectorProvider provider();获取创建这个Selector的SelectorProvider

- public abstract Set keys();获取这个Selector上所有的注册的channel对应的SelectionKey

- public abstract Set selectedKeys();获取这个Selector上所有就绪了的channel对应的SelectionKey

- public abstract int selectNow() throws IOException;非阻塞的执行一次选择操作,没有就绪的channel就立即返回0,否则返回有多少个channel就绪了。

- public abstract int select(long timeout) throws IOException;执行一次选择操作,并且阻塞一段时间,在这段时间里,如果有channel就绪了,将会返回有多少个channel就绪了,如果到达了指定时间还没有channel就绪,就返回0.

- public abstract int select() throws IOException;阻塞执行选择操作,一直阻塞到有channel就绪,然后返回有多少个channel就绪了

- public abstract Selector wakeup();使阻塞在select()方法上的线程立即返回。

        Selector selector = Selector.open();

        channel.configureBlocking(false);

        SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

        while(true) {

          int readyChannels = selector.select();

          if(readyChannels == 0) continue;

          Set selectedKeys = selector.selectedKeys();

          Iterator keyIterator = selectedKeys.iterator();

          while(keyIterator.hasNext()) {

            SelectionKey key = keyIterator.next();

            if(key.isAcceptable()) {

                // a connection was accepted by a ServerSocketChannel.

            } else if (key.isConnectable()) {

                // a connection was established with a remote server.

            } else if (key.isReadable()) {

                // a channel is ready for reading

            } else if (key.isWritable()) {

                // a channel is ready for writing

            }

            keyIterator.remove();

          }

        }

**AbstractSelector**:是一个抽象类,是Selector的基础实现类.

- 定义了一个boolean类型的open变量,初始化为true,表示一个Selector创建时就为打开状态。

- 定义了一个cancelledKeys集合,表示已经取消的SelectionKey的集合。SelectionKey中的cancel()方法会调用AbstractSelector类的cancel()方法,cancel()方法将SelectionKey加入到cancelledKeys集合中。

        void cancel(SelectionKey k) {                       // package-private

            synchronized (cancelledKeys) {

                cancelledKeys.add(k);

            }

        }

- 定义了一个SelectorProvider对象,用于实现Selector类中的provider()方法。

- 定义了一个反注册方法deregister(SelectionKey key),调用的是channel的方法,将key从SelectionKey[] keys数组中移除

        protected final void deregister(AbstractSelectionKey key) {

            ((AbstractSelectableChannel)key.channel()).removeKey(key);

        }

- 定义了一组协同方法begin()和end(),与AbstractInterruptibleChannel类中协同方法类似,在一个可能阻塞的IO操作前使用begin()方法,在IO操作之后使用end()方法,但是这里的协同方法是为了在产生中断之后使select()方法立即返回。

        protected final void begin() {

            if (interruptor == null) {

                interruptor = new Interruptible() {

                        public void interrupt(Thread ignore) {

                            //产生中断之后,调用wakeup()方法唤醒select()方法

                            AbstractSelector.this.wakeup();

                        }};

            }

            AbstractInterruptibleChannel.blockedOn(interruptor);

            Thread me = Thread.currentThread();

            if (me.isInterrupted())

                interruptor.interrupt(me);

        }

**SelectorImpl**:仍然是一个抽象类,继承于AbstractSelector类,进一步实现了Selector类。

- 定义了一个keys集合,用于实现Selector类的keys()方法,直接返回这个集合。

- 定义了一个selectedKeys集合,用于实现Selector类的selectedKeys()方法,直接返回这个集合。

- 将三个select()方法均委托给一个抽象方法,待子类进一步实现。

        //委托给lockAndDoSelect()方法

        public int select(long timeout) throws IOException{

                  if (timeout < 0)

                      throw new IllegalArgumentException("Negative timeout");

                  return lockAndDoSelect((timeout == 0) ? -1 : timeout);

        }

        //调用上一个方法

        public int select() throws IOException {

             return select(0);

        }

        //委托给lockAndDoSelect()方法

        public int selectNow() throws IOException {

             return lockAndDoSelect(0);

        }

        //同步锁,进一步委托给doSelect(long time)这个抽象方法。

        private int lockAndDoSelect(long timeout) throws IOException {

              synchronized (this) {

                  if (!isOpen())

                      throw new ClosedSelectorException();

                  synchronized (publicKeys) {

                      synchronized (publicSelectedKeys) {

                          return doSelect(timeout);

                      }

                  }

              }

          }

- 进一步实现了close()方法,但是没有完全实现,还是委托给一个抽象方法

//先调用wakeup()方法使select()方法立即返回,然后同步锁,调用抽象方法implClose()

        public void implCloseSelector() throws IOException {

             wakeup();

             synchronized (this) {

                 synchronized (publicKeys) {

                     synchronized (publicSelectedKeys) {

                         implClose();

                     }

                 }

             }

         }

- 初步实现了register(channel, int ops, Object att)方法,委托给抽象方法implRegister(SelectionKey k)方法

        protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment){

             if (!(ch instanceof SelChImpl))

                 throw new IllegalSelectorException();

            //新建一个SelectionKey对象

             SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);

            //设置附加信息

             k.attach(attachment);

             synchronized (publicKeys) {

                 implRegister(k);

             }

            //设置感兴趣事件

             k.interestOps(ops);

             return k;

         }

- 定义了一个方法处理cancelledKeys集合,委托给抽象方法implDereg(SelectionKey k)方法

           void processDeregisterQueue() throws IOException {

                 // Precondition: Synchronized on this, keys, and selectedKeys

                //获取cancelledKeys集合

                 Set cks = cancelledKeys();

                 synchronized (cks) {

                     if (!cks.isEmpty()) {

                         Iterator i = cks.iterator();

                         while (i.hasNext()) {

                             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();

                             try {

                                 implDereg(ski);

                             } catch (SocketException se) {

                                 IOException ioe = new IOException("Error deregistering key");

                                 ioe.initCause(se);

                                 throw ioe;

                             } finally {

                                 i.remove();

                             }

                         }

                     }

                 }

             }

## SelectorProvider ##

SelectorProvider为Selector、DatagramChannel、SocketChannel、ServerSocketChannel、Pipe这些Selector和channel提供打开方法。

**SelectorProvider**:是一个抽象类

-     public abstract DatagramChannel openDatagramChannel() throws IOException;打开UDP通信channel

-     public abstract Pipe openPipe() throws IOException;打开一个管道

-     public abstract AbstractSelector openSelector() throws IOException;打开一个Selector

- public abstract ServerSocketChannel openServerSocketChannel() throws IOException;打开一个服务器socket channel

-     public abstract SocketChannel openSocketChannel() throws IOException;打开一个TCP通信channel

**SelectorProviderImpl**:是一个抽象类,是SelectorProvider的基础实现类。

- 打开UDP通信channel,返回的是UDP channel的实现类对象。

         public DatagramChannel openDatagramChannel() throws IOException   {  

            return new DatagramChannelImpl(this);  

        }  

- 打开TCP通信channel,返回的是Socket channel的实现类对象。

        public SocketChannel openSocketChannel() throws IOException  {  

            return new SocketChannelImpl(this);  

        }  

- 打开TCP通信服务器端的channel,返回的是ServerSocket channel的实现类对象。

        public ServerSocketChannel openServerSocketChannel() throws IOException  {  

            return new ServerSocketChannelImpl(this);  

        } 

- 打开一个管道,返回的是pipe实现类对象。

        public Pipe openPipe() throws IOException  {  

            return new PipeImpl(this);  

        }  

- 打开Selector的方法没有实现,待子类实现。

**WindowsSelectorProvider **:SelectorProvider的最终实现类,继承于SelectorProviderImpl。实现了Selector的打开方法

- 打开Selector。调用的是Selector的最终实现类WindowsSelectorImpl,通过WindowsSelectorImpl的构造函数返回一个Selector

        public AbstractSelector openSelector() throws IOException {

              return new WindowsSelectorImpl(this);

        }

## 再看 channel ##

**FileChannel**:是一个抽象类,继承于AbstractInterruptibleChannel类,没有继承SelectableChannel,即FileChannel无法注册到Selector上。FileChannel也无法以非阻塞模式读写。通过阻塞方式对文件读写。

- 打开一个FileChannel,一般通过传统IO流获取FileChannel。但是FileChannel类中也定义了open()函数打开一个FileChannel,getChannel()方法底层就是调用的FileChannel的open()方法

        FileInputStream fis = new FileInputStream("C:\\mycode\\hello.txt");

        FileChannel inChannel = fis.getChannel();

- 从FileChannel读取数据:public abstract int read(ByteBuffer dst) throws IOException;将通道中的数据读出来,并写道指定ByteBuffer中.FileChannel也支持分散写,即写到多个ByteBuffer中.

        public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

- 向FileChannel写数据: public abstract int write(ByteBuffer src) throws IOException;将ByteBuffer缓冲区中的数据写入到channel中.FileChannel也支持聚集写,即多个ByteBuffer中的数据写到channel中.

        public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

- 在FileChannel的某个特定位置进行数据的读/写操作,改变文件position的位置.

        //获取文件position的位置

        public abstract long position() throws IOException;

        //设置文件position

        public abstract FileChannel position(long newPosition) throws IOException;

- 将channel中的数据写入到另一个channel,或将另一个channel中的数据写入到这个channel中

        //将本channel中的数据写入到另一个“可写入”的channel,从另一个channel的position处开始写入

        public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

        //将另一个channel中的数据写入到本channel中,从另一个channel的position处开始

        public abstract long transferFrom(ReadableByteChannel src, long position, long count) throws IOException;

- 获取channel关联的文件的大小

        public abstract long size() throws IOException;

- 截取一个指定大小的文件,截断channel关联的文件为size大小,size之后的数据会被丢弃

        public abstract FileChannel truncate(long size) throws IOException;

**DatagramChannel**:是一个抽象类,真正的实现类是其子类DatagramChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。UDP通信的channel。

- 创建一个DatagramChannel。调用DatagramChannel的静态方法open(),通过SelectorProvider去创建DatagramChannelImpl的实例

        public static DatagramChannel open() throws IOException {

            return SelectorProvider.provider().openDatagramChannel();

        }

- DatagramChannel支持的事件:读和写

        public final int validOps() {

            return (SelectionKey.OP_READ | SelectionKey.OP_WRITE);

        }

- 报文流本来是无连接的,在没有连接到一个指定地址时,channel可以同时发送数据报到多个远程地址、也可以同时从多个远程地址接收数据报。通过DatagramChannel的send(ByteBuffer src, SocketAddress add)方法发送数据报到指定远程地址,通过DatagramChannel的receive(ByteBuffer dst)方法从任意远程地址接收数据报,receive()方法会返回一个SocketAddress对象用以标识数据报来自哪个远程地址。在没有建立连接的时候,每一次调用send()方法发送数据报或者调用receive()方法接收数据报时都会接收安全检查。

        //发送数据报到指定远程地址

        public abstract int send(ByteBuffer src, SocketAddress target) throws IOException;

        //从任意远程地址接收数据报,并返回数据来自哪个地址

        public abstract SocketAddress receive(ByteBuffer dst) throws IOException;

- 报文流也可以建立连接。建立连接后,channel将只能从指定的远程地址接收数据报、同时也只能发送数据报到指定的远程地址。由于已经连接到了指定的远程地址,因此在发送或者接收数据报的时候可以调用write()方法已经read()方法。write(ByteBuffer src)方法将数据报发送到指定远程地址、read(ByteBuffer dst)方法从指定远程地址接收数据报。指定连接到指定远程地址的channel才能调用write()方法和read()方法,每次调用write()方法和read()方法时不需要接收安全检查。**将DatagramChannel置于已连接的状态可以使除了它所“连接”到的地址之外的任何其他源地址的数据报被忽略。这是很有帮助的,因为不想要的包都已经被网络层丢弃了,从而避免了使用代码来接收、检查然后丢弃包的麻烦。**

        //将channel连接到指定远程地址

        public abstract DatagramChannel connect(SocketAddress remote) throws IOException;

        //断开channel与远程地址间的连接

        public abstract DatagramChannel disconnect() throws IOException;

        //channel是否连接到了某个远程地址

        public abstract boolean isConnected();

        //发送数据报到指定远程地址,支持聚集写

        public abstract int write(ByteBuffer src) throws IOException;

        public final long write(ByteBuffer[] srcs) throws IOException {

            return write(srcs, 0, srcs.length);

        }

        public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

        //从指定远程地址接收数据报,支持分散读

        public abstract int read(ByteBuffer dst) throws IOException;

        public final long read(ByteBuffer[] dsts) throws IOException {

            return read(dsts, 0, dsts.length);

        }

        public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

- 如果channel处于阻塞模式:调用send()方法或者write()方法,调用线程可能会休眠直到数据报被加入传输队列。如果channel是非阻塞的:send()方法或者write()方法、read()返回值要么是字节缓冲区的字节数,要么是“0”,receive()方法的返回值要么是远程地址对象要么是null。

- 报文流可以绑定也可以不绑定。如果channel负责监听,那么必须绑定到一个指定端口,channel将会一直监听这个端口。当channel没有绑定的时候,仍然能够接收数据报,使用的是动态分配的端口号。已经绑定的channel将从指定端口接收或者发送数据报。

- 报文流是不可靠传输。1)假如receive()方法提供的ByteBuffer没有足够的剩余空间来存放您正在接收的数据包,没有被填充的字节都会被悄悄地丢弃。2)如果send()方法给定的ByteBuffer传输队列没有足够空间来承载整个数据报,那么什么内容都不会被发送。3)传输过程中的协议可能将数据报分解成碎片。例如,以太网不能传输超过1,500个字节左右的包。如果您的数据报比较大,那么就会存在被分解成碎片的风险,成倍地增加了传输过程中包丢失的几率。被分解的数据报在目的地会被重新组合起来,接收者将看不到碎片。但是,如果有一个碎片不能按时到达,那么整个数据报将被丢弃。

**SocketChannel**:是一个抽象类,真正的实现类是其子类SocketChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。客户端的TCP通信的channel。

- 创建一个SocketChannel对象,通过SocketChannel的静态方法open委托给SelectorProvider类创建SocketChannelImpl类对象。

        public static SocketChannel open() throws IOException {

            return SelectorProvider.provider().openSocketChannel();

        }

- SocketChannel支持的事件:读、写、发起连接

    public final int validOps() {

        return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);

    }

- channel必须在使用之前连接到远程地址,在阻塞模式下,连接操作会一直阻塞直到连接成功或者失败;在非阻塞模式下,连接操作即使没有连接成功也会立刻返回,因此需要通过finishConnect()方法判断是否连接成功并一直尝试连接。

        //channel是否成连接到一个远程地址

        public abstract boolean isConnected();

        //channel是否正处于连接中

        public abstract boolean isConnectionPending();

        //channel连接一个远程地址

        public abstract boolean connect(SocketAddress remote) throws IOException;

        //channel是否完成了连接

        public abstract boolean finishConnect() throws IOException;

        while(!channel.finishConnect()){

            //由于必须在连接成功之后才能进行IO操作,必须等待连接成功

            doSomethingElse();

        }

- 往channel中写数据,支持聚集写。写完之后,Selector的select()方法会检测到这个channel的WRITE事件就绪了

        public abstract int write(ByteBuffer src) throws IOException;

        public final long write(ByteBuffer[] srcs) throws IOException {

            return write(srcs, 0, srcs.length);

        }

        public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

- 从channel中读取数据,支持分散读。Selector的select()方法会检测到这个channel的READ事件就绪了

        public abstract int read(ByteBuffer dst) throws IOException;

        public final long read(ByteBuffer[] dsts) throws IOException {

            return read(dsts, 0, dsts.length);

        }    

        public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

SocketChannel的一个实例:

        public class MyClient {

            private static Selector selector = null;

            private volatile static boolean stop = false;

            private static SocketChannel channel = null;

            public static void main(String[] args) {

                selector = Selector.open();

                try {

                    channel = SocketChannel.open();

                    channel.configureBlocking(false);

                    channel.connect(new InetSocketAddress("127.0.0.1", 7777));

                    //注册一个连接请求事件

                    channel.register(selector, SelectionKey.OP_CONNECT);

                    try {

                        while (!stop) {

                            selector.select();

                            Set selectedKeys = selector.selectedKeys();

                            Iterator iterator = selectedKeys.iterator();

                            while (iterator.hasNext()) {

                                SelectionKey key = iterator.next();

                                //连接就绪

                                if (key.isConnectable()) {

                                    //服务器接收了连接请求

                                    SocketChannel sc = (SocketChannel) key.channel();

                                    if (sc.finishConnect()) {

                                        // 将关注的事件变成read

                                        sc.register(selector, SelectionKey.OP_READ);

                                        doWrite(sc, "dddddd");

                                    }

                                }

                                // 读就绪

                                if (key.isReadable()) {

                                    //服务器有数据过来了,处理数据,再发送数据

                                }

                                iterator.remove();

                            }

                        }

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                } catch (ClosedChannelException e) {

                    System.out.println("client: 失去主机连接");

                    e.printStackTrace();

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

**ServerSocketChannel**:是一个抽象类,真正的实现类是其子类ServerSocketChannelImpl,继承于AbstractSelectableChannel类。因此有阻塞和非阻塞两种模式,在非阻塞模式下可以注册到Selector上。服务器端的TCP通信的channel。

- 创建一个ServerSocketChannel,通过ServerSocketChannel的静态方法open委托给SelectorProvider类创建ServerSocketChannelImpl类对象。

        public static ServerSocketChannel open() throws IOException {

            return SelectorProvider.provider().openServerSocketChannel();

        }

- ServerSocketChannel支持的事件:接收连接

        public final int validOps() {

            return SelectionKey.OP_ACCEPT;

        }

- ServerSocketChannel必须先绑定到一个端口上,一直监听这个端口。

        public abstract ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException;

- 获取与这个ServerSocketChannel关联的SocketChannel,即发起连接请求的SocketChannel

        public abstract SocketChannel accept() throws IOException;

ServerSocketChannel的一个实例:

    public class MyService {

        public static Selector selector = null;

        public static void main(String[] args) {

            selector = Selector.open();// 打开selector

            ServerSocketChannel server = ServerSocketChannel.open();

            server.socket().bind(new InetSocketAddress(7777), 1024);

            server.configureBlocking(false);

            //服务器开始监听等待连接,注册ACCEPT事件

            server.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {

                try {

                    selector.select(1000); // 阻塞selector

                    // ================如果有新连接

                    Set selectedKeys = selector.selectedKeys();// 获得事件集合;

                    // ================遍历selectedKeys

                    Iterator iterator = selectedKeys.iterator();

                    SelectionKey key = null;

                    while (iterator.hasNext()) {

                        key = iterator.next();// 获得到当前的事件

                        // ===============处理事件

                         // 连接就绪,有客户端请求连接,注册的事件发生

                        if (key.isAcceptable()) {

                            // 获得对应的ServerSocketChannel

                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                            // 得到对应的SocketChannel 

                            SocketChannel channel = ssc.accept();

                            // 处理socketChannel

                            channel.configureBlocking(false); 

                            channel.register(selector, SelectionKey.OP_READ); 

                        }

                        // 读就绪

                        if (key.isReadable()) {

                            //客户端有数据过来了,之前注册的READ事件来了

                        }

                        // ===============

                        iterator.remove(); // 移除事件

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

    }

相关文章

网友评论

      本文标题:Java NIO

      本文链接:https://www.haomeiwen.com/subject/lveyyftx.html