美文网首页
【JAVA IO】 JAVA Classic IO 源码浅析

【JAVA IO】 JAVA Classic IO 源码浅析

作者: shalk | 来源:发表于2018-06-14 17:01 被阅读0次

    java.io 包含java经典的io api,从1.0引入,1.1时做了字符流方面的补充(reader writer)

    主要是四部分,

    1. 文件系统的相关操作
    2. 文件内容的随机读写
    3. 基于字节流的读写
    4. 基于字符流的读写

    文件系统

    主要看一下 File, 用文件或者目录的路径初始化一个对象,并且提供了对文件的一些操作,例如是否可读,是文件还是目录,当前目录下的文件,父级目录等等。这些操作委托给了FileSystem来实现。

    看构造函数和部分实现:

    
    private static final FileSystem fs = DefaultFileSystem.getFileSystem();
    
    private final String path;
    
    private final transient int prefixLength;
       
        //构造函数
        public File(String pathname) {
            if (pathname == null) {
                throw new NullPointerException();
            }
            this.path = fs.normalize(pathname);
            this.prefixLength = fs.prefixLength(this.path);
    }
    // 文件大小
        public long length() {
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkRead(path);
            }
            if (isInvalid()) {
                return 0L;
            }
            return fs.getLength(this);
        }
      // 文件是否存在
         public boolean exists() {
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkRead(path);
            }
            if (isInvalid()) {
                return false;
            }
            return ((fs.getBooleanAttributes(this) & FileSystem.BA_EXISTS) != 0);
        }
    

    由于Java的跨平台特性,FileSystem在windows和linux下的实现都不相同。

    字节流的读写

    抽象类是InputStream 和 outputstream,具体实现可以先从FileInputStream 和OutputStream阅读。

    InputStream

    java.io.InputStream#read()
    java.io.InputStream#read(byte[])
    java.io.InputStream#read(byte[], int, int)
    
    java.io.InputStream#skip
    
    java.io.InputStream#available
    java.io.InputStream#close
    
    java.io.InputStream#mark(int readLimit)
    java.io.InputStream#reset
    java.io.InputStream#markSupported
    

    read 是 抽象方法,后面两个read都是基于前面那个read实现的,skip是跳过一些字节

    mark相当于书签,reset会回到书签的位置,readLimit会设置读了多少个byte之后书签可能失效,markSupported代表是否支持这个功能,FileInputStream是不支持的,BufferedInputStream是支持的

    OutputStream如下:

    java.io.OutputStream#write(int)
    java.io.OutputStream#write(byte[])
    java.io.OutputStream#write(byte[], int, int)
    java.io.OutputStream#flush
    java.io.OutputStream#close
    

    flush 操作是如果有一些output stream有 buffer,那么buffer会写到目标位置,例如 目标是文件,就把缓存写到文件。不过这个不保证写到物理盘上,只保证写给操作系统。

    FileInputStream ,FileOutputStream 底层都采用native方法来实现。

    ByteArrayInputStream 底层是对byte数组进行读操作,支持书签,由于byteArray采用的数组对象,一般就是利用内存进行读操作。 可以理解为适配器模式。

    ByteArrayOutputStream 类似

    PipeInputStream 和PipeOutStream配套使用,PipeInput的数据来源必须是PipeOut,而且建议在多线程环境下,分别给两个线程使用,单个线程使用容易产生死锁。两个类都是线程安全的。

    PipeOutStream 只有一个域,PipedInputStream sink

        public synchronized void connect(PipedInputStream snk) throws IOException {
            if (snk == null) {
                throw new NullPointerException();
            } else if (sink != null || snk.connected) {
                throw new IOException("Already connected");
            }
            sink = snk;
            snk.in = -1;
            snk.out = 0;
            snk.connected = true;
        }
        
        public void write(int b)  throws IOException {
            if (sink == null) {
                throw new IOException("Pipe not connected");
            }
            sink.receive(b);
        }
    

    因此PipeOut是使用connect和PipeIn 建立简介,并且把PipeInput的标志位 设置为连接了。在write的时候也是调用sink的receive方法,PipedInputStream 的方法就比较多。

    
    java.io.PipedInputStream#PipedInputStream(java.io.PipedOutputStream)
    java.io.PipedInputStream#PipedInputStream(java.io.PipedOutputStream, int)
    java.io.PipedInputStream#PipedInputStream()
    java.io.PipedInputStream#PipedInputStream(int)
    
    // 初始化buff
    java.io.PipedInputStream#initPipe
    java.io.PipedInputStream#connect
    java.io.PipedInputStream#receive(int)
    java.io.PipedInputStream#receive(byte[], int, int)
    java.io.PipedInputStream#checkStateForReceive
    java.io.PipedInputStream#awaitSpace
    java.io.PipedInputStream#receivedLast
    java.io.PipedInputStream#read()
    java.io.PipedInputStream#read(byte[], int, int)
    java.io.PipedInputStream#available
    java.io.PipedInputStream#close
    
    java.io.PipedInputStream#closedByWriter
    java.io.PipedInputStream#closedByReader
    java.io.PipedInputStream#connected
    java.io.PipedInputStream#readSide
    java.io.PipedInputStream#writeSide
    java.io.PipedInputStream#DEFAULT_PIPE_SIZE
    java.io.PipedInputStream#PIPE_SIZE
    java.io.PipedInputStream#buffer
    java.io.PipedInputStream#in
    java.io.PipedInputStream#out
    

    数据都保存在 protected byte buffer[]; 这个循环buff内,而in 和out分别表示 写入和读取的索引位置, 相当于循环队列的头尾指针,当in小于0,是空buff, in等于out是满的buff。

      protected synchronized void receive(int b) throws IOException {
            checkStateForReceive();
            writeSide = Thread.currentThread();
            if (in == out)
                awaitSpace();
            if (in < 0) {
                in = 0;
                out = 0;
            }
            buffer[in++] = (byte)(b & 0xFF);
            if (in >= buffer.length) {
                in = 0;
            }
        }
          private void checkStateForReceive() throws IOException {
            if (!connected) {
                throw new IOException("Pipe not connected");
            } else if (closedByWriter || closedByReader) {
                throw new IOException("Pipe closed");
            } else if (readSide != null && !readSide.isAlive()) {
                throw new IOException("Read end dead");
            }
        }
    

    最终是把数据写到buffer 中,可以看到在写入前,要进行状态检查和容量检查,写入之后,如果in 越界,就绕回到 0,可能发现,如果写入太快,读很慢,buff是会被覆盖掉的,其实满的时候写线程会阻塞在awaitSpace,而且两个类都是全局加锁,所以读写之间是互相阻塞的。检查线程状态也非常简单,就是如果有方法调用recieve相关的方法,就把该线程记为writeSide,如果有方法调用读read,就记录为readSide。 并且记录 两个piped是否关闭。 另外看一下满的时候写者如何等待:

        private void awaitSpace() throws IOException {
            while (in == out) {
                checkStateForReceive();
    
                /* full: kick any waiting readers */
                notifyAll();
                try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
        }
    

    经典的把wait套在while循环里,当等待超时,就会判断是否有空间。

    syncronized(obj) {
      while(条件){
        obj.wait();
      }
    }
    
    
    其他方法(){
        obj.notifyAll();
    }
    

    并且读的时候,如果没有数据也会发生等待:

    public synchronized int read()  throws IOException {
            if (!connected) {
                throw new IOException("Pipe not connected");
            } else if (closedByReader) {
                throw new IOException("Pipe closed");
            } else if (writeSide != null && !writeSide.isAlive()
                       && !closedByWriter && (in < 0)) {
                throw new IOException("Write end dead");
            }
    
            readSide = Thread.currentThread();
            int trials = 2;
            while (in < 0) {
                if (closedByWriter) {
                    /* closed by writer, return EOF */
                    return -1;
                }
                if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                    throw new IOException("Pipe broken");
                }
                /* might be a writer waiting */
                notifyAll();
                try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
            int ret = buffer[out++] & 0xFF;
            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
    
            return ret;
        }
    

    并且,两个操作都会在循环里判断,是不是读写已经关闭,从而抛出异常,停止等待。

    再看一下SequenceInputStream是多个流合并进行读取, 轮询读取,不常用。

       public int read() throws IOException {
            while (in != null) {
                int c = in.read();
                if (c != -1) {
                    return c;
                }
                nextStream();
            }
            return -1;
        }
            final void nextStream() throws IOException {
            if (in != null) {
                in.close();
            }
    
            if (e.hasMoreElements()) {
                in = (InputStream) e.nextElement();
                if (in == null)
                    throw new NullPointerException();
            }
            else in = null;
    
        }
    

    filterInputStream 和FilterOutputStream,这两个是几个装饰器类的基类,如BufferedInputStream, DataInputStream,PushbackInputStream ,BufferdOutputStream. DataOutputStream. PrintStream,可以继承或者组合,参数自己的类。

    先看BufferedInputStream 和BufferdOutputStream,由于底层文件或者socket 可能存在堵塞延迟,写的时候可以先把数据写到缓存里,再从缓存把数据写入到目标,读取的时候可以先从缓存读取,再从目标取数据到缓存。 即在io和目标之间加了缓存。先看读

        public synchronized int read() throws IOException {
            if (pos >= count) {
                fill();
                if (pos >= count)
                    return -1;
            }
            return getBufIfOpen()[pos++] & 0xff;
        }
    

    pos 记录的是读缓存的位置,如果没有缓存可读,需要fill 装填, 如果可以读缓存,就返回缓存的结果。

    我们看一下装填部分,状态部分如下:

     private static final
            AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (BufferedInputStream.class,  byte[].class, "buf"); 
            
     private void fill() throws IOException {
            byte[] buffer = getBufIfOpen();
            if (markpos < 0)
                pos = 0;            /* no mark: throw away the buffer */
            else if (pos >= buffer.length)  /* no room left in buffer */
                if (markpos > 0) {  /* can throw away early part of the buffer */
                    int sz = pos - markpos;
                    System.arraycopy(buffer, markpos, buffer, 0, sz);
                    pos = sz;
                    markpos = 0;
                } else if (buffer.length >= marklimit) {
                    markpos = -1;   /* buffer got too big, invalidate mark */
                    pos = 0;        /* drop buffer contents */
                } else if (buffer.length >= MAX_BUFFER_SIZE) {
                    throw new OutOfMemoryError("Required array size too large");
                } else {            /* grow buffer */
                    int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                            pos * 2 : MAX_BUFFER_SIZE;
                    if (nsz > marklimit)
                        nsz = marklimit;
                    byte nbuf[] = new byte[nsz];
                    System.arraycopy(buffer, 0, nbuf, 0, pos);
                    if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                        // Can't replace buf if there was an async close.
                        // Note: This would need to be changed if fill()
                        // is ever made accessible to multiple threads.
                        // But for now, the only way CAS can fail is via close.
                        // assert buf == null;
                        throw new IOException("Stream closed");
                    }
                    buffer = nbuf;
                }
            count = pos;
            int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
            if (n > 0)
                count = n + pos;
        }
    

    在读取之前,判断是否存在mark,没有mark就直接调用被装饰的inputstream的read方法,如果存在mark,则判断是不是失效,保留mark之后的数据,而且如果要mark的数据满了,就会进行扩容操作,并替换掉buff这个域,这个使用cas原子操作,保证数据符合预期。

    很奇怪的是BufferedInputStream几乎所有操作都加了锁,buff封闭在对象内,为啥要加原子操作呢,因为close方法会对buff进行操作,而且不加锁,如果在拷贝的过程中,buff成为null,而此处又把buff改成nbuff,close就会失败。 close 会自旋重试,而fill遇到这种情况会抛出异常。

    再看BufferedOutputStream

        public synchronized void write(int b) throws IOException {
            if (count >= buf.length) {
                flushBuffer();
            }
            buf[count++] = (byte)b;
        }
        
        /** Flush the internal buffer */
        private void flushBuffer() throws IOException {
            if (count > 0) {
                out.write(buf, 0, count);
                count = 0;
            }
        }
        
        public synchronized void flush() throws IOException {
            flushBuffer();
            out.flush();
        }
    

    写入时,如果buff满了,就把buffer先写入,再进行操作,flush的时候也会把buff先写入,再进行flush操作。

    DataInputStream 和DataOutputStream主要就是 针对基本类型进行的读写操作,扩展了一些api,因为之前read和write只能围绕byte进行操作,如果要写入一个long ,float ,double 都要自己封装,而Data的这两个类,进行这些封装。 还设计了相应的接口,DataInput,DataOutput

    java.io.DataInputStream#DataInputStream
    java.io.DataInputStream#read(byte[])
    java.io.DataInputStream#read(byte[], int, int)
    java.io.DataInputStream#readFully(byte[])
    java.io.DataInputStream#readFully(byte[], int, int)
    java.io.DataInputStream#skipBytes
    java.io.DataInputStream#readBoolean
    java.io.DataInputStream#readByte
    java.io.DataInputStream#readUnsignedByte
    java.io.DataInputStream#readShort
    java.io.DataInputStream#readUnsignedShort
    java.io.DataInputStream#readChar
    java.io.DataInputStream#readInt
    java.io.DataInputStream#readLong
    java.io.DataInputStream#readFloat
    java.io.DataInputStream#readDouble
    java.io.DataInputStream#readLine
    java.io.DataInputStream#readUTF()
    java.io.DataInputStream#readUTF(java.io.DataInput)
    java.io.DataInputStream#bytearr
    java.io.DataInputStream#chararr
    java.io.DataInputStream#readBuffer
    java.io.DataInputStream#lineBuffer
    

    在个部分唯一复杂的地方就是 readUTF 对应的DataOutputStream里面的writeUTF。

    可以见文档文档2,readUTF是按照UTF-8编码的形式 读取byte数组,按照UTF-8进行解码,最后保存到String中,而writeUTF8是编码的过程将String 编码成byte数组。String内部是char[];

    因此,这里是 char[] 与byte[] 之间的编解码。这个编码在UTF-8的基础上稍作调整。一个char可能编码成1,2,3个字符,解码时可以获得字符数量。

    PushbackInputStream 可以读取一些数据之后,又把数据unread,放回stream里,这个steam是不支持mark和reset的。实现上采用buff的形式,读取一些数据,unread的时候,修改buff的pos,下次读取可能会从buff里先读取。

       public int read() throws IOException {
            ensureOpen();
            if (pos < buf.length) {
                return buf[pos++] & 0xff;
            }
            return super.read();
        }
        
        
        public void unread(int b) throws IOException {
            ensureOpen();
            if (pos == 0) {
                throw new IOException("Push back buffer is full");
            }
            buf[--pos] = (byte)b;
        }
    

    PrinterStream 提供了许多方便的打印操作, 将不同数据,转换成字符串, 调用BufferedWriter 的写方法,进行字符流的写操作。另外有两个功能,一 写的时候设置是否有错误setError ,二,允许开启自动flush,例如,出现换行回车的时候, 会一起写入,最后显示效果比较流畅。

    
        private PrintStream(boolean autoFlush, OutputStream out) {
            super(out);
            this.autoFlush = autoFlush;
            this.charOut = new OutputStreamWriter(this);
            this.textOut = new BufferedWriter(charOut);
        }
    
     private void write(String s) {
            try {
                synchronized (this) {
                    ensureOpen();
                    textOut.write(s);
                    textOut.flushBuffer();
                    charOut.flushBuffer();
                    if (autoFlush && (s.indexOf('\n') >= 0))
                        out.flush();
                }
            }
            catch (InterruptedIOException x) {
                Thread.currentThread().interrupt();
            }
            catch (IOException x) {
                trouble = true;
            }
        }
    

    可以看到,写操作都是委托给BufferedWriter,中间使用适配器进行转换。

    字符流的读写

    前面的Stream类的读写单元都是围绕着byte或者byte[] 数组进行操作,要进行基本类型或者字符串的操作,要使用装饰器类。很多情况下,读写是围绕着字符串进行操作,因此基于字节流的操作非常不方便。

    相应的JDK1.1给出了一套围绕着char或者char[] 数组进行操作的类。

    Reader:BufferedReader, CharArrayReader, FilterReader, InputStreamReader, PipedReader, StringReader

    Writer: BufferedWriter, CharArrayWriter, FilterWriter, OutputStreamWriter, PipedWriter, PrintWriter, StringWriter

    字符流的类架构和直接类几乎相同,继承和装饰器模式,名称和对应的InputStream,OutputStream也一致。

    先看基类Reader和Writer, Reader 把实例本身设置了一个域Object lock作为同步锁,这样有一个好处是子类也可以使用这个域作为同步锁,而不用分别使用this。

    不过Writer顺便实现了Appendable接口,这样就可以流式append操作。

    CharArrayReader,CharArrayWriter 内存读写操作, 都是直接放入char数组

        public int read() throws IOException {
            synchronized (lock) {
                ensureOpen();
                if (pos >= count)
                    return -1;
                else
                    return buf[pos++];
            }
        }
        public void write(int c) {
            synchronized (lock) {
                int newcount = count + 1;
                if (newcount > buf.length) {
                    buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
                }
                buf[count] = (char)c;
                count = newcount;
            }
        }
        
    

    再看BufferedWriter和BufferedReader 这个地方没有继承自FilterWriter,FilterReader。

    采用的是数组做缓存,如果换成为空就进行装填。

    再看FileWriter和FileReader, FileWriter 直接继承了OutputStreamWriter,参数都是构造一个FileOutputStream并传递给OutputStreamWriter的构造函数。FileReader 也是构造FileOutPutStream给OutputStreamWriter使用。

    public class FileWriter extends OutputStreamWriter {
    
        public FileWriter(String fileName) throws IOException {
            super(new FileOutputStream(fileName));
        }
     }
    

    相当于简化了下面的写法:

    Writer writer = new OutputStreamWriter(new FileOutPutStream(fileName));
    Reader reader = new InputStreamReader(new FileInputStream(fileName));
    =》
    Writer writer = new FileWriter(fileName);
    Reader reader = new FileReader(fileName);
    

    PipedWriter 和PipedReader 也是成对使用,实现方式和PipedInputStream,PipedOutputStream类似

    看一下适配器的实现,InputStreamReader 和 OutputStreamWriter

    //InputStreamReader.java
    private final StreamDecoder sd;
      public int read() throws IOException {
          return sd.read();
      }
    
    //OutputStreamWriter
        private final StreamEncoder se;
          public void write(int c) throws IOException {
            se.write(c);
        }    
    

    具体的操作都委托给了StreamDecoder和StreamEncoder 对底层的字节流进行读写转换成字符流

    文件随机读写

    当使用FileInputStream或者FileOutPutStream,或者相应的Reader,Writer只能按照顺序进行读。实际上大部分操作系统,底层有接口支持读取文件的一部分,或者从文件的某个字节位置开始读写,因此可以和使用RandomAccessFile

    部分接口:
    java.io.RandomAccessFile#RandomAccessFile(java.lang.String, java.lang.String)
    java.io.RandomAccessFile#RandomAccessFile(java.io.File, java.lang.String)
    java.io.RandomAccessFile#getFD
    java.io.RandomAccessFile#getChannel
    java.io.RandomAccessFile#open0
    java.io.RandomAccessFile#open
    java.io.RandomAccessFile#read()
    java.io.RandomAccessFile#read0
    java.io.RandomAccessFile#readBytes
    java.io.RandomAccessFile#read(byte[], int, int)
    java.io.RandomAccessFile#read(byte[])
    java.io.RandomAccessFile#getFilePointer
    java.io.RandomAccessFile#seek
    java.io.RandomAccessFile#seek0
    java.io.RandomAccessFile#length
    java.io.RandomAccessFile#setLength
    java.io.RandomAccessFile#close
    

    另外为了读写方便,还实现了DataInput和DataOutput接口,关键方法采用了native实现,即不同平台的实现不同。

    END

    相关文章

      网友评论

          本文标题:【JAVA IO】 JAVA Classic IO 源码浅析

          本文链接:https://www.haomeiwen.com/subject/sowyeftx.html