美文网首页程序员NIO通信 & Netty 4 源码解读
「高并发通信框架Netty4 源码解读(五)」NIO通道Chan

「高并发通信框架Netty4 源码解读(五)」NIO通道Chan

作者: 源码之路 | 来源:发表于2020-06-26 18:49 被阅读0次

通道(Channel)是 java.nio 的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的 Java I/O 示例,提供与 I/O 服务的直接连接。 Channel 用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。

概念

通道可以形象地比喻为银行出纳窗口使用的气动导管。您的薪水支票就是您要传送的信息,载体(Carrier)就好比一个缓冲区。您先填充缓冲区(将您的支票放到载体上),接着将缓冲“写”到通道中(将载体丢进导管中),然后信息负载就被传递到通道另一侧的 I/O 服务(银行出纳员)。该过程的回应是:出纳员填充缓冲区(将您的收据放到载体上),接着开始一个反方向的通道传输(将载体丢回到导管中)。载体就到了通道的您这一侧(一个填满了的缓冲区正等待您的查验),然后您就会 flip 缓冲区(打开盖子)并将它清空(移除您的收据)。现在您可以开车走了,下一个对象(银行客户)将使用同样的载体(Buffer)和导管(Channel)对象来重复上述过程。

多数情况下,通道与操作系统的文件描述符(File Descriptor)和文件句柄(File Handle)有着一对一的关系。虽然通道比文件描述符更广义,但您将经常使用到的多数通道都是连接到开放的文件描述符的。 Channel 类提供维持平台独立性所需的抽象过程,不过仍然会模拟现代操作系统本身的 I/O 性能。

通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的 I/O 服务。缓冲区则是通道内部用来发送和接收数据的端点。

继承关系

通道基础

首先,我们来更近距离地看一下基本的 Channel 接口。下面是 Channel 接口的完整源码:

package java.nio.channels;
public interface Channel
{
  public boolean isOpen( );
  public void close( ) throws IOException;
}

与缓冲区不同,通道 API 主要由接口指定。不同的操作系统上通道实现(ChannelImplementation)会有根本性的差异,所以通道 API 仅仅描述了可以做什么。因此很自然地,通道实现经常使用操作系统的本地代码。通道接口允许您以一种受控且可移植的方式来访问底层的 I/O服务。

您可以从顶层的 Channel 接口看到,对所有通道来说只有两种共同的操作:检查一个通道是否打开(IsOpen())和关闭一个打开的通道(close())。继承图显示,所有有趣的东西都是那些实现Channel 接口以及它的子接口的类。

InterruptibleChannel 是一个标记接口,当被通道使用时可以标示该通道是可以中断的(Interruptible)。如果连接可中断通道的线程被中断,那么该通道会以特别的方式工作,关于这一点我们会在后面进行讨论。大多数但非全部的通道都是可以中断的。

从 Channel 接口引申出的其他接口都是面向字节的子接口,包括 WritableByteChannel 和ReadableByteChannel。这也正好支持了我们之前所学的:通道只能在字节缓冲区上操作。层次结构表明其他数据类型的通道也可以从 Channel 接口引申而来。这是一种很好的类设计,不过非字节实现是不可能的,因为操作系统都是以字节的形式实现底层 I/O 接口的。

层次结构中有两个类位于一个不同的包:java.nio.channels.spi。这两个类是 AbstractInterruptibleChannel
AbstractSelectableChannel,它们分别为可中断的(interruptible)和可选择的(selectable)的通道实现提供所需的常用方法。尽管描述通道行为的接口都是在 java.nio.channels 包中定义的,不过具体的通道实现却都是从java.nio.channels.spi 中的类引申来的。这使得他们可以访问受保护的方法,而这些方法普通的通道用户永远都不会调用。

作为通道的一个使用者,您可以放心地忽视 SPI 包中包含的中间类。这种有点费解的继承层次只会让那些使用新通道的用户感兴趣。 SPI 包允许新通道实现以一种受控且模块化的方式被植入到Java 虚拟机上。这意味着可以使用专为某种操作系统、文件系统或应用程序而优化的通道来使性能最大化。

打开通道

通道是访问 I/O 服务的导管。正如我们前面博客讨论的,I/O 可以分为广义的两大类别:File I/O 和 Stream I/O。那么相应地有两种类型的通道也就不足为怪了,它们是文件(file)通道和套接字(socket)通道。上文的继承图有一个FileChannel 类和三个 socket 通道类: SocketChannel、 ServerSocketChannel 和 DatagramChannel。

通道可以以多种方式创建。 Socket 通道有可以直接创建新 socket 通道的工厂方法。·但是一个FileChannel 对象却只能通过在一个打开的 RandomAccessFile、 FileInputStream 或 FileOutputStream对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。 File 和 socket 通道会在后面详细讨论。

SocketChannel sc = SocketChannel.open( );
  sc.connect (new InetSocketAddress ("somehost", someport));

ServerSocketChannel ssc = ServerSocketChannel.open( );
  ssc.socket( ).bind (new InetSocketAddress (somelocalport));

DatagramChannel dc = DatagramChannel.open( );

RandomAccessFile raf = new RandomAccessFile ("somefile", "r");

FileChannel fc = raf.getChannel( );

其实,java.net 的 socket 类也有新的 getChannel( )方法。这些方法虽然能返回一个相应的 socket 通道对象,但它们却并非新通道的来源,RandomAccessFile.getChannel( )方法才是。只有在已经有通道存在的时候,它们才返回与一个 socket 关联的通道;它们永远不会创建新通道。

使用通道

我们知道,通道将数据传输给 ByteBuffer 对象或者从 ByteBuffer 对象获取数据进行传输。

将上文的类继承图中大部分零乱内容移除可以得到下图所示的UML 类图。



子接口API代码如下:

public interface ReadableByteChannel extends Channel
{
  public int read (ByteBuffer dst) throws IOException;
}
public interface WritableByteChannel extends Channel
{
  public int write (ByteBuffer src) throws IOException;
}
public interface ByteChannel extends ReadableByteChannel, WritableByteChannel
{ 
}

通道可以是单向(unidirectional)或者双向的(bidirectional)。一个 channel 类可能实现定义read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以提供 write( )方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。

ByteChannel 接口引申出了 ReadableByteChannel 和WritableByteChannel 两个接口。 ByteChannel 接口本身并不定义新的 API 方法,它是一种用来聚集它自己以一个新名称继承的多个接口的便捷接口。根据定义,实现 ByteChannel 接口的通道会同时实现 ReadableByteChannel 和 WritableByteChannel 两个接口,所以此类通道是双向的。这是简化类义的语法糖(syntactic sugar),它使得用操作器(operator)实例来测试通道对象变得更加简单。

这是一种好的类设计技巧,如果您在写您自己的 Channel 实现的话,您可以适当地实现这些接口。不过对于使用 java.nio.channels 包中标准通道类的程序员来说,这些接口并没有太大的吸引力。其实,每一个 file 或 socket 通道都实现全部三个接口。从类定义的角度而言,这意味着全部 file 和 socket 通道对象都是双向的。这对于 sockets 不是问题,因为它们一直都是双向的,不过对于 files 却是个问题。

我们知道,一个文件可以在不同的时候以不同的权限打开。从 FileInputStream 对象的getChannel( )方法获取的 FileChannel 对象是只读的,不过从接口声明的角度来看却是双向的,因为FileChannel 实现 ByteChannel 接口。在这样一个通道上调用 write( )方法将抛出未经检查的NonWritableChannelException 异常,因为 FileInputStream 对象总是以 read-only 的权限打开文件。

通道会连接一个特定 I/O 服务且通道实例(channel instance)的性能受它所连接的 I/O 服务的特征限制,记住这很重要。一个连接到只读文件的 Channel 实例不能进行写操作,即使该实例所属的类可能有 write( )方法。基于此,程序员需要知道通道是如何打开的,避免试图尝试一个底层 I/O服务不允许的操作。

ByteChannel 的 read( ) 和 write( )方法使用 ByteBuffer 对象作为参数。两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining( )方法返回 false 值。下例 表示了如何从一个通道复制数据到另一个通道。

package czznio.buffer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class ChannelCopy {
    /**
     * 此代码将数据从stdin复制到stdout
     */
    public static void main(String[] argv)
            throws IOException {

        ReadableByteChannel source = Channels.newChannel(System.in);
        WritableByteChannel dest = Channels.newChannel(System.out);
        channelCopy1(source, dest);
        source.close();
        dest.close();
    }

    /**
     *通道复制方法1.
     * 此方法从src *通道复制数据并将其写入dest通道,直到src上的EOF。
     * 该实现利用临时缓冲区上的compact()
     * 如果缓冲区没有完全耗尽,则压缩数据。
     * 可能会导致数据复制,但会最大程度地减少系统调用。
     * 它还需要清理循环以确保所有数据都已发送。
     */
    private static void channelCopy1(ReadableByteChannel src,
                                     WritableByteChannel dest)
            throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
        while (src.read(buffer) != -1) {
            // 准备释放缓冲区
            buffer.flip();
            // 写到目的通道
            dest.write(buffer);
            //如果是部分转移,则将余数向下移位
            buffer.compact();
        }
        // EOF将使缓冲区处于填充状态
        buffer.flip();
        // 确保缓冲区已完全读完
        while (buffer.hasRemaining()) {
            dest.write(buffer);
        }
    }

}

通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。只有面向流的(stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。

socket 通道类从 SelectableChannel 引申而来。从 SelectableChannel 引申而来的类可以和支持有条件的选择(readiness selectio)的选择器(Selectors)一起使用。将非阻塞I/O 和选择器组合起来可以使您的程序利用多路复用I/O(multiplexed I/O)。这点也是下一篇选择器博客详细讲。

关闭通道

与缓冲区不同,通道不能被重复使用。一个打开的通道即代表与一个特定 I/O 服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。

调用通道的close( )方法时,可能会导致在通道关闭底层I/O服务的过程中线程暂时阻塞 ,哪怕该通道处于非阻塞模式。通道关闭时的阻塞行为(如果有的话)是高度取决于操作系统或者文件系统的。在一个通道上多次调用close( )方法是没有坏处的,但是如果第一个线程在close( )方法中阻塞,那么在它完成关闭通道之前,任何其他调用close( )方法都会阻塞。后续在该已关闭的通道上调用close( )不会产生任何操作,只会立即返回。

可以通过 isOpen( )方法来测试通道的开放状态。如果返回 true 值,那么该通道可以使用。如果返回 false 值,那么该通道已关闭,不能再被使用。尝试进行任何需要通道处于开放状态作为前提的操作,如读、写等都会导致ClosedChannelException 异常。

通道引入了一些与关闭和中断有关的新行为。如果一个通道实现 InterruptibleChannel 接口,它的行为以下述语义为准:如果一个线程在一个通道上被阻塞并且同时被中断(由调用该被阻塞线程的 interrupt( )方法的另一个线程中断),那么该通道将被关闭,该被阻塞线程也会产生一个 ClosedByInterruptException 异常。

此外,假如一个线程的 interrupt status 被设置并且该线程试图访问一个通道,那么这个通道将立即被关闭,同时将抛出相同的 ClosedByInterruptException 异常。线程的 interrupt status 在线程的interrupt( )方法被调用时会被设置。我们可以使用 isInterrupted( )来测试某个线程当前的 interrupt status。当前线程的 interrupt status 可以通过调用静态的Thread.interrupted( )方法清除。

仅仅因为休眠在其上的线程被中断就关闭通道,这看起来似乎过于苛刻了。不过这却是 NIO架构师们所做出的明确的设计决定。经验表明,想要在所有的操作系统上一致而可靠地处理被中断的 I/O 操作是不可能的。 “在全部平台上提供确定的通道行为”这一需求导致了“当 I/O 操作被中断时总是关闭通道”这一设计选择。这个选择被认为是可接受的,因为大部分时候一个线程被中断就是希望以此来关闭通道。 java.nio 包中强制使用此行为来避免因操作系统独特性而导致的困境,因为该困境对 I/O 区域而言是极其危险的。这也是为增强健壮性(robustness)而采用的一种经典的权衡。

可中断的通道也是可以异步关闭的。实现 InterruptibleChannel 接口的通道可以在任何时候被关闭,即使有另一个被阻塞的线程在等待该通道上的一个 I/O 操作完成。当一个通道被关闭时,休眠在该通道上的所有线程都将被唤醒并接收到一个 AsynchronousCloseException 异常。接着通道就被关闭并将不再可用。

Scatter/Gather

通道提供了一种被称为 Scatter/Gather 的重要新功能(有时也被称为矢量 I/O)。 Scatter/Gather是一个简单却强大的概念,它是指在多个缓冲区上实现一个简单的 I/O 操作。对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着通道发送的。缓冲区本身并不需要具备这种 gather 的能力(通常它们也没有此能力)。该 gather 过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于 read 操作而言,从通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。

大多数现代操作系统都支持本地矢量 I/O(native vectored I/O)。当您在一个通道上请求一个Scatter/Gather 操作时,该请求会被翻译为适当的本地调用来直接填充或抽取缓冲区。这是一个很大的进步,因为减少或避免了缓冲区拷贝和系统调用。 Scatter/Gather 应该使用直接的 ByteBuffers 以从本地 I/O 获取最大性能优势。

下面的代码描述了 scatter 是如何扩展读操作的,以及 gather 是如何基于写操作构建的:

public interface ScatteringByteChannel extends ReadableByteChannel
{
  public long read (ByteBuffer [] dsts) throws IOException;
  public long read (ByteBuffer [] dsts, int offset, int length) throws IOException;
}
public interface GatheringByteChannel extends WritableByteChannel
{
  public long write(ByteBuffer[] srcs) throws IOException;
  public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}

这两个接口都添加了两种以缓冲区数组作为参数的新方法。另外,每种方法都提供了一种带 offset 和 length 参数的形式。让我们先来理解一下怎样使用方法的简单形式。在下面的代码中,我们假定 channel 连接到一个有 48 字节数据等待读取的 socket 上:

ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (80);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);

一旦 read( )方法返回, bytesRead 就被赋予值 48, header 缓冲区将包含前 10 个从通道读取的字节而 body 缓冲区则包含接下来的 38 个字节。通道会自动地将数据 scatter 到这两个缓冲区中。缓冲区已经被填充了(尽管此例中 body 缓冲区还有空间填充更多数据),那么将需要被 flip以便其中数据可以被抽取。在类似这样的例子中,我们可能并不会费劲去 flip 这个 header 缓冲区而是以绝对 get 的方式随机访问它以检查各种 header 字段;不过 body 缓冲区会被 flip 并传递到另
一个通道的 write( )方法上,然后在通道上发送出去。例如:

switch (header.getShort(0)) {
  case TYPE_PING:
    break;
  case TYPE_FILE:
    body.flip( );
    fileChannel.write (body);
    break;
default:
    logUnknownPacket (header.getShort(0), header.getLong(2), body);
    break;
}

同样,很简单地,我们可以用一个 gather 操作将多个缓冲区的数据组合并发送出去。使用相同的缓冲区,我们可以像下面这样汇总数据并在一个 socket 通道上发送包:

body.clear( );
body.put("FOO".getBytes()).flip( ); // "FOO" as bytes
header.clear( );
header.putShort (TYPE_FILE).putLong (body.limit()).flip( );
long bytesWritten = channel.write (buffers);

以上代码从传递给 write( )方法的 buffers 数组所引用的缓冲区中 gather 数据,然后沿着通道发送了总共 13 个字节。

使用得当的话, Scatter/Gather 会是一个极其强大的工具。它允许您委托操作系统来完成辛苦活:将读取到的数据分开存放到多个存储桶(bucket)或者将不同的数据区块合并成一个整体。这是一个巨大的成就,因为操作系统已经被高度优化来完成此类工作了。它节省了您来回移动数据的工作,也就避免了缓冲区拷贝和减少了您需要编写、调试的代码数量。既然您基本上通过提供数据容器引用来组合数据,那么按照不同的组合构建多个缓冲区阵列数组,各种数据区块就可以以不同的方式来组合了。

文件通道

直到现在,我们都还只是在泛泛地讨论通道,比如讨论那些对所有通道都适用的内容。是时候具体点了,现在我们来讨论文件通道。FileChannel 类可以实现常用的 read, write 以及 scatter/gather 操作,同时它也提供了很多专用于文件的新方法。这些方法中的许多都是我们所熟悉的文件操作,不过其他的您可能之前并未接触过。现在我们将在此对它们全部予以讨论。

文件通道总是阻塞式的,因此不能被置于非阻塞模式。现代操作系统都有复杂的缓存和预取机制,使得本地磁盘 I/O 操作延迟很少。网络文件系统一般而言延迟会多些,不过却也因该优化而受益。 面向流的 I/O 的非阻塞范例对于面向文件的操作并无多大意义,这是由文件 I/O 本质上的不同性质造成的。对于文件 I/O,最强大之处在于异步 I/O(asynchronous I/O),它允许一个进程可以从操作系统请求一个或多个 I/O 操作而不必等待这些操作的完成。发起请求的进程之后会收到它请求的 I/O 操作已完成的通知。异步 I/O 是一种高级性能。

FileChannel对象不能直接创建。一个FileChannel实例只能通过在一个打开的file对象(RandomAccessFile、 FileInputStream或 FileOutputStream)上调用getChannel( )方法获取。调用getChannel( )方法会返回一个连接到相同文件的FileChannel对象且该FileChannel对象具有与file对象相同的访问权限,然后您就可以使用该通道对象来利用强大的FileChannel API了:

public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {
    private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];

    protected FileChannel() {
    }

    public static FileChannel open(Path var0, Set<? extends OpenOption> var1, FileAttribute<?>... var2) throws IOException {
        FileSystemProvider var3 = var0.getFileSystem().provider();
        return var3.newFileChannel(var0, var1, var2);
    }

    public static FileChannel open(Path var0, OpenOption... var1) throws IOException {
        HashSet var2 = new HashSet(var1.length);
        Collections.addAll(var2, var1);
        return open(var0, var2, NO_ATTRIBUTES);
    }

    public abstract int read(ByteBuffer var1) throws IOException;

    public abstract long read(ByteBuffer[] var1, int var2, int var3) throws IOException;

    public final long read(ByteBuffer[] var1) throws IOException {
        return this.read(var1, 0, var1.length);
    }

    public abstract int write(ByteBuffer var1) throws IOException;

    public abstract long write(ByteBuffer[] var1, int var2, int var3) throws IOException;

    public final long write(ByteBuffer[] var1) throws IOException {
        return this.write(var1, 0, var1.length);
    }

    public abstract long position() throws IOException;

    public abstract FileChannel position(long var1) throws IOException;

    public abstract long size() throws IOException;

    public abstract FileChannel truncate(long var1) throws IOException;

    public abstract void force(boolean var1) throws IOException;

    public abstract long transferTo(long var1, long var3, WritableByteChannel var5) throws IOException;

    public abstract long transferFrom(ReadableByteChannel var1, long var2, long var4) throws IOException;

    public abstract int read(ByteBuffer var1, long var2) throws IOException;

    public abstract int write(ByteBuffer var1, long var2) throws IOException;

    public abstract MappedByteBuffer map(FileChannel.MapMode var1, long var2, long var4) throws IOException;

    public abstract FileLock lock(long var1, long var3, boolean var5) throws IOException;

    public final FileLock lock() throws IOException {
        return this.lock(0L, 9223372036854775807L, false);
    }

    public abstract FileLock tryLock(long var1, long var3, boolean var5) throws IOException;

    public final FileLock tryLock() throws IOException {
        return this.tryLock(0L, 9223372036854775807L, false);
    }

    public static class MapMode {
        public static final FileChannel.MapMode READ_ONLY = new FileChannel.MapMode("READ_ONLY");
        public static final FileChannel.MapMode READ_WRITE = new FileChannel.MapMode("READ_WRITE");
        public static final FileChannel.MapMode PRIVATE = new FileChannel.MapMode("PRIVATE");
        private final String name;

        private MapMode(String var1) {
            this.name = var1;
        }

        public String toString() {
            return this.name;
        }
    }
}

FileChannel 类本身是抽象的,您从 getChannel( )方法获取的实际对象是一个具体子类(subclass)的一个实例(instance),该子类可能使用本地代码来实现以上 API 方法中的一些或全部。

FileChannel 对象是线程安全(thread-safe)的。多个进程可以在同一个实例上并发调用方法而不会引起任何问题,不过并非所有的操作都是多线程的(multithreaded)。影响通道位置或者影响文件大小的操作都是单线程的(single-threaded)。如果有一个线程已经在执行会影响通道位置或文件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。并发行为也会受到底层的操作系统或文件系统影响。

同大多数 I/O 相关的类一样, FileChannel 是一个反映 Java 虚拟机外部一个具体对象的抽象。FileChannel 类保证同一个 Java 虚拟机上的所有实例看到的某个文件的视图均是一致的,但是 Java虚拟机却不能对超出它控制范围的因素提供担保。通过一个 FileChannel 实例看到的某个文件的视图同通过一个外部的非 Java 进程看到的该文件的视图可能一致,也可能不一致。多个进程发起的并发文件访问的语义高度取决于底层的操作系统和(或)文件系统。一般而言,由运行在不同 Java虚拟机上的 FileChannel 对象发起的对某个文件的并发访问和由非 Java 进程发起的对该文件的并发访问是一致的。

访问文件

每个 FileChannel 对象都同一个文件描述符(file descriptor)有一对一的关系。您可能也注意到了上面列出的 API 方法同 java.io 包中 RandomAccessFile 类的方法的相似之处了。本质上讲, RandomAccessFile 类提供的是同样的抽象内容。在通道出现之前,底层的文件操作都是通过 RandomAccessFile 类的方法来实现的。 FileChannel 模拟同样的 I/O 服务,因此它的 API 自然也是很相似的。

为了便于比较,下表列出了 FileChannel、 RandomAccessFile 和 POSIX I/O system calls 三者在方法上的对应关系


同底层的文件描述符一样,每个 FileChannel 都有一个叫“file position”的概念。这个 position 值决定文件中哪一处的数据接下来将被读或者写。从这个方面看, FileChannel 类同缓冲区很类似,并且 MappedByteBuffer 类使得我们可以通过ByteBuffer API 来访问文件数据。

您可以从前面的API清单中看到,有两种形式的position( )方法。第一种,不带参数的,返回当前文件的position值。返回值是一个长整型(long),表示文件中的当前字节位置。第二种形式的 position( )方法带一个 long(长整型) 参数并将通道的 position 设置为指定值。如果尝试将通道 position 设置为一个负值会导致 java.lang.IllegalArgumentException 异常,不过可以把 position 设置到超出文件尾,这样做会把 position 设置为指定值而不改变文件大小。假如在将position 设置为超出当前文件大小时实现了一个 read( )方法,那么会返回一个文件尾(end-of-file)条件;倘若此时实现的是一个 write( )方法则会引起文件增长以容纳写入的字节,具体行为类似于实现一个绝对 write( )并可能导致出现一个文件空洞。

文件空洞究竟是什么?

当磁盘上一个文件的分配空间小于它的文件大小时会出现“文件空洞”。对于内容稀疏的文件,大多数现代文件系统只为实际写入的数据分配磁盘空间(更准确地说,只为那些写入数据的文件系统页分配空间)。假如数据被写入到文件中非连续的位置上,这将导致文件出现在逻辑上不包含数据的区域(即“空洞”)。例如,下面的代码可能产生一个文件空洞

public class FileHole {


    public static void main(String[] argv)
            throws IOException {
        File temp = File.createTempFile("holy", null);
        RandomAccessFile file = new RandomAccessFile(temp, "rw");
        //获取文件通道
        FileChannel channel = file.getChannel();
        //分配一个直接缓冲区,用于向通道内传输数据
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100);
        putData(0, byteBuffer, channel);
        putData(5000000, byteBuffer, channel);
        putData(50000, byteBuffer, channel);
// Size will report the largest position written, but
// there are two holes in this file. This file will
// not consume 5 MB on disk (unless the filesystem is
// extremely brain-damaged)
        System.out.println("Wrote temp file '" + temp.getPath()
                + "', size=" + channel.size());
        channel.close();
        file.close();
    }
    //向通道内写入数据
    private static void putData(int position, ByteBuffer buffer,
                                FileChannel channel)
            throws IOException {
        String string = "*<-- location " + position;
        buffer.clear();
        buffer.put(string.getBytes("US-ASCII"));
        buffer.flip();
        channel.position(position);
        channel.write(buffer);
    }
}

如果该文件被顺序读取的话,所有空洞都会被“0”填充但不占用磁盘空间。读取该文件的进程会看到 5,000,021 个字节,大部分字节都以“0”表示。试试在该文件上运行 strings 命令,看看您会得到什么。再试试将文件大小的值提高到 50 或 100MB,看看您的全部磁盘空间消耗以及顺序扫描该文件所需时间会发生何种变化(前者不会改变,但是后者将有非常大的增加)。

FileChannel 位置(position)是从底层的文件描述符获得的,该 position 同时被作为通道引用获取来源的文件对象共享。这也就意味着一个对象对该 position 的更新可以被另一个对象看到:

public class FileHole {


    public static void main(String[] argv)
            throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile ("test.txt", "r");
// Set the file position
        randomAccessFile.seek (1000);
// Create a channel from the file
        FileChannel fileChannel = randomAccessFile.getChannel( );
// This will print "1000"
        System.out.println ("file pos: " + fileChannel.position( ));
// Change the position using the RandomAccessFile object
        randomAccessFile.seek (500);
// This will print "500"
        System.out.println ("file pos: " + fileChannel.position( ));
// Change the position using the FileChannel object
        fileChannel.position (200);
// This will print "200"
        System.out.println ("file pos: " + randomAccessFile.getFilePointer( ));
    }
}

类似于缓冲区的 get( ) 和 put( )方法,当字节被 read( )或 write( )方法传输时,文件 position 会自动更新。如果 position 值达到了文件大小的值(文件大小的值可以通过 size( )方法返回), read( )方法会返回一个文件尾条件值(-1)。可是,不同于缓冲区的是,如果实现 write( )方法时 position前进到超过文件大小的值,该文件会扩展以容纳新写入的字节。同样类似于缓冲区,也有带 position 参数的绝对形式的 read( )和 write( )方法。这种绝对形式的方法在返回值时不会改变当前的文件 position。由于通道的状态无需更新,因此绝对的读和写可能会更加有效率,操作请求可以直接传到本地代码。更妙的是,多个线程可以并发访问同一个文件而不会相互产生干扰。这是因为每次调用都是原子性的(atomic),并不依靠调用之间系统所记住的状态。

尝试在文件末尾之外的 position 进行一个绝对读操作, size( )方法会返回一个 end-of-file。在超出文件大小的 position 上做一个绝对 write( )会导致文件增加以容纳正在被写入的新字节。文件中位于之前 end-of-file 位置和新添加的字节起始位置之间区域的字节的值不是由 FileChannel 类指定,而是在大多数情况下反映底层文件系统的语义。取决于操作系统和(或)文件系统类型,这可能会导致在文件中出现一个空洞。

当需要减少一个文件的 size 时, truncate( )方法会砍掉您所指定的新 size 值之外的所有数据。如果当前 size 大于新size,超出新 size 的所有字节都会被悄悄地丢弃。如果提供的新 size 值大于或等于当前的文件 size 值,该文件不会被修改。这两种情况下, truncate( )都会产生副作用:文件的position 会被设置为所提供的新 size 值。

public abstract class FileChannel
extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
{
// This is a partial API listing
public abstract void truncate (long size)
public abstract void force (boolean metaData)
}

上面列出的最后一个 API 是 force( )。该方法告诉通道强制将全部待定的修改都应用到磁盘的文件上。所有的现代文件系统都会缓存数据和延迟磁盘文件更新以提高性能。调用 force( )方法要求文件的所有待定修改立即同步到磁盘。

如果文件位于一个本地文件系统,那么一旦 force( )方法返回,即可保证从通道被创建(或上次调用 force( ))时起的对文件所做的全部修改已经被写入到磁盘。对于关键操作如事务(transaction)处理来说,这一点是非常重要的,可以保证数据完整性和可靠的恢复。然而,如果文件位于一个远程的文件系统,如 NFS 上,那么不能保证待定修改一定能同步到永久存储器(permanent storage)上,因 Java 虚拟机不能做操作系统或文件系统不能实现的承诺。如果您的程序在面临系统崩溃时必须维持数据完整性,先去验证一下您在使用的操作系统和(或)文件系统在同步修改方面是可以依赖的。

force( )方法的布尔型参数表示在方法返回值前文件的元数据(metadata)是否也要被同步更新到磁盘。元数据指文件所有者、访问权限、最后一次修改时间等信息。大多数情形下,该信息对数据恢复而言是不重要的。给 force( )方法传递 false 值表示在方法返回前只需要同步文件数据的更改。大多数情形下,同步元数据要求操作系统进行至少一次额外的底层 I/O 操作。一些大数量事务处理程序可能通过在每次调用 force( )方法时不要求元数据更新来获取较高的性能提升,同时也不
会牺牲数据完整性。

文件锁定

在 JDK 1.4 版本之前, Java I/O 模型都未能提供文件锁定(file locking),缺少这一特性让人们很头疼。绝大多数现代操作系统早就有了文件锁定功能,而直到 JDK 1.4 版本发布时 Java 编程人员才可以使用文件锁(file lock)。在集成许多其他非 Java 程序时,文件锁定显得尤其重要。此外,它在判优(判断多个访问请求的优先级别) 一个大系统的多个 Java 组件发起的访问时也很有价值。

锁(lock)可以是共享的(shared)或独占的(exclusive)。本节中描述的文件锁定特性在很大程度上依赖本地的操作系统实现。并非所有的操作系统和文件系统都支持共享文件锁。对于那些不支持的,对一个共享锁的请求会被自动提升为对独占锁的请求。这可以保证准确性却可能严重影响性能。如果您计划部署程序,请确保您了解所用操作系统和文件系统的文件锁定行为,因为这将严重影响您的设计选择。

另外,并非所有平台都以同一个方式来实现基本的文件锁定。在不同的操作系统上,甚至在同一个操作系统的不同文件系统上,文件锁定的语义都会有所差异。一些操作系统仅提供劝告锁定(advisory locking) ,一些仅提供独占锁(exclusive locks),而有些操作系统可能两种锁都提供。

您应该总是按照劝告锁的假定来管理文件锁,因为这是最安全的。但是如能了解底层操作系统如何执行锁定也是非常好的。例如,如果所有的锁都是强制性的(mandatory)而您不及时释放您获得的锁的话,运行在同一操作系统上的其他程序可能会受到影响。

有关 FileChannel 实现的文件锁定模型的一个重要注意项是:锁的对象是文件而不是通道或线程,这意味着文件锁不适用于判优同一台 Java 虚拟机上的多个线程发起的访问。如果一个线程在某个文件上获得了一个独占锁,然后第二个线程利用一个单独打开的通道来请求该文件的独占锁,那么第二个线程的请求会被批准。但如果这两个线程运行在不同的 Java 虚拟
机上,那么第二个线程会阻塞,因为锁最终是由操作系统或文件系统来判优的并且几乎总是在进程级而非线程级上判优。锁都是与一个文件关联的,而不是与单个的文件句柄或通道关联。

文件锁旨在在进程级别上判优文件访问,比如在主要的程序组件之间或者在集成其他供应商的组件时。如果您需要控制多个 Java 线程的并发访问,您可能需要实施您自己的、轻量级的锁定方案。那种情形下,内存映射文件(本章后面会进行详述)可能是一个合适的选择。

现在让我们来看下与文件锁定有关的 FileChannel API 方法:

public abstract class FileChannel
extends AbstractChannel
implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
{
// This is a partial API listing
public final FileLock lock( )
public abstract FileLock lock (long position, long size,boolean shared)
public final FileLock tryLock( )
public abstract FileLock tryLock (long position, long size,boolean shared)
}

这次我们先看带参数形式的 lock( )方法。 锁是在文件内部区域上获得的。调用带参数的 Lock( )方法会指定文件内部锁定区域的开始 position 以及锁定区域的 size。第三个参数 shared 表示您想获取的锁是共享的(参数值为 true)还是独占的(参数值为 false)。

要获得一个共享锁,您必须先以只读权限打开文件,而请求独占锁时则需要写权限。另外,您提供的 position和 size 参数的值不能是负数。锁定区域的范围不一定要限制在文件的 size 值以内,锁可以扩展从而超出文件尾。因此,我们可以提前把待写入数据的区域锁定,我们也可以锁定一个不包含任何文件内容的区域,比如文件最后一个字节以外的区域。如果之后文件增长到达那块区域,那么您的文件锁就可以保护该区域的文件内容了。相反地,如果您锁定了文件的某一块区域,然后文件增长超出了那块区域,那么新增加的文件内容将不会受到您的文件锁的保护。

不带参数的简单形式的 lock( )方法是一种在整个文件上请求独占锁的便捷方法,锁定区域等于它能达到的最大范围。该方法等价于:
fileChannel.lock (0L, Long.MAX_VALUE, false);
如果您正请求的锁定范围是有效的,那么 lock( )方法会阻塞,它必须等待前面的锁被释放。假如您的线程在此情形下被暂停,该线程的行为受中断语义控制。如果通道被另外一个线程关闭,该暂停线程将恢复并产生一个AsynchronousCloseException 异常。假如该暂停线程被直接中断(通过调用它的 interrupt( )方法),它将醒来并产生一个
FileLockInterruptionException 异常。如果在调用 lock( )方法时线程的 interrupt status 已经被设置,也会产生 FileLockInterruptionException 异常。

在上面的 API 列表中有两个名为 tryLock( )的方法,它们是 lock( )方法的非阻塞变体。这两个tryLock( )和 lock( )方法起相同的作用,不过如果请求的锁不能立即获取到则会返回一个 null。您可以看到, lock( )和 tryLock( )方法均返回一个 FileLock 对象。以下是完整的 FileLock API:

public abstract class FileLock
{
public final FileChannel channel( )
public final long position( )
public final long size( )
public final boolean isShared( )
public final boolean overlaps (long position, long size)
public abstract boolean isValid( );
public abstract void release( ) throws IOException;
}

FileLock 类封装一个锁定的文件区域。 FileLock 对象由 FileChannel 创建并且总是关联到那个特定的通道实例。您可以通过调用 channel( )方法来查询一个 lock 对象以判断它是由哪个通道创建的。
一个 FileLock 对象创建之后即有效,直到它的 release( )方法被调用或它所关联的通道被关闭或Java 虚拟机关闭时才会失效。我们可以通过调用 isValid( )布尔方法来测试一个锁的有效性。一个锁的有效性可能会随着时间而改变,不过它的其他属性——位置(position)、范围大小(size)和独占性(exclusivity) ——在创建时即被确定,不会随着时间而改变。

您可以通过调用 isShared( )方法来测试一个锁以判断它是共享的还是独占的。如果底层的操作系统或文件系统不支持共享锁,那么该方法将总是返回 false 值,即使您申请锁时传递的参数值是 true。假如您的程序依赖共享锁定行为,请测试返回的锁以确保您得到了您申请的锁类型。

FileLock 对象是线程安全的,多个线程可以并发访问一个锁对象。

最后,您可以通过调用 overlaps( )方法来查询一个 FileLock 对象是否与一个指定的文件区域重叠。这将使您可以迅速判断您拥有的锁是否与一个感兴趣的区域(region of interest)有交叉。不过即使返回值是 false 也不能保证您就一定能在期望的区域上获得一个锁,因为 Java 虚拟机上的其他地方或者外部进程可能已经在该期望区域上有一个或多个锁了。您最好使用 tryLock( )方法确认一下。

尽管一个 FileLock 对象是与某个特定的 FileChannel 实例关联的,它所代表的锁却是与一个底层文件关联的,而不是与通道关联。因此,如果您在使用完一个锁后而不释放它的话,可能会导致冲突或者死锁。请小心管理文件锁以避免出现此问题。一旦您成功地获取了一个文件锁,如果随后在通道上出现错误的话,请务必释放这个锁。推荐使用类似下面的代码形式:

FileLock lock = fileChannel.lock( )
try {
    <perform read/write/whatever on channel>
} catch (IOException) [
  <handle unexpected exception>} 
finally {
  lock.release( )
}
package czznio.buffer;

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Random;

public class LockTest {

    private static final int SIZEOF_INT = 4;
    private static final int INDEX_START = 0;
    private static final int INDEX_COUNT = 10;
    private static final int INDEX_SIZE = INDEX_COUNT * SIZEOF_INT;

    private ByteBuffer buffer = ByteBuffer.allocate(INDEX_SIZE);
    private IntBuffer indexBuffer = buffer.asIntBuffer();
    private Random rand = new Random();

    public static void main(String[] argv)
            throws Exception {
        boolean writer = false;
        String filename;
        if (argv.length != 2) {
            System.out.println("Usage: [ -r | -w ] filename");
            return;
        }
        writer = argv[0].equals("-w");
        filename = argv[1];
        RandomAccessFile raf = new RandomAccessFile(filename,
                (writer) ? "rw" : "r");
        FileChannel fc = raf.getChannel();
        LockTest lockTest = new LockTest();
        if (writer) {
            lockTest.doUpdates(fc);
        } else {
            lockTest.doQueries(fc);
        }
    }

    // ----------------------------------------------------------------
// Simulate a series of read-only queries while
// holding a shared lock on the index area
    void doQueries(FileChannel fc)
            throws Exception {
        while (true) {
            println("trying for shared lock...");
            FileLock lock = fc.lock(INDEX_START, INDEX_SIZE, true);
            int reps = rand.nextInt(60) + 20;
            for (int i = 0; i < reps; i++) {
                int n = rand.nextInt(INDEX_COUNT);
                int position = INDEX_START + (n * SIZEOF_INT);
                buffer.clear();
                fc.read(buffer, position);
                int value = indexBuffer.get(n);
                println("Index entry " + n + "=" + value);
// Pretend to be doing some work
                Thread.sleep(100);
            }
            lock.release();
            println("<sleeping>");
            Thread.sleep(rand.nextInt(3000) + 500);
        }
    }

    // Simulate a series of updates to the index area
// while holding an exclusive lock
    void doUpdates(FileChannel fc)
            throws Exception {
        while (true) {
            println("trying for exclusive lock...");
            FileLock lock = fc.lock(INDEX_START,
                    INDEX_SIZE, false);
            updateIndex(fc);
            lock.release();
            println("<sleeping>");
            Thread.sleep(rand.nextInt(2000) + 500);
        }
    }

    // Write new values to the index slots
    private int idxval = 1;

    private void updateIndex(FileChannel fc)
            throws Exception {
// "indexBuffer" is an int view of "buffer"
        indexBuffer.clear();
        for (int i = 0; i < INDEX_COUNT; i++) {
            idxval++;
            println("Updating index " + i + "=" + idxval);
            indexBuffer.put(idxval);
// Pretend that this is really hard work
            Thread.sleep(500);
        }
// leaves position and limit correct for whole buffer
        buffer.clear();
        fc.write(buffer, INDEX_START);
    }

    // ----------------------------------------------------------------
    private int lastLineLen = 0;

    // Specialized println that repaints the current line
    private void println(String msg) {
        System.out.print("\r ");
        System.out.print(msg);
        for (int i = msg.length(); i < lastLineLen; i++) {
            System.out.print(" ");
        }
        System.out.print("\r");
        System.out.flush();
        lastLineLen = msg.length();
    }
}

以上代码直接忽略了我之前说给的用 try/catch/finally 来释放锁的建议,在您自己所写的实际代码中请不要这么懒。

内存映射文件

新的 FileChannel 类提供了一个名为 map( )的方法,该方法可以在一个打开的文件和一个特殊类型的 ByteBuffer 之间建立一个虚拟内存映射。在 FileChannel 上调用 map( )方法会创建一个由磁盘文件支持的虚拟内存映射(virtual memory mapping)并在那块虚拟内存空间外部封装一个 MappedByteBuffer 对象。

由 map( )方法返回的 MappedByteBuffer 对象的行为在多数方面类似一个基于内存的缓冲区,只不过该对象的数据元素存储在磁盘上的一个文件中。调用 get( )方法会从磁盘文件中获取数据,此数据反映该文件的当前内容,即使在映射建立之后文件已经被一个外部进程做了修改。
通过文件映射看到的数据同您用常规方法读取文件看到的内容是完全一样的。相似地,对映射的缓冲区实现一个 put( )会更新磁盘上的那个文件(假设对该文件您有写的权限),并且您做的修改对于该文件的其他阅读者也是可见的。

通过内存映射机制来访问一个文件会比使用常规方法读写高效得多,甚至比使用通道的效率都高。因为不需要做明确的系统调用,那会很消耗时间。更重要的是,操作系统的虚拟内存可以自动缓存内存页(memory page)。这些页是用系统内存来缓存的,所以不会消耗 Java 虚拟机内存堆。

一旦一个内存页已经生效(从磁盘上缓存进来),它就能以完全的硬件速度再次被访问而不需要再次调用系统命令来获取数据。那些包含索引以及其他需频繁引用或更新的内容的巨大而结构化文件能因内存映射机制受益非常多。如果同时结合文件锁定来保护关键区域和控制事务原子性,那您将能了解到内存映射缓冲区如何可以被很好地利用。

下面让我们来看一下如何使用内存映射:

public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
{
// This is a partial API listing
public abstract MappedByteBuffer map (MapMode mode, long position,long size)
public static class MapMode
  {
    public static final MapMode READ_ONLY
    public static final MapMode READ_WRITE
    public static final MapMode PRIVATE
  }
}

可以看到,只有一种 map( )方法来创建一个文件映射。它的参数有 mode, position 和 size。参数 position 和 size 同 lock( )方法的这两个参数是一样的。我们可以创建一个 MappedByteBuffer 来代表一个文件中字节的某个子范围。例如,要映射 100 到 299(包含 299)位置的字节,可以使用下面的代码:
buffer = fileChannel.map (FileChannel.MapMode.READ_ONLY, 100, 200);
如果要映射整个文件则使用:
buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
案例:

public class Test {
    public static void main(String[] args) throws Exception {
        RandomAccessFile outputStream = new RandomAccessFile("test.txt","rw");
        FileChannel fileChannel = outputStream.getChannel();
        MappedByteBuffer mappedByteBuffer= fileChannel.map(FileChannel.MapMode.READ_WRITE,0,fileChannel.size());
        
        System.out.println(mappedByteBuffer.get());
        mappedByteBuffer.put(0, (byte) 'c');
        mappedByteBuffer.flip();
         mappedByteBuffer.force();
//        fileChannel.write(mappedByteBuffer);
        System.out.println();
    }
}

与文件锁的范围机制不一样,映射文件的范围不应超过文件的实际大小。如果您请求一个超出文件大小的映射,文件会被增大以匹配映射的大小。假如您给 size 参数传递的值是Integer.MAX_VALUE,文件大小的值会膨胀到超过 2.1GB。即使您请求的是一个只读映射,map( )方法也会尝试这样做并且大多数情况下都会抛出一个 IOException 异常,因为底层的文件不能被修改。该行为同之前讨论的文件“空洞”的行为是一致的。

FileChannel 类定义了代表映射模式的常量,且是使用一个类型安全的枚举而非数字值来定义这些常量。这些常量是FileChannel 内部定义的一个内部类( inner class)的静态字段,它们可以在编译时被检查类型,不过您可以像使用一个数值型常量那样使用它们。

同常规的文件句柄类似,文件映射可以是可写的或只读的。前两种映射模式MapMode.READ_ONLY 和MapMode.READ_WRITE 意义是很明显的,它们表示您希望获取的映射只读还是允许修改映射的文件。请求的映射模式将受被调用 map( )方法的 FileChannel 对象的访问权限所限制。如果通道是以只读的权限打开的而您却请求MapMode.READ_WRITE 模式,那么map( )方法会抛出一个 NonWritableChannelException 异常;如果您在一个没有读权限的通道上请求MapMode.READ_ONLY 映射模式,那么将产生 NonReadableChannelException 异常。不过在以read/write 权限打开的通道上请求一个 MapMode.READ_ONLY 映射却是允许的。 MappedByteBuffer对象的可变性可以通过对它调用 isReadOnly( )方法来检查。第三种模式 MapMode.PRIVATE 表示您想要一个写时拷贝( copy-on-write)的映射。这意味着您通过 put( )方法所做的任何修改都会导致产生一个私有的数据拷贝并且该拷贝中的数据只有MappedByteBuffer 实例可以看到。该过程不会对底层文件做任何修改,而且一旦缓冲区被施以垃圾收集动作( garbage collected),那些修改都会丢失。尽管写时拷贝的映射可以防止底层文件被修改,您也必须以 read/write 权限来打开文件以建立 MapMode.PRIVATE 映射。只有这样,返回的MappedByteBuffer 对象才能允许使用 put( )方法。

写时拷贝这一技术经常被操作系统使用,以在一个进程生成另一个进程时管理虚拟地址空间。使用写时拷贝可以允许父进程和子进程共享内存页直到它们中的一方实际发生修改行为。在处理同一文件的多个映射时也有相同的优势(当然,这需要底层操作系统的支持)。

假设一个文件被多个 MappedByteBuffer 对象映射并且每个映射都是 MapMode.PRIVATE 模式,那么这份文件的大部分内容都可以被所有映射共享。

选择使用 MapMode.PRIVATE 模式并不会导致您的缓冲区看不到通过其他方式对文件所做的修改。对文件某个区域的修改在使用 MapMode.PRIVATE 模式的缓冲区中都能反映出来,除非该缓冲区已经修改了文件上的同一个区域。A在一个写时拷贝的缓冲区上调用 put( )方法时,受影响的页会被拷贝,然后更改就会应用到该拷贝中。具体的页面大小取决于具体实现,不过通常都是和底层文件系统的页面大小时一样的。

如果缓冲区还没对某个页做出修改,那么这个页就会反映被映射文件的相应位置上的内容。一旦某个页因为写操作而被拷贝,之后就将使用该拷贝页,并且不能被其他缓冲区或文件更新所修改。看例子:

package czznio.buffer;

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class Hitchens {

    public static void main(String[] argv)
            throws Exception {
// Create a temp file and get a channel connected to it
        File tempFile = File.createTempFile("mmaptest", null);
        RandomAccessFile file = new RandomAccessFile(tempFile, "rw");
        FileChannel channel = file.getChannel();
        ByteBuffer temp = ByteBuffer.allocate(100);
// Put something in the file, starting at location 0
        temp.put("This is the file content".getBytes());
        temp.flip();
        channel.write(temp, 0);
// Put something else in the file, starting at location 8192.
// 8192 is 8 KB, almost certainly a different memory/FS page.
// This may cause a file hole, depending on the
// filesystem page size.
        temp.clear();
        temp.put("This is more file content".getBytes());
        temp.flip();
        channel.write(temp, 8192);
// Create three types of mappings to the same file
        MappedByteBuffer ro = channel.map(
                FileChannel.MapMode.READ_ONLY, 0, channel.size());
        MappedByteBuffer rw = channel.map(
                FileChannel.MapMode.READ_WRITE, 0, channel.size());
        MappedByteBuffer cow = channel.map(
                FileChannel.MapMode.PRIVATE, 0, channel.size());
// the buffer states before any modifications
        System.out.println("Begin");
        showBuffers(ro, rw, cow);
// Modify the copy-on-write buffer
        cow.position(8);
        cow.put("COW".getBytes());
        System.out.println("Change to COW buffer");
        showBuffers(ro, rw, cow);
// Modify the read/write bufferrw.position (9);
        rw.put(" R/W ".getBytes());
        rw.position(8194);
        rw.put(" R/W ".getBytes());
        rw.force();
        System.out.println("Change to R/W buffer");
        showBuffers(ro, rw, cow);
// Write to the file through the channel; hit both pages
        temp.clear();
        temp.put("Channel write ".getBytes());
        temp.flip();
        channel.write(temp, 0);
        temp.rewind();
        channel.write(temp, 8202);
        System.out.println("Write on channel");
        showBuffers(ro, rw, cow);
// Modify the copy-on-write buffer again
        cow.position(8207);
        cow.put(" COW2 ".getBytes());
        System.out.println("Second change to COW buffer");
        showBuffers(ro, rw, cow);
// Modify the read/write buffer
        rw.position(0);
        rw.put(" R/W2 ".getBytes());
        rw.position(8210);
        rw.put(" R/W2 ".getBytes());
        rw.force();
        System.out.println("Second change to R/W buffer");
        showBuffers(ro, rw, cow);
// cleanup
        channel.close();
        file.close();
        tempFile.delete();
    }

    // Show the current content of the three buffers
    public static void showBuffers(ByteBuffer ro, ByteBuffer rw,
                                   ByteBuffer cow)
            throws Exception {
        dumpBuffer("R/O", ro);
        dumpBuffer("R/W", rw);
        dumpBuffer("COW", cow);
        System.out.println("");
    }

    // Dump buffer content, counting and skipping nulls
    public static void dumpBuffer(String prefix, ByteBuffer buffer)
            throws Exception {
        System.out.print(prefix + ": '");
        int nulls = 0;
        int limit = buffer.limit();
        for (int i = 0; i < limit; i++) {
            char c = (char) buffer.get(i);
            if (c == '\u0000') {
                nulls++;
                continue;
            }
            if (nulls != 0) {
                System.out.print("|[" + nulls
                        + " nulls]|");
                nulls = 0;
            }
            System.out.print(c);
        }
        System.out.println("'");
    }
}

您应该注意到了没有 unmap( )方法。也就是说,一个映射一旦建立之后将保持有效,直到MappedByteBuffer 对象被施以垃圾收集动作为止。同锁不一样的是,映射缓冲区没有绑定到创建它们的通道上。关闭相关联的 FileChannel 不会破坏映射,只有丢弃缓冲区对象本身才会破坏该映射。 NIO 设计师们之所以做这样的决定是因为当关闭通道时破坏映射会引起安全问题,而解决该安全问题又会导致性能问题。

如果您确实需要知道一个映射是什么时候被破坏的,他们建议使用虚引用(phantom references,参见 java.lang.ref.PhantomReference)和一个 cleanup 线程。不过有此需要的概率是微乎其微的。

MemoryMappedBuffer 直接反映它所关联的磁盘文件。如果映射有效时文件被在结构上修改,就会产生奇怪的行为(当然具体的行为是取决于操作系统和文件系统的)。 MemoryMappedBuffer有固定的大小,不过它所映射的文件却是弹性的。具体来说,如果映射有效时文件大小变化了,那么缓冲区的部分或全部内容都可能无法访问,并将返回未定义的数据或者抛出未检查的异常。关于被内存映射的文件如何受其他线程或外部进程控制这一点,请务必小心对待。

所有的 MappedByteBuffer 对象都是直接的,这意味着它们占用的内存空间位于 Java 虚拟机内存堆之外(并且可能不会算作 Java 虚拟机的内存占用,不过这取决于操作系统的虚拟内存模型)。

因为 MappedByteBuffers 也是 ByteBuffers,所以能够被传递 SocketChannel 之类通道的 read( )或write( )以有效传输数据给被映射的文件或从被映射的文件读取数据。如能再结合 scatter/gather,那么从内存缓冲区和被映射文件内容中组织数据就变得很容易了。下例就是以此方式写 HTTP 回应的。

package czznio.buffer;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MappedHttp {

    private static final String OUTPUT_FILE = "MappedHttp.out";
    private static final String LINE_SEP = "\r\n";
    private static final String SERVER_ID = "Server: Ronsoft Dummy Server";
    private static final String HTTP_HDR =
            "HTTP/1.0 200 OK" + LINE_SEP + SERVER_ID + LINE_SEP;
    private static final String HTTP_404_HDR =
            "HTTP/1.0 404 Not Found" + LINE_SEP + SERVER_ID + LINE_SEP;
    private static final String MSG_404 = "Could not open file: ";
    public static void main (String [] argv)
            throws Exception
    {
        String file = "test.txt";
        ByteBuffer header = ByteBuffer.wrap (bytes (HTTP_HDR));
        ByteBuffer dynhdrs = ByteBuffer.allocate (128);
        ByteBuffer [] gather = { header, dynhdrs, null };
        String contentType = "unknown/unknown";
        long contentLength = -1;
        try {
            FileInputStream fis = new FileInputStream (file);
            FileChannel fc = fis.getChannel( );
            MappedByteBuffer filedata =
                    fc.map (FileChannel.MapMode.READ_ONLY, 0, fc.size( ));
            gather [2] = filedata;
            contentLength = fc.size( );
            contentType = URLConnection.guessContentTypeFromName (file);
        } catch (IOException e) {
// file could not be opened; report problem
            ByteBuffer buf = ByteBuffer.allocate (128);
            String msg = MSG_404 + e + LINE_SEP;
            buf.put (bytes (msg));
            buf.flip( );
// Use the HTTP error response
            gather [0] = ByteBuffer.wrap (bytes (HTTP_404_HDR));
            gather [2] = buf;
            contentLength = msg.length( );
            contentType = "text/plain";
        }
        StringBuffer sb = new StringBuffer( );
        sb.append ("Content-Length: " + contentLength);
        sb.append (LINE_SEP);
        sb.append ("Content-Type: ").append (contentType);
        sb.append (LINE_SEP).append (LINE_SEP);
        dynhdrs.put (bytes (sb.toString( )));
        dynhdrs.flip( );
        FileOutputStream fos = new FileOutputStream (OUTPUT_FILE);
        FileChannel out = fos.getChannel( );// All the buffers have been prepared; write 'em out
        while (out.write (gather) > 0) {
// Empty body; loop until all buffers are empty
        }
        out.close( );
        System.out.println ("output written to " + OUTPUT_FILE);
    }
    // Convert a string to its constituent bytes
// from the ASCII character set
    private static byte [] bytes (String string)
            throws Exception
    {
        return (string.getBytes ("US-ASCII"));
    }
}

到现在为止,我们已经讨论完了映射缓冲区同其他缓冲区相同的特性,这些也是您会用得最多的。不过 MappedByteBuffer 还定义了几个它独有的方法:

public abstract class MappedByteBuffer
extends ByteBuffer
{
    // This is a partial API listing
    public final MappedByteBuffer load( )
    public final boolean isLoaded( )
    public final MappedByteBuffer force( )
}

当我们为一个文件建立虚拟内存映射之后,文件数据通常不会因此被从磁盘读取到内存(这取决于操作系统)。该过程类似打开一个文件:文件先被定位,然后一个文件句柄会被创建,当您准备好之后就可以通过这个句柄来访问文件数据。对于映射缓冲区,虚拟内存系统将根据您的需要来把文件中相应区块的数据读进来。这个页验证或防错过程需要一定的时间,因为将文件数据读取到内存需要一次或多次的磁盘访问。
某些场景下,您可能想先把所有的页都读进内存以实现最小的缓冲区访问延迟。如果文件的所有页都是常驻内存的,那么它的访问速度就和访问一个基于内存的缓冲区一样了。

load( )方法会加载整个文件以使它常驻内存。一个内存映射缓冲区会建立与某个文件的虚拟内存映射。此映射使得操作系统的底层虚拟内存子系统可以根据需要将文件中相应区块的数据读进内存。已经在内存中或通过验证的页会占用实际内存空间,并且在它们被读进 RAM 时会挤出最近较少使用的其他内存页。

在一个映射缓冲区上调用 load( )方法会是一个代价高的操作,因为它会导致大量的页调入(page-in),具体数量取决于文件中被映射区域的实际大小。然而, load( )方法返回并不能保证文件就会完全常驻内存,这是由于请求页面调入(demand paging)是动态的。具体结果会因某些因素而有所差异,这些因素包括:操作系统、文件系统,可用 Java 虚拟机内存,最大 Java 虚拟机内存,垃圾收集器实现过程等等。请小心使用 load( )方法,它可能会导致您不希望出现的结果。该方法的主要作用是为提前加载文件埋单,以便后续的访问速度可以尽可能的快。

对于那些要求近乎实时访问(near-realtime access)的程序,解决方案就是预加载。但是请记住,不能保证全部页都会常驻内存,不管怎样,之后可能还会有页调入发生。内存页什么时候以及怎样消失受多个因素影响,这些因素中的许多都是不受 Java 虚拟机控制的。 JDK 1.4 的 NIO 并没有提供一个可以把页面固定到物理内存上的 API,尽管一些操作系统是支持这样做的。

对于大多数程序,特别是交互性的或其他事件驱动(event-driven)的程序而言,为提前加载文件消耗资源是不划算的。在实际访问时分摊页调入开销才是更好的选择。让操作系统根据需要来调入页意味着不访问的页永远不需要被加载。同预加载整个被映射的文件相比,这很容易减少 I/O 活动总次数。操作系统已经有一个复杂的内存管理系统了,就让它来替您完成此工作吧!

我们可以通过调用 isLoaded( )方法来判断一个被映射的文件是否完全常驻内存了。如果该方法返回 true 值,那么很大概率是映射缓冲区的访问延迟很少或者根本没有延迟。不过,这也是不能保证的。同样地,返回 false 值并不一定意味着访问缓冲区将很慢或者该文件并未完全常驻内存。 isLoaded( )方法的返回值只是一个暗示,由于垃圾收集的异步性质、底层操作系统以及运行系统的动态性等因素,想要在任意时刻准确判断全部映射页的状态是不可能的。

上面代码中列出的最后一个方法 force( )同 FileChannel 类中的同名方法相似该方法会强制将映射缓冲区上的更改应用到永久磁盘存储器上。当用 MappedByteBuffer 对象来更新一个文件,您应该总是使用 MappedByteBuffer.force( )而非 FileChannel.force( ),因为通道对象可能不清楚通过映射缓冲区做出的文件的全部更改。 MappedByteBuffer 没有不更新文件元数据的选项——元数据总是会同时被更新的。请注意,非本地文件系统也同样影响 MappedByteBuffer.force( )方法,正如它会对 FileChannel.force( )方法有影响。如果映射是以 MapMode.READ_ONLY 或 MAP_MODE.PRIVATE 模式建立的,那么调用 force( )方法将不起任何作用,因为永远不会有更改需要应用到磁盘上(但是这样做也是没有害处的)。

Channel-to-Channel 传输

由于经常需要从一个位置将文件数据批量传输到另一个位置, FileChannel 类添加了一些优化方法来提高该传输过程的效率:

public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
{
  // This is a partial API listing
  public abstract long transferTo (long position, long count,WritableByteChannel target)
  public abstract long transferFrom (ReadableByteChannel src,long position, long count)
}

transferTo( )和 transferFrom( )方法允许将一个通道交叉连接到另一个通道,而不需要通过一个中间缓冲区来传递数据。只有 FileChannel 类有这两个方法,因此 channel-to-channel 传输中通道之一必须是 FileChannel。您不能在 socket 通道之间直接传输数据,不过 socket 通道实现WritableByteChannel 和 ReadableByteChannel 接口,因此文件的内容可以用 transferTo( )方法传输给一个 socket 通道,或者也可以用 transferFrom( )方法将数据从一个 socket 通道直接读取到一个文件中。

直接的通道传输不会更新与某个 FileChannel 关联的 position 值。请求的数据传输将从position 参数指定的位置开始,传输的字节数不超过 count 参数的值。实际传输的字节数会由方法返回,可能少于您请求的字节数。

对于传输数据来源是一个文件的 transferTo( )方法,如果 position + count 的值大于文件的 size 值,传输会在文件尾的位置终止。假如传输的目的地是一个非阻塞模式的 socket 通道,那么当发送队列(send queue) 满了之后传输就可能终止,并且如果输出队列(output queue)已满的话可能不会发送任何数据。类似地,对于 transferFrom( )方法:如果来源 src 是

另外一个 FileChannel并且已经到达文件尾,那么传输将提早终止;如果来源 src 是一个非阻塞 socket 通道,只有当前
处于队列中的数据才会被传输(可能没有数据)。由于网络数据传输的非确定性,阻塞模式的socket 也可能会执行部分传输,这取决于操作系统。许多通道实现都是提供它们当前队列中已有的数据而不是等待您请求的全部数据都准备好。

此外,请记住:如果传输过程中出现问题,这些方法也可能抛出 java.io.IOException 异常。Channel-to-channel 传输是可以极其快速的,特别是在底层操作系统提供本地支持的时候。某些操作系统可以不必通过用户空间传递数据而进行直接的数据传输。对于大量的数据传输,这会是一个巨大的帮助

public class ChannelTransfer {

    public static void main (String [] argv)
            throws Exception
    {
        catFiles (Channels.newChannel (System.out), new String[]{"test.txt"});
    }
    // Concatenate the content of each of the named files to
// the given channel. A very dumb version of 'cat'.
    private static void catFiles (WritableByteChannel target,
                                  String [] files)
            throws Exception
    {
        for (int i = 0; i < files.length; i++) {
            FileInputStream fis = new FileInputStream (files [i]);
            FileChannel channel = fis.getChannel( );
            channel.transferTo (0, channel.size( ), target);
            channel.close( );
            fis.close( );
        }
    }
}

相关文章

网友评论

    本文标题:「高并发通信框架Netty4 源码解读(五)」NIO通道Chan

    本文链接:https://www.haomeiwen.com/subject/ofeyfktx.html