美文网首页
NIO教程 ——检视阅读(下)

NIO教程 ——检视阅读(下)

作者: 卡斯特梅的雨伞 | 来源:发表于2020-04-22 23:57 被阅读0次

Non-blocking Server非阻塞服务器

非阻塞服务器代码

非阻塞IO通道(Non-blocking IO Pipelines)

非阻塞的IO管道(Non-blocking IO Pipelines)可以看做是整个非阻塞IO处理过程的链条。包括在以非阻塞形式进行的读与写操作。

一个非阻塞的IO管道不必同时需要读和写数据,通常来说有些管道只需要读数据,而另一些管道则只需写数据。

当然一个非阻塞的IO管道他也可以同时从多个Channel中读取数据,例如同时从多个SocketChannel中读取数据;

非阻塞和阻塞通道比较(Non-blocking vs. Blocking IO Pipelines)

非阻塞IO管道和阻塞IO管道之间最大的区别是他们各自如何从Channel(套接字socket或文件file)读写数据。

  1. IO管道通常直接从流中(来自于socket活file的流)读取数据,然后把数据分割为连续的消息。这个处理与我们读取流信息,用tokenizer进行解析非常相似,不同的是我们在这里会把数据流分割为更大一些的消息块。
  2. 一个阻塞IO管道的使用可以和输入流一样调用,每次从Channel中读取一个字节的数据,阻塞自身直到有数据可读。
  3. 使用阻塞IO大大简化了Message Reader的实现成本。阻塞的Message Reader无需关注没有数据返回的情形,无需关注返回部分数据或者数据解析需要被复用的问题。

阻塞IO通道的缺点(Blocking IO Pipeline Drawbacks)

上面提到了阻塞的Message Reader易于实现,但是阻塞也给他带了不可避免的缺点,必须为每个数据数量都分配一个单独线程。原因就在于IO接口在读取数据时在有数据返回前会一直被阻塞住。这直接导致我们无法用单线程来处理一个流没有数据返回时去读取其他的流。每当一个线程尝试去读取一个流的数据,这个线程就会被阻塞直到有数据真正返回。

如果这样的IO管道运用到服务器去处理高并发的链接请求,服务器将不得不为每一个到来的链接分配一个单独的线程。如果并发数不高比如每一时刻只有几百并发,也需不会有太大问题。一旦服务器的并发数上升到百万级别,这种设计就缺乏伸缩性。每个线程需要为堆栈分配320KB(32位JVM)到1024KB(64位JVM)的内存空间。这就是说如果有1,000,000个线程,需要1TB的内存。而这些在还没开始真正处理接收到的消息前就需要(消息处理中还需要为对象开辟内存)。

为了减少线程数,很多服务器都设计了线程池,把所有接收到的请求放到队列内,每次读取一条连接进行处理。这种设计可以用下图表示:

non-blocking-server-3.png

但是这种设计要求缓冲的连接进程发送有意义的数据。如果这些连接长时间处于非活跃的状态,那么大量非活跃的连接会阻塞线程池中的所有线程。这会导致服务器的响应速度特别慢甚至无响应

有些服务器为了减轻这个问题,采取的操作是适当增加线程池的弹性。例如,当线程池所有线程都处于饱和时,线程池可能会自动扩容,启动更多的线程来处理事务。这个解决方案会使得服务器维护大量不活跃的链接。但是需要谨记服务器所能开辟的线程数是有限制的。所有当有1,000,000个低速的链接时(大量非活跃链接时),服务器还是不具备伸缩性。

基础的非阻塞通道设计(Basic Non-blocking IO Pipeline Design)

一个非阻塞的IO通道可以用单线程读取多个数据流。这个前提是相关的流可以切换为非阻塞模式(并不是所有流都可以以非阻塞形式操作,FileChannel就不能切换非阻塞模式)。在非阻塞模式下,读取一个流可能返回0个或多个字节。如果流还没有可供读取的数据那么就会返回0,其他大于1的返回都表明这是实际读取到的数据;

为了避开没有数据可读的流,我们结合Java NIO中的Selector。一个Selector可以注册多个SelectableChannel实例。当我们调用select()或selectorNow()方法时Selector会返回一个有数据可读的SelectableChannel实例。这个设计可以如下插图:

non-blocking-server-4.png

读取部分信息(Reading Partial Messages)

当我们从SelectableChannel中读取一段数据后,我们并不知道这段数据是否是完整的一个message。因为一个数据段可能包含部分message,也就是说即可能少于一个message,也可能多一个message(0到多个message),正如下面这张插图所示意的那样:

non-blocking-server-5.png

要处理这种截断的message,我们会遇到两个问题(非阻塞读取数据时):

  1. 检测数据段中是否包含一个完整的message
  2. 在message剩余部分获取到之前,我们如何处理不完整的message

检测完整message要求Message Reader查看数据段中的数据是否至少包含一个完整的message。如果包含一个或多个完整message,这些message可以被下发到通道中处理。查找完整message的过程是个大量重复的操作,所以这个操作必须是越快越好的。

当数据段中有一个不完整的message时,无论不完整消息是整个数据段还是说在完整message前后,这个不完整的message数据都需要在剩余部分获得前存储起来。

检查message完整性存储不完整message都是Message Reader的职责。为了避免混淆来自不同Channel的数据,我们为每一个Channel分配一个Message Reader。整个设计大概是这样的:

non-blocking-server-6.png

当我们通过Selector获取到一个有数据可以读取的Channel之后,该Channel关联的Message Reader会读取数据,并且把数据打断为Message块。得到完整的message后就可以通过通道下发到其他组件进行处理。

一个Message Reader自然是协议相关的。他需要知道message的格式以便读取。如果我们的服务器是跨协议复用的,那他必须实现Message Reader的协议-大致类似于接收一个Message Reader工厂作为配置参数。

存储不完整的Message(Storing Partial Messages)

现在我们已经明确了由Message Reader负责不完整消息的存储直到接收到完整的消息。现在我们还需要知道这个存储过程需要如何来实现。

在设计的时候我们需要考虑两个关键因素

  1. 我们希望在拷贝消息数据的时候数据量能尽可能的小,拷贝量越大则性能相对越低;
  2. 我们希望完整的消息是以顺序的字节存储,这样方便进行数据的解析;
为每个Message Reade分配Buffer(A Buffer Per Message Reader)
固定大小buffer

显然不完整的消息数据需要存储在某种buffer中。比较直接的办法是我们为每个Message Reader都分配一个内部的buffer成员。但是,多大的buffer才合适呢?这个buffer必须能存储下一个message最大的大小。如果一个message最大是1MB,那每个Message Reader内部的buffer就至少有1MB大小。

在百万级别的并发链接数下,1MB的buffer基本没法正常工作。举例来说,1,000,000 x 1MB就是1TB的内存大小!如果消息的最大数据量是16MB又需要多少内存呢?128MB呢?

缺点:这种直接分配一个message最大的大小值的buffer是非常浪费空间的。

可伸缩Buffer(Resizable Buffers)

另一个方案是在每个Message Reader内部维护一个容量可变的buffer。一个可变的buffer在初始化时占用较少空间,在消息变得很大超出容量时自动扩容。这样每个链接就不需要都占用比如1MB的空间。每个链接只使用承载下一个消息所必须的内存大小。

容量可变的buffer优点就是高效利用内存空间,不会浪费内存。

要实现一个可伸缩的buffer有几种不同的办法。每一种都有它的优缺点,下面几个小结我会逐一讨论它们。

拷贝扩容(Resize by Copy)

第一种实现可伸缩buffer的办法是初始化buffer的时候只申请较少的空间,比如4KB。如果消息超出了4KB的大小那么开赔一个更大的空间,比如8KB,然后把4KB中的数据拷贝纸8KB的内存块中。

拷贝方式扩容的优点:一个消息的全部数据都被保存在了一个连续的字节数组中。这使得数据解析变得更加容易。

缺点:会增加大量的数据拷贝操作。

拷贝扩容操作举例分析:

为了减少数据的拷贝操作,你可以分析整个消息流中的消息大小,一次来找到最适合当前机器的可以减少拷贝操作的buffer大小。例如,你可能会注意到觉大多数的消息都是小于4KB的,因为他们仅仅包含了一个非常请求和响应。这意味着消息的处所荣校应该设置为4KB。

同时,你可能会发现如果一个消息大于4KB,很可能是因为他包含了一个文件。你会可能注意到 大多数通过系统的数据都是小于128KB的。所以我们可以在第一次扩容设置为128KB。

最后你可能会发现当一个消息大于128KB后,没有什么规律可循来确定下次分配的空间大小,这意味着最后的buffer容量应该设置为消息最大的可能数据量。

结合这三次扩容时的大小设置,可以一定程度上减少数据拷贝。4KB以下的数据无需拷贝。在1百万的连接下需要的空间例如1,000,000x4KB=4GB,目前(2015)大多数服务器都扛得住。4KB到128KB会仅需拷贝一次,即拷贝4KB数据到128KB的里面。消息大小介于128KB和最大容量的时需要拷贝两次。首先4KB数据被拷贝第二次是拷贝128KB的数据,所以总共需要拷贝132KB数据。假设没有很多的消息会超过128KB,那么这个方案还是可以接受的。

当一个消息被完整的处理完毕后,它占用的内容应当即刻被释放。这样下一个来自同一个链接通道的消息可以从最小的buffer大小重新开始。这个操作是必须的如果我们需要尽可能高效地复用不同链接之间的内存。大多数情况下并不是所有的链接都会在同一时刻需要大容量的buffer。

笔者写了一个完整的教程阐述了如何实现一个内存buffer使其支持扩容:Resizable Arrays 。这个教程也附带了一个指向GitHub上的源码仓地址,里面有实现方案的具体代码。

追加扩容(Resize by Append)

另一种实现buffer扩容的方案是让buffer包含几个数组。当需要扩容的时候只需要在开辟一个新的字节数组,然后把内容写到里面去。

这种扩容也有两个具体的办法。一种是开辟单独的字节数组,然后用一个列表把这些独立数组关联起来。另一种是开辟一些更大的,相互共享的字节数组切片,然后用列表把这些切片和buffer关联起来。个人而言,笔者认为第二种切片方案更好一点点,但是它们之前的差异比较小

这种追加扩容的方案不管是用独立数组还是切片都有一个优点,那就是写数据的时候不需要额外的拷贝操作。所有的数据可以直接从socket(Channel)中拷贝至数组活切片当中。

这种方案的缺点也很明显,就是数据不是存储在一个连续的数组中。这会使得数据的解析变得更加复杂,因为解析器不得不同时查找每一个独立数组的结尾和所有数组的结尾。正因为我们需要在写数据时查找消息的结尾,这个模型在设计实现时会相对不那么容易。

TLV编码消息(TLV Encoded Messages)

有些协议的消息消失采用的是一种TLV格式(Type, Length, Value)。这意味着当消息到达时,消息的完整大小存储在了消息的开始部分。我们可以立刻判断为消息开辟多少内存空间。

优点:TLV编码是的内存管理变得更加简单。我们可以立刻知道为消息分配多少内存。即便是不完整的消息,buffer结尾后面也不会有浪费的内存。

缺点:我们需要在消息的全部数据接收到之前就开辟好需要用的所有内存。因此少量链接慢,但发送了大块数据的链接会占用较多内存,导致服务器无响应。

解决上诉问题的一个变通办法是使用一种内部包含多个TLV的消息格式。这样我们为每个TLV段分配内存而不是为整个的消息分配,并且只在消息的片段到达时才分配内存。但是消息片段很大时,任然会出现一样的问题。

另一个办法是为消息设置超时,如果长时间未接收到的消息(比如10-15秒)。这可以让服务器从偶发的并发处理大块消息恢复过来,不过还是会让服务器有一段时间无响应。另外恶意的DoS攻击会导致服务器开辟大量内存。

TLV编码使得内存管理更加简单,这也是HTTP1.1协议让人觉得是一个不太优良的的协议的原因。正因如此,HTTP2.0协议在设计中也利用TLV编码来传输数据帧。也是因为这个原因我们设计了自己的利用TLV编码的网络协议VStack.co

写不完整的消息(Writing Partial Messages)

在非阻塞IO管道中,写数据也是一个不小的挑战。当你调用一个非阻塞模式Channel的write()方法时,无法保证有多少机字节被写入了ByteBuffer中。write方法返回了实际写入的字节数,所以跟踪记录已被写入的字节数也是可行的。这就是我们遇到的问题:持续记录被写入的不完整的消息直到一个消息中所有的数据都发送完毕。

为了避免多个消息传递到Message Writer超出他所能处理到Channel的量,我们需要让到达的消息进入队列。Message Writer则尽可能快的将数据写到Channel里。

non-blocking-server-8.png

为了使Message Writer能够持续发送刚才已经发送了一部分的消息,Message Writer需要被一直调用,这样他就可以发送更多数据。

示例:

如果你有大量的链接,你会持有大量的Message Writer实例。检查比如1百万的Message Writer实例是来确定他们是否处于可写状态是很慢的操作。首先,许多Message Writer可能根本就没有数据需要发送。我们不想检查这些实例。其次,不是所有的Channel都处于可写状态。我们不想浪费时间在这些非写入状态的Channel。

为了检查一个Channel是否可写,可以把它注册到Selector上。但是我们不希望把所有的Channel实例都注册到Selector。试想一下,如果你有1百万的链接,这里面大部分是空闲的,把1百万链接都祖册到Selector上。然后调用select方法的时候就会有很多的Channel处于可写状态。你需要检查所有这些链接中的Message Writer以确认是否有数据可写。

为了避免检查所有的这些Message Writer,以及那些根本没有消息需要发送给他们的Channel实例,我们可以采用两步策略:

  1. 当有消息写入到Message Writer中后,把它关联的Channel注册到Selector上(如果还未注册的话)。
  2. 当服务器有空的时候,可以检查Selector看看注册在上面的Channel实例是否处于可写状态。每个可写的channel,使其Message Writer向Channel中写入数据。如果Message Writer已经把所有的消息都写入Channel,把Channel从Selector上解绑。

这两个小步骤确保只有有数据要写的Channel才会被注册到Selector。

集成(Putting it All Together)

正如你所知到的,一个被阻塞的服务器需要时刻检查当前是否有新的完整消息抵达。在一个消息被完整的收到前,服务器可能需要检查多次。检查一次是不够的。

类似的,服务器也需要时刻检查当前是否有任何可写的数据。如果有的话,服务器需要检查相应的链接看他们是否处于可写状态。仅仅在消息第一次进入队列时检查是不够的,因为一个消息可能被部分写入。

总而言之,一个非阻塞的服务器要三个管道,并且经常执行:

  • 读数据管道,用来检查打开的链接是否有新的数据到达;
  • 处理数据管道,负责处理接收到的完整消息;
  • 写数据管道,用于检查是否有数据可以写入打开的连接中;

这三个管道在循环中重复执行。你可以尝试优化它的执行。比如,如果没有消息在队列中等候,那么可以跳过写数据管道。或者,如果没有收到新的完整消息,你甚至可以跳过处理数据管道。

下面这张流程图阐述了这整个服务器循环过程:

non-blocking-server-9.png

服务器线程模型(Server Thread Model)

我们在GitHub上的源码中实现的非阻塞IO服务使用了一个包含两条线程的线程模型。第一个线程负责从ServerSocketChannel接收到达的链接。另一个线程负责处理这些链接,包括读消息,处理消息,把响应写回到链接。这个双线程模型如下:

non-blocking-server-10.png

NIO DatagramChannel数据报通道

一个Java NIO DatagramChannel是一个可以发送、接收UDP数据包的通道。由于UDP是面向无连接的网络协议,我们不可用像使用其他通道一样直接进行读写数据。正确的做法是发送、接收数据包

打开一个DatagramChannel(Opening a DatagramChannel)

打开一个DatagramChannel你这么操作:

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9999));

上述示例中,我们打开了一个DatagramChannel,它可以在9999端口上收发UDP数据包。

接收数据(Receiving Data)

接收数据,直接调用DatagramChannel的receive()方法:

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf);

receive()方法会把接收到的数据包中的数据拷贝至给定的Buffer中。如果数据包的内容超过了Buffer的大小,剩余的数据会被直接丢弃。

发送数据(Sending Data)

发送数据是通过DatagramChannel的send()方法:

String newData = "New String to wrte to file..."               +System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
int byteSent = channel.send(buf, new InetSocketAddress("java.com", 80));

上述示例会把一个字符串发送到“java.com”服务器的UDP端口80.目前这个端口没有被任何程序监听,所以什么都不会发生。当发送了数据后,我们不会收到数据包是否被接收的的通知,这是由于UDP本身不保证任何数据的发送问题。

链接特定机器地址(Connecting to a Specific Address)

DatagramChannel实际上是可以指定到网络中的特定地址的。由于UDP是面向无连接的,这种链接方式并不会创建实际的连接,这和TCP通道类似。确切的说,他会锁定DatagramChannel,这样我们就只能通过特定的地址来收发数据包。

看一个例子先:

channel.connect(new InetSocketAddress("jenkov.com"), 80));

当连接上后,可以向使用传统的通道那样调用read()和Writer()方法。区别是数据的读写情况得不到保证。下面是几个示例:

int bytesRead = channel.read(buf);    
int bytesWritten = channel.write(buf);

实例:

public class DataGramChannelClient {

    public static void main(String[] args) throws IOException {
        //open a datagramChannel
        DatagramChannel datagramChannel = DatagramChannel.open();
        try {
            //set non-blocking style
            datagramChannel.configureBlocking(false);

            //create a byteBuffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            //get test data from console
            Scanner scanner = new Scanner(System.in);

            while (scanner.hasNext()) {
                String next = scanner.next();
                byteBuffer.put(next.getBytes());
                byteBuffer.flip();
                //Sending Data
                datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9999));
                byteBuffer.clear();
            }
        } finally {
            datagramChannel.close();
        }
    }
}
public class DataGramChannelServer {

    public static void main(String[] args) throws IOException {
        //打开了一个DatagramChannel,它可以在9999端口上收发UDP数据包。
        DatagramChannel datagramChannel = DatagramChannel.open();
        datagramChannel.configureBlocking(false);
        datagramChannel.bind(new InetSocketAddress(9999));
        Selector selector = Selector.open();
        //注意要把数据报通道注册到selector上,否则不能检测到请求
        datagramChannel.register(selector, SelectionKey.OP_READ);

        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey selectionKey = keyIterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("ready Acceptable");
                } else if (selectionKey.isReadable()) {
                    System.out.println("ready Readable");
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    datagramChannel.receive(byteBuffer);
                    byteBuffer.flip();
                    //System.out.println(new String(byteBuffer.array()));//this(bytes, 0, bytes.length); byteBuffer不一定是读满的,所有用下面的limit
                    System.out.println(new String(byteBuffer.array(),0,byteBuffer.limit()));
                    byteBuffer.clear();
                }
            }
            keyIterator.remove();
        }
    }

}

NIO Pipe管道

一个Java NIO的管道是两个线程间单向传输数据的连接。一个管道(Pipe)有一个source channel和一个sink channel(没想到合适的中文名)。我们把数据写到sink channel中,这些数据可以同过source channel再读取出来。

下面是一个管道的示意图:

http://tutorials.jenkov.com/images/java-nio/pipe-internals.png

创建管道(Creating a Pipe)

打开一个管道通过调用Pipe.open()工厂方法,如下:

Pipe pipe = Pipe.open();

向管道写入数据(Writing to a Pipe)

向管道写入数据需要访问他的sink channel:

Pipe.SinkChannel sinkChannel = pipe.sink();

接下来就是调用write()方法写入数据了:

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}

从管道读取数据(Reading from a Pipe)

类似的从管道中读取数据需要访问他的source channel:

Pipe.SourceChannel sourceChannel = pipe.source();

接下来调用read()方法读取数据:

ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf);

注意这里read()的整形返回值代表实际读取到的字节数。

示例:

public class PipeTest {

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //通过缓冲区向管道写入数据
        Pipe.SinkChannel sinkChannel = pipe.sink();
        byteBuffer.put("i am pipe".getBytes());
        byteBuffer.flip();
        sinkChannel.write(byteBuffer);

        //通过缓冲区从管道读数据
        //先要重置缓冲区
        byteBuffer.clear();
        Pipe.SourceChannel sourceChannel = pipe.source();
        int length = sourceChannel.read(byteBuffer);
        //缓冲区转为读模式
        byteBuffer.flip();
        System.out.println(new String(byteBuffer.array(),0,length));
    }
}

NIO vs. IO

问题:

当学习Java的NIO和IO时,有个问题会跳入脑海当中:什么时候该用IO,什么时候用NIO?两者之间的区别,使用场景以及他们是如何影响代码设计的。

NIO和IO之间的主要差异(Mian Differences Between Java NIO and IO)

下面这个表格概括了NIO和IO的主要差异。我们会针对每个差异进行解释。

IO NIO
Stream oriented Buffer oriented
Blocking IO No blocking IO
Selectors

即:

IO NIO
面向流 面向缓冲
阻塞IO 非阻塞IO
选择器
面向流和面向缓冲区比较(Stream Oriented vs. Buffer Oriented)

1、第一个重大差异是IO是面向流的,而NIO是面向缓冲区的。这句话是什么意思呢?

Java IO面向流意思是我们每次从流当中读取一个或多个字节。怎么处理读取到的字节是我们自己的事情。他们不会再任何地方缓存。再有就是我们不能在流数据中向前后移动。如果需要向前后移动读取位置,那么我们需要首先为它创建一个缓存区。

Java NIO是面向缓冲区的,这有些细微差异。数据是被读取到缓存当中以便后续加工我们可以在缓存中向前向后移动。这个特性给我们处理数据提供了更大的弹性空间。当然我们仍然需要在使用数据前检查缓存中是否包含我们需要的所有数据。另外需要确保在往缓冲中写入数据时避免覆盖了已经写入但是还未被处理的数据。

2、阻塞和非阻塞IO比较(Blocking vs. No-blocking IO)

Java IO的各种流都是阻塞的。这意味着一个线程一旦调用了read(),write()方法,那么该线程就被阻塞住了,直到读取到数据或者数据完整写入了。在此期间线程不能做其他任何事情。

Java NIO的非阻塞模式使得线程可以通过channel来读数据,并且是返回当前已有的数据,或者什么都不返回。如果当前没有数据可读的话。这样一来线程不会被阻塞住,它可以继续向下执行其他事情

通常线程在调用非阻塞操作后,会通知处理其他channel上的IO操作。因此一个线程可以管理多个channel的输入输出。

3、Selectors

Java NIO的selector允许一个单一线程监听多个channel输入。我们可以注册多个channel到selector上,然后用一个线程来挑出一个处于可读或者可写状态的channel。selector机制使得单线程管理过个channel变得容易。

NIO和IO是如何影响程序设计的(How NIO and IO Influences Application Design)

开发中选择NIO或者IO会在多方面影响程序设计:

  1. 使用NIO、IO的API调用类
  2. 数据处理
  3. 处理数据需要的线程数
API调用(The API Calls)

显而易见使用NIO的API接口和使用IO时是不同的。不同于直接从InputStream读取字节,我们的数据需要先写入到buffer中,然后再从buffer中处理它们。

数据处理(The Processing of Data)

数据的处理方式也随着是NIO或IO而异

BIO下数据处理是阻塞的,一旦数据方法处理返回时数据就一定能读取到或写入好了,不会有只做一半的情况,且不能在流数据中向前后移动。而NIO是非阻塞的,在读取或写入数据缓冲区时是不能确定数据是否已经完整读完的,可能需要多次检查数据完整性。

例子:

在IO设计中,我们从InputStream或者Reader中读取字节。假设我们现在需要处理一个按行排列的文本数据,如下:

Name: Anna
Age: 25
Email: anna@mailserver.com
Phone: 1234567890

这个处理文本行的过程大概是这样的:

InputStream input = ... ; // get the InputStream from the client socket

BufferedReader reader = new BufferedReader(new InputStreamReader(input));

String nameLine   = reader.readLine();
String ageLine    = reader.readLine();
String emailLine  = reader.readLine();
String phoneLine  = reader.readLine();

请注意处理状态由程序执行多久决定。换句话说,一旦reader.readLine()方法返回,你就知道肯定文本行就已读完, readline()阻塞直到整行读完,这就是原因。你也知道此行包含名称;同样,第二个readline()调用返回的时候,你知道这行包含年龄等。 正如你可以看到,该处理程序仅在有新数据读入时运行,并知道每步的数据是什么。一旦正在运行的线程已处理过读入的某些数据,该线程不会再回退数据(大多如此)。下图也说明了这条原则:

image.png

而一个NIO的实现会有所不同,下面是一个简单的例子:

ByteBuffer buffer = ByteBuffer.allocate(48); 
int bytesRead = inChannel.read(buffer); 

注意第二行,从通道读取字节到ByteBuffer。当这个方法调用返回时,你不知道你所需的所有数据是否在缓冲区内。你所知道的是,该缓冲区包含一些字节,这使得处理有点困难。假设第一次 read(buffer)调用后,读入缓冲区的数据只有半行,例如,“Name:An”,你能处理数据吗?显然不能,需要等待,直到整行数据读入缓存,在此之前,对数据的任何处理毫无意义。所以,你怎么知道是否该缓冲区包含足够的数据可以处理呢?好了,你不知道。发现的方法只能查看缓冲区中的数据。其结果是,在你知道所有数据都在缓冲区里之前,你必须检查几次缓冲区的数据。这不仅效率低下,而且可以使程序设计方案杂乱不堪。例如:

ByteBuffer buffer = ByteBuffer.allocate(48);   

int bytesRead = inChannel.read(buffer);   

while(! bufferFull(bytesRead) ) {   
       bytesRead = inChannel.read(buffer);   
} 

bufferFull()方法必须跟踪有多少数据读入缓冲区,并返回真或假,这取决于缓冲区是否已满。换句话说,如果缓冲区准备好被处理,那么表示缓冲区满了。

bufferFull()方法扫描缓冲区,但必须保持在bufferFull()方法被调用之前状态相同。如果没有,下一个读入缓冲区的数据可能无法读到正确的位置。这是不可能的,但却是需要注意的又一问题。

如果缓冲区已满,它可以被处理。如果它不满,并且在你的实际案例中有意义,你或许能处理其中的部分数据。但是许多情况下并非如此。下图展示了“缓冲区数据循环就绪”:

image.png

小结

NIO允许我们只用一条线程来管理多个通道(网络连接或文件),随之而来的代价是解析数据相对于阻塞流来说可能会变得更加的复杂。

如果你需要同时管理成千上万的链接,这些链接只发送少量数据,例如聊天服务器,用NIO来实现这个服务器是有优势的。类似的,如果你需要维持大量的链接,例如P2P网络,用单线程来管理这些 链接也是有优势的。这种单线程多连接的NIO设计图

image.png

如果链接数不是很多,但是每个链接的占用较大带宽,每次都要发送大量数据,那么使用传统的IO设计服务器可能是最好的选择。下面是经典IO服务设计图

image.png

NIO Path路径

Java的path接口是作为Java NIO 2的一部分是Java6,7中NIO的升级增加部分。Path在Java 7新增的。相关接口位于java.nio.file包下,所以Path接口的完整名称是java.nio.file.Path.

一个Path实例代表一个文件系统内的路径。path可以指向文件也可以指向目录。可以是相对路径也可以是绝对路径。绝对路径包含了从根目录到该文件(目录)的完整路径。相对路径包含该文件(目录)相对于其他路径的路径。

在很多情况下java.no.file.Path接口和java.io.File比较相似,但是他们之间存在一些细微的差异。尽管如此,在大多数情况下,我们都可以用Path接口来替换File相关类。

创建Path实例(Creating a Path Instance)

为了使用java.nio.file.Path实例我们必须创建Path对象。创建Path实例可以通过Paths的工厂方法get()。

注意Paths.get("c:\data\myfile.txt")的调用。这个方法会创建一个Path实例,换句话说Paths.get()是Paths的一个工厂方法。

创建绝对路径(Creating an Absolute Path)

创建绝对路径只需要调动Paths.get()这个工厂方法,同时传入绝对文件。这是一个例子:

Path path = Paths.get("c:\\data\\myfile.txt");

对路径是c:\data\myfile.txt,里面的双斜杠\字符是Java 字符串中必须的,因为\是转义字符,表示后面跟的字符在字符串中的真实含义。双斜杠\表示\自身。

上面的路径是Windows下的文件系统路径表示。在Unixx系统中(Linux, MacOS,FreeBSD等)上述的绝对路径长得是这样的:

Path path = Paths.get("/home/jakobjenkov/myfile.txt");

他的绝对路径是/home/jakobjenkov/myfile.txt。 如果在Windows机器上使用用这种路径,那么这个路径会被认为是相对于当前磁盘的。例如:

/home/jakobjenkov/myfile.txt

这个路径会被理解其C盘上的文件,所以路径又变成了

C:/home/jakobjenkov/myfile.txt

创建相对路径(Creating a Relative Path)

相对路径是从一个路径(基准路径)指向另一个目录或文件的路径。完整路径实际上等同于相对路径加上基准路径。

Java NIO的Path类可以用于相对路径。创建一个相对路径可以通过调用Path.get(basePath, relativePath),下面是一个示例:

Path projects = Paths.get("d:\\data", "projects");
Path file     = Paths.get("d:\\data", "projects\\a-project\\myfile.txt");

第一行创建了一个指向d:\data\projects的Path实例。第二行创建了一个指向d:\data\projects\a-project\myfile.txt的Path实例。 在使用相对路径的时候有两个特殊的符号:

  • .
  • ..

.表示的是当前目录,例如我们可以这样创建一个相对路径:

Path currentDir = Paths.get(".");
System.out.println(currentDir.toAbsolutePath());

currentDir的实际路径就是当前代码执行的目录。 如果在路径中间使用了.那么他的含义实际上就是目录位置自身,例如:

Path currentDir = Paths.get("d:\\data\\projects\.\a-project");

上诉路径等同于:

d:\data\projects\a-project

..表示父目录或者说是上一级目录

Path parentDir = Paths.get("..");

这个Path实例指向的目录是当前程序代码的父目录。 如果在路径中间使用..那么会相应的改变指定的位置:

String path = "d:\\data\\projects\\a-project\\..\\another-project";
Path parentDir2 = Paths.get(path);

这个路径等同于:

d:\data\projects\another-project

.和..也可以结合起来用,这里不过多介绍。

Path.normalize()

Path的normalize()方法可以把路径规范化。也就是把.和..都等价去除:

String originalPath = "d:\\data\\projects\\a-project\\..\\another-project";

Path path1 = Paths.get(originalPath);
System.out.println("path1 = " + path1);

Path path2 = path1.normalize();
System.out.println("path2 = " + path2);

这段代码的输出如下:

path1 = d:\data\projects\a-project\..\another-project
path2 = d:\data\projects\another-project

实例:

import java.nio.file.Path;
import java.nio.file.Paths;
public class PathTest {
    public static void main(String[] args) {
        //创建Path实例
        Path path = Paths.get("c:\\data\\myfile.txt");
        //创建绝对路径(Creating an Absolute Path)
        Path path1 = Paths.get("c:\\data\\myfile.txt");
        //创建相对路径
        Path path2 = Paths.get("d:\\data", "projects\\a-project\\myfile.txt");
        //Path的normalize()方法可以把路径规范化
        String originalPath = "d:\\data\\projects\\a-project\\..\\another-project";

        Path path3 = Paths.get(originalPath);
        System.out.println("path3 = " + path3);

        Path path4 = path3.normalize();
        System.out.println("path4 = " + path4);
    }
}

NIO Files

Java NIO中的Files类(java.nio.file.Files)提供了多种操作文件系统中文件的方法。本节教程将覆盖大部分方法。Files类包含了很多方法,所以如果本文没有提到的你也可以直接查询JavaDoc文档。

java.nio.file.Files类是和java.nio.file.Path相结合使用的,所以在用Files之前确保你已经理解了Path类。

Files.exists()

Files.exits()方法用来检查给定的Path在文件系统中是否存在。 在文件系统中创建一个原本不存在的Path是可行的。例如,你想新建一个目录,那么先创建对应的Path实例,然后创建目录。

由于Path实例可能指向文件系统中的不存在的路径,所以需要用Files.exists()来确认。

下面是一个使用Files.exists()的示例:

Path path = Paths.get("data/logging.properties");

boolean pathExists =
        Files.exists(path,
            new LinkOption[]{ LinkOption.NOFOLLOW_LINKS});

这个示例中,我们首先创建了一个Path对象,然后利用Files.exists()来检查这个路径是否真实存在。

注意Files.exists()的的第二个参数。他是一个数组,这个参数直接影响到Files.exists()如何确定一个路径是否存在。在本例中,这个数组内包含了LinkOptions.NOFOLLOW_LINKS,表示检测时不包含符号链接文件。

Files.createDirectory()

Files.createDirectory()会创建Path表示的路径,下面是一个示例:

Path path = Paths.get("data/subdir");

try {
    Path newDir = Files.createDirectory(path);
} catch(FileAlreadyExistsException e){
    // the directory already exists.
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

第一行创建了一个Path实例,表示需要创建的目录。接着用try-catch把Files.createDirectory()的调用捕获住。如果创建成功,那么返回值就是新创建的路径

如果目录已经存在了,那么会抛出java.nio.file.FileAlreadyExistException异常。如果出现其他问题,会抛出一个IOException。比如说,要创建的目录的父目录不存在,那么就会抛出IOException。父目录指的是你要创建的目录所在的位置。也就是新创建的目录的上一级父目录。

Files.copy()

Files.copy()方法可以吧一个文件从一个地址复制到另一个位置。例如:

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

这个例子当中,首先创建了原文件和目标文件的Path实例。然后把它们作为参数,传递给Files.copy(),接着就会进行文件拷贝。

如果目标文件已经存在,就会抛出java.nio.file.FileAlreadyExistsException异常。类似的目标地址路径不对,也会抛出IOException。

覆盖已经存在的文件(Overwriting Existing Files)

copy操作可以强制覆盖已经存在的目标文件。下面是具体的示例:

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

注意copy方法的第三个参数,这个参数决定了是否可以覆盖文件。

Files.move()

Java NIO的Files类也包含了移动的文件的接口。移动文件和重命名是一样的,但是还会改变文件的目录位置。java.io.File类中的renameTo()方法与之功能是一样的。

Path sourcePath      = Paths.get("data/logging-copy.properties");
Path destinationPath = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.move(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
    //moving file failed.
    e.printStackTrace();
}

首先创建源路径和目标路径的,原路径指的是需要移动的文件的初始路径,目标路径是指需要移动到的位置。

这里move的第三个参数也允许我们覆盖已有的文件。

Files.delete()

Files.delete()方法可以删除一个文件或目录:

Path path = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.delete(path);
} catch (IOException e) {
    //deleting file failed
    e.printStackTrace();
}

首先创建需要删除的文件的path对象。接着就可以调用delete了。

Files.walkFileTree()

Files.walkFileTree()方法具有递归遍历目录的功能。walkFileTree接受一个Path和FileVisitor作为参数。Path对象是需要遍历的目录,FileVistor则会在每次遍历中被调用。

下面先来看一下FileVisitor这个接口的定义:

public interface FileVisitor {

    public FileVisitResult preVisitDirectory(
        Path dir, BasicFileAttributes attrs) throws IOException;

    public FileVisitResult visitFile(
        Path file, BasicFileAttributes attrs) throws IOException;

    public FileVisitResult visitFileFailed(
        Path file, IOException exc) throws IOException;

    public FileVisitResult postVisitDirectory(
        Path dir, IOException exc) throws IOException {

}

FileVisitor需要调用方自行实现,然后作为参数传入walkFileTree().FileVisitor的每个方法会在遍历过程中被调用多次。如果不需要处理每个方法,那么可以继承他的默认实现类SimpleFileVisitor,它将所有的接口做了空实现。

下面看一个walkFileTree()的示例:

Files.walkFileTree(path, new FileVisitor<Path>() {
  @Override
  public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
    System.out.println("pre visit dir:" + dir);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    System.out.println("visit file: " + file);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
    System.out.println("visit file failed: " + file);
    return FileVisitResult.CONTINUE;
  }

  @Override
  public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
    System.out.println("post visit directory: " + dir);
    return FileVisitResult.CONTINUE;
  }
});

FileVisitor的方法会在不同时机被调用: preVisitDirectory()在访问目录前被调用。postVisitDirectory()在访问后调用。

visitFile()会在整个遍历过程中的每次访问文件都被调用。他不是针对目录的,而是针对文件的。visitFileFailed()调用则是在文件访问失败的时候。例如,当缺少合适的权限或者其他错误。

上述四个方法都返回一个FileVisitResult枚举对象。具体的可选枚举项包括:

  • CONTINUE
  • TERMINATE
  • SKIP_SIBLINGS
  • SKIP_SUBTREE

返回这个枚举值可以让调用方决定文件遍历是否需要继续。 CONTINE表示文件遍历和正常情况下一样继续。

TERMINATE表示文件访问需要终止。

SKIP_SIBLINGS表示文件访问继续,但是不需要访问其他同级文件或目录。

SKIP_SUBTREE表示继续访问,但是不需要访问该目录下的子目录。这个枚举值仅在preVisitDirectory()中返回才有效。如果在另外几个方法中返回,那么会被理解为CONTINE。

Searching For Files

下面看一个例子,我们通过walkFileTree()来寻找一个README.txt文件:

Path rootPath = Paths.get("data");
String fileToFind = File.separator + "README.txt";

try {
  Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {

    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      String fileString = file.toAbsolutePath().toString();
      //System.out.println("pathString = " + fileString);

      if(fileString.endsWith(fileToFind)){
        System.out.println("file found at path: " + file.toAbsolutePath());
        return FileVisitResult.TERMINATE;
      }
      return FileVisitResult.CONTINUE;
    }
  });
} catch(IOException e){
    e.printStackTrace();
}

Deleting Directies Recursively

Files.walkFileTree()也可以用来删除一个目录以及内部的所有文件和子目。Files.delete()只用用于删除一个空目录。我们通过遍历目录,然后在visitFile()接口中三次所有文件,最后在postVisitDirectory()内删除目录本身。

Path rootPath = Paths.get("data/to-delete");

try {
  Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      System.out.println("delete file: " + file.toString());
      Files.delete(file);
      return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
      Files.delete(dir);
      System.out.println("delete dir: " + dir.toString());
      return FileVisitResult.CONTINUE;
    }
  });
} catch(IOException e){
  e.printStackTrace();
}

示例:

public class FileTest {

    public static void main(String[] args) throws IOException {
        //创建绝对路径
        Path path = Paths.get("D:\\text\\file\\testfile.txt");//false
        //Path path = Paths.get("D:\\text\\1_loan.sql");//true
        //Path path = Paths.get("D:\\text\\test1\\1_loan.sql");//false
        //检查给定的Path在文件系统中是否存在,NOFOLLOW_LINKS:表示检测时不包含符号链接文件
        boolean exists = Files.exists(path, new LinkOption[]{LinkOption.NOFOLLOW_LINKS});
        //默认不传的话是包含符号链接文件的
        //boolean exists = Files.exists(path);
        System.out.println("exists =" + exists);

        //路径格式可以是这两种
        Path filePath = Paths.get("D:\\text\\file");
        //Path filePath = Paths.get("D:/text/file_copy");

        //返回值就是新创建的路径.创建文件夹
        //Path directoryPath = Files.createDirectory(filePath);
        //createDirectory directoryPath=D:\text\file
        //System.out.println("createDirectory directoryPath=" + directoryPath);

        //返回值就是新创建的路径.创建文件夹
        //Path rtfilePath = Files.createFile(path);
        //createFile rtfilePath=D:\text\file\testfile.txt
        //System.out.println("createFile rtfilePath=" + rtfilePath);

        //Path path1 = Paths.get("D:\\text\\file\\subfile\\testfile.sql");
        //NoSuchFileException: D:\text\file\subfile\testfile.sql because subfile not exist
        //Path rtpath1 = Files.createFile(path1);
        //System.out.println("createFile rtpath1=" + rtpath1);

        //Path path2 = Paths.get("D:\\text\\file\\subfile\\testfile.sql");
        //创建连续不存在的文件夹,不存在就创建,不过只能创建文件夹,不能连同文件创建,文件要另外创建
        //Path rtpath2 = Files.createDirectories(path2);
        //createFile rtpath2=D:\text\file\subfile\testfile.sql
        //System.out.println("createFile rtpath2=" + rtpath2);


        //copy
        Path sourcePath = Paths.get("D:\\text\\file\\testfile.txt");
        Path destinationPath = Paths.get("D:\\text\\file\\testfile_copy.txt");
        //copy =D:\text\file\testfile_copy.txt
        //copy的目标路径文件不能存在,否则抛java.nio.file.FileAlreadyExistsException: D:\text\file\testfile_copy.txt异常
        //Path copy = Files.copy(sourcePath, destinationPath);
        //copy操作可以强制覆盖已经存在的目标文件,传入参数 StandardCopyOption.REPLACE_EXISTING
        //Path copy = Files.copy(sourcePath, destinationPath,StandardCopyOption.REPLACE_EXISTING);
        //System.out.println("copy =" + copy);


        //move:移动文件和重命名是一样的
        //Path sourcePathM = Paths.get("D:\\text\\file\\testfile.txt");
        //Path destinationPathM = Paths.get("D:\\text\\file\\testfile_move.txt");
        //Path move = Files.move(sourcePathM, destinationPathM);
        //move操作可以强制覆盖已经存在的目标文件,传入参数 StandardCopyOption.REPLACE_EXISTING
        //原有的testfile.txt将被移动或重命名而不存在了,如果有目标文件testfile_move.txt存在,则会被覆盖
        //Path move = Files.move(sourcePathM, destinationPathM,StandardCopyOption.REPLACE_EXISTING);
        //System.out.println("move =" + move);

        //delete 删除一个文件或目录
        //Path deletePath = Paths.get("D:\\text\\file\\testfile_move.txt");
        //Exception in thread "main" java.nio.file.NoSuchFileException: D:\text\file\testfile_move.txt
        //要求要删除的文件或目录必须存在,否则报错
        //Files.delete(deletePath);
        //存在才删除
        //Files.deleteIfExists(deletePath);


        Path rootPath = Paths.get("D:");
         final String fileToFind = File.separator + "README.txt";

        try {
            Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    String fileString = file.toAbsolutePath().toString();
                    //System.out.println("pathString = " + fileString);

                    if(fileString.endsWith(fileToFind)){
                        System.out.println("file found at path: " + file.toAbsolutePath());
                        return FileVisitResult.TERMINATE;
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

NIO AsynchronousFileChannel异步文件通道(AIO)

Java7中新增了AsynchronousFileChannel作为nio的一部分。AsynchronousFileChannel使得数据可以进行异步读写。

创建AsynchronousFileChannel(Creating an AsynchronousFileChannel)

AsynchronousFileChannel的创建可以通过open()静态方法:

Path path = Paths.get("data/test.xml");

AsynchronousFileChannel fileChannel =
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);

open()的第一个参数是一个Path实体,指向我们需要操作的文件。 第二个参数是操作类型。上述示例中我们用的是StandardOpenOption.READ,表示以读的形式操作文件。

读取数据(Reading Data)

读取AsynchronousFileChannel的数据有两种方式。每种方法都会调用AsynchronousFileChannel的一个read()接口。下面分别看一下这两种写法。

1、通过Future读取数据(Reading Data Via a Future)

第一种方式是调用返回值为Future的read()方法:

Future<Integer> operation = fileChannel.read(buffer, 0);

这种方式中,read()接受一个ByteBuffer作为第一个参数,数据会被读取到ByteBuffer中。第二个参数是开始读取数据的位置。

read()方法会立刻返回,即使读操作没有完成。我们可以通过isDone()方法检查操作是否完成

下面是一个略长的示例:

 AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

Future<Integer> operation = fileChannel.read(buffer, position);

while(!operation.isDone());

buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();

在这个例子中我们创建了一个AsynchronousFileChannel,然后创建一个ByteBuffer作为参数传给read。接着我们创建了一个循环来检查是否读取完毕isDone()。这里的循环操作比较低效,它的意思是我们需要等待读取动作完成。

一旦读取完成后,我们就可以把数据写入ByteBuffer,然后输出。

2、通过CompletionHandler读取数据(Reading Data Via a CompletionHandler)

另一种方式是调用接收CompletionHandler作为参数的read()方法。下面是具体的使用:

fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("result = " + result);

        attachment.flip();
        byte[] data = new byte[attachment.limit()];
        attachment.get(data);
        System.out.println(new String(data));
        attachment.clear();
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
});

这里,一旦读取完成,将会触发CompletionHandler的completed()方法,并传入一个Integer和ByteBuffer。前面的整形表示的是读取到的字节数大小。第二个ByteBuffer也可以换成其他合适的对象方便数据写入。 如果读取操作失败了,那么会触发failed()方法。

写数据(Writing Data)

和读数据类似某些数据也有两种方式,调动不同的的write()方法,下面分别看介绍这两种方法。

通过Future写数据(Writing Data Via a Future)

通过AsynchronousFileChannel我们可以异步写数据。

Path path = Paths.get("data/test-write.txt");
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();

while(!operation.isDone());

System.out.println("Write done");

首先把文件以写方式打开,接着创建一个ByteBuffer作为写入数据的目的地。再把数据进入ByteBuffer。最后检查一下是否写入完成。 需要注意的是,这里的文件必须是已经存在的,否则在尝试write数据是会抛出一个java.nio.file.NoSuchFileException.

检查一个文件是否存在可以通过下面的方法:

if(!Files.exists(path)){
    Files.createFile(path);
}
通过CompletionHandler写数据(Writing Data Via a CompletionHandler)

我们也可以通过CompletionHandler来写数据:

Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
    Files.createFile(path);
}
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("bytes written: " + result);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("Write failed");
        exc.printStackTrace();
    }
});

同样当数据吸入完成后completed()会被调用,如果失败了那么failed()会被调用。

示例:

public class AIOTest {

    public static void main1(String[] args) throws IOException {
        //通过Future读取数据
        Path path = Paths.get("D:/test/file/README.txt");
        AsynchronousFileChannel asynChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        Future<Integer> future = asynChannel.read(byteBuffer, 0);
        while (!future.isDone()) {
            //等待读取动作完成
        };
        byteBuffer.flip();
        //有以下两种输出方式,本质上都是把缓冲区byteBuffer转为byte数组,再用new String接收
        //System.out.println(new String(byteBuffer.array(),0,byteBuffer.limit()));
        byte[] data = new byte[byteBuffer.limit()];
        byteBuffer.get(data);
        //设置编码格式
        //System.out.println(new String(data, StandardCharsets.UTF_8));
        //不设置编码格式时取的是系统默认的编码格式。在linux中是utf-8
        System.out.println(new String(data));
        byteBuffer.clear();
        asynChannel.close();
    }

    public static void main2(String[] args) throws IOException {
        //通过CompletionHandler读取数据
        Path path = Paths.get("D:/test/file/README.txt");
        AsynchronousFileChannel asynChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        asynChannel.read(byteBuffer, 0, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("result = " + result);
                attachment.flip();
                byte[] data = new byte[attachment.limit()];
                attachment.get(data);
                System.out.println(new String(data));
                attachment.clear();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("result failed " + exc.getMessage());
            }
        });
        asynChannel.close();
    }

    public static void main3(String[] args) throws IOException {
        //通过Future写数据
        Path path = Paths.get("D:/test/file/README_WRITE.txt");
        //若文件不存在则创建一个
        if (!Files.exists(path)){
            Files.createFile(path);
        }
        AsynchronousFileChannel asynChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put("i am batman,and i am rich".getBytes());
        byteBuffer.flip();
        Future<Integer> future = asynChannel.write(byteBuffer, 0);
        byteBuffer.clear();
        while (!future.isDone()){
        }
        System.out.println("Write done");
        asynChannel.close();
    }

    public static void main(String[] args) throws IOException {
        //通过CompletionHandler写数据
        Path path = Paths.get("D:/test/file/README_WRITE.txt");
        //若文件不存在则创建一个
        if (!Files.exists(path)){
            Files.createFile(path);
        }
        AsynchronousFileChannel asynChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put("i am batman,and i am rich".getBytes());
        byteBuffer.flip();
        asynChannel.write(byteBuffer, 0, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("Write done");
                System.out.println("bytes written: " + result);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("bytes written failes: " + exc.getMessage());
            }
        });
        asynChannel.close();
    }
}

疑问:

Q:NIO的具体使用场景都有哪些?网络连接?学习NIO的目的?

Q:Linux的五种IO模型?与java的io模型的关系?

《漫话:如何给女朋友解释什么是Linux的五种IO模型?》

Q:equals()判断两个buffer相对,需满足:

  • 类型相同
  • buffer中剩余字节数相同
  • 所有剩余字节相等

从上面的三个条件可以看出,equals只比较buffer中的部分内容,并不会去比较每一个元素。

所有剩余字节相等是指如果这个buffer有被读或者写过,只比较他们剩余没有读或者写的部分是么?

Q:FileChannel不能切换为非阻塞模式,都有哪些Channel可以切换为非阻塞模式?

Q:select()方法的返回值是一个int整形,代表有多少channel处于就绪了。也就是自上一次select后有多少channel进入就绪。举例来说,假设第一次调用select时正好有一个channel就绪,那么返回值是1,并且对这个channel做任何处理,接着再次调用select,此时恰好又有一个新的channel就绪,那么返回值还是1,现在我们一共有两个channel处于就绪,但是在每次调用select时只有一个channel是就绪的。每次调用select时只有一个channel是就绪的?为什么?

Q:如何检查message完整性

Q:一个Message Reader自然是协议相关的?都有哪些协议?协议的作用是为了约定规范么?

Q:UDP是面向无连接的网络协议,什么叫无连接的网络协议?

Q:面向流和面向缓冲区的区别,我们不能在流数据中向前后移动。如果需要向前后移动读取位置,那么我们需要首先为它创建一个缓存区?怎么在缓冲区中向前后移动?

Q:针对作者画的NIO和BIO这两个交互图不能很明确得观察有什么不同,特别是BIO前面部分,和NIO后面部分没有画出来,后面部分也是多线程处理啊?不同点是是否阻塞进行连接还是非阻塞连接吧?

NIO:

nio-vs-io-3.png

BIO:

nio-vs-io-4.png

Q:NIO Path路径(java.nio.file.Path )和之前的BIOPath路径( java.io.File )有什么区别?在使用时怎么选择?

其他:

在用main方法测试时怎么给String args[] 参数赋值?

1、直接在代码中给args参数赋值一个我们想要的数组。

 static public void main(String args[]) throws Exception {
        args = new String[]{"D:\\text\\1_loan.sql", "D:\\text\\1_loan_copy.sql"};
  //....   
  }

2、在idea里的运行debug时可以设置program arguments,以空格符分开参数。

image.png

二、注意当copy其他java类进来时,如果引用类的包名路径不同,会导致报错,且还不能引用正确路径上的类,这时候要点开import 里引用的错误包路径的类引用路径,删除了重新引入。快捷键 ctrl + alt + o。

三、符号链接文件:与硬连接相对应,Lnux系统中还存在另一种连接,称为符号连接(Symbilc Link),也叫软连接。软链接文件有点类似于Windows的快捷方式 。

什么是linux下的符号链接文件

其他

首先Java中的IO有以下三种:
BIO(Blocking IO) 同步式阻塞IO
NIO(Non-BlockingIO/New IO) 同步式非阻塞IO JDK1.4提供
AIO(AsynchronousIO) 异步式非阻塞IO JDK1.8提供

略读:

IBM

NIO 的创建目的是为了让 Java 程序员可以实现高速 I/O 而无需编写自定义的本机代码。NIO 将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。

原来的 I/O 库(在 java.io.*中) 与 NIO 最重要的区别是数据打包和传输的方式。正如前面提到的,原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。

通道缓冲区是 NIO 中的核心对象,几乎在每一个 I/O 操作中都要使用它们。

通道是对原 I/O 包中的流的模拟。到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象。一个 Buffer 实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。

Buffer 是一个对象, 它包含一些要写入或者刚读出的数据。 在 NIO 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream 对象中 。

缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。

通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类), 而 通道可以用于读、写或者同时用于读写。

因为它们是双向的,所以通道可以比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。

在 NIO 系统中,任何时候执行一个读操作,您都是从通道中读取,但是您不是 直接 从通道读取。因为所有数据最终都驻留在缓冲区中,所以您是从通道读到缓冲区中。

因此读取文件涉及三个步骤:(1) 从 FileInputStream 获取 Channel,(2) 创建 Buffer,(3) 将数据从 Channel 读到 Buffer中。

clear() 方法重设缓冲区,使它可以接受读入的数据。 flip() 方法让缓冲区可以将新读入的数据写入另一个通道。

flip

现在我们要将数据写到输出通道中。在这之前,我们必须调用 flip() 方法。这个方法做两件非常重要的事:

  1. 它将 limit 设置为当前 position
  2. 它将 position 设置为 0。
image.png
clear

最后一步是调用缓冲区的 clear() 方法。这个方法重设缓冲区以便接收更多的字节。 Clear 做两种非常重要的事情:

  1. 它将 limit 设置为与 capacity 相同。
  2. 它设置 position 为 0。
image.png

read()write() 调用得到了极大的简化,因为许多工作细节都由缓冲区完成了。 clear()flip() 方法用于让缓冲区在读和写之间切换。

创建不同类型的缓冲区以达到不同的目的,如可保护数据不被修改的 只读 缓冲区,和直接映射到底层操作系统缓冲区的 直接 缓冲区。

使用静态方法 allocate() 来分配缓冲区:

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

将一个现有的数组转换为缓冲区,如下所示:

byte array[] = new byte[1024];``ByteBuffer buffer = ByteBuffer.wrap( array );

本例使用了 wrap() 方法将一个数组包装为缓冲区。必须非常小心地进行这类操作。一旦完成包装,底层数据就可以通过缓冲区或者直接访问。

创建一个包含槽 3 到槽 6 的子缓冲区。在某种意义上,子缓冲区就像原来的缓冲区中的一个窗口

窗口的起始和结束位置通过设置 positionlimit 值来指定,然后调用 Bufferslice() 方法:

buffer.position( 3 );``buffer.limit( 7 );``ByteBuffer slice = buffer.slice();

是缓冲区的 子缓冲区。不过, 片段缓冲区共享同一个底层数据数组

只读缓冲区非常简单 ― 您可以读取它们,但是不能向它们写入。可以通过调用缓冲区的 asReadOnlyBuffer() 方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区(并与其共享数据),只不过它是只读的。

只读缓冲区对于保护数据很有用。在将缓冲区传递给某个对象的方法时,您无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以 保证 该缓冲区不会被修改。

不能将只读的缓冲区转换为可写的缓冲区。

//直接缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect( 1024 );
分散/聚集 I/O

通道可以有选择地实现两个新的接口: ScatteringByteChannelGatheringByteChannel。一个 ScatteringByteChannel是一个具有两个附加读方法的通道:

  • long read( ByteBuffer[] dsts );
  • long read( ByteBuffer[] dsts, int offset, int length );

这些 long read() 方法很像标准的 read 方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。缓冲区数组就像一个大缓冲区。

以socket.read()为例子:

传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。


美团

对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。

最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。

换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。

NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。

回忆BIO模型,之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能”傻等”,即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

仔细分析一下我们需要的线程,其实主要包括以下几种: 1. 事件分发器,单线程选择就绪的事件。 2. I/O处理器,包括connect、read、write等,这种纯CPU操作,一般开启CPU核心个线程就可以。 3. 业务线程,在处理完I/O后,业务一般还会有自己的业务逻辑,有的还会有其他的阻塞I/O,如DB操作,RPC等。只要有阻塞,就需要单独的线程。

NIO给我们带来了些什么:

  • 事件驱动模型
  • 避免多线程
  • 单线程处理多任务
  • 非阻塞I/O,I/O读写不再阻塞,而是返回0
  • 基于block的传输,通常比基于流的传输更高效
  • 更高级的IO函数,zero-copy
  • IO多路复用大大提高了Java网络应用的可伸缩性和实用性

BIO,NIO,AIO 总结

如何区分 “同步/异步 ”和 “阻塞/非阻塞” 呢?

同步/异步是从行为角度描述事物的,而阻塞和非阻塞描述的当前事物的状态(等待调用结果时的状态)。

阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中·可以将数据直接写入或者将数据直接读到 Stream 对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。

NIO 通过Channel(通道) 进行读写。

通道是双向的,可读也可写,而流的读写是单向的。无论读写,通道只能和Buffer交互。因为 Buffer,通道可以异步地读写。

NIO有选择器,而IO没有。

选择器用于使用单个线程处理多个通道。因此,它需要较少的线程来处理这些通道。线程之间的切换对于操作系统来说是昂贵的。 因此,为了提高系统效率选择器是有用的。

AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

每当要从缓存区的时候读取数据时,就调用filp()“切换成读模式”

读完我们还想写数据到缓冲区,那就使用clear()函数,这个函数会“清空”缓冲区 。

参考

BIO,NIO,AIO 总结

Java NIO 教程——极客,蓝本

NIO 入门——IBM

相关文章

网友评论

      本文标题:NIO教程 ——检视阅读(下)

      本文链接:https://www.haomeiwen.com/subject/xkdnihtx.html