深入理解Okio之旅

作者: 三好码农 | 来源:发表于2019-04-16 13:49 被阅读9次

    JDK的io库由于历史原因设计的比较复杂,有很多装饰类,使用起来需要记忆大量的类,相信你也对此早已诟病不满。Square公司推出的Okio应运而生,它原本是作为Okhttp的io功能库而设计的,也是因为Okhttp而被大家熟知。从知道到会使用,再到理解实现原理后熟练使用,甚至在此基础上二次开发优化,这个认知的过程需要刻意练习,这篇文章就是对Okio的一个总结,Okio虽然代码量不是很多, 但是里面值得学习的地方还是很多。

    Source + Sink

    简介

    Okio定义了自己的一套继承链,Source对应InputStream, Sink对应OutputStream,这样对比就不难理解了,看一下接口的定义

    public interface Source extends Closeable {
    
      long read(Buffer sink, long byteCount) throws IOException;
    
      Timeout timeout();
    
      @Override void close() throws IOException;
    }
    
    
    public interface Sink extends Closeable, Flushable {
    
      void write(Buffer source, long byteCount) throws IOException;
    
      @Override void flush() throws IOException;
    
      Timeout timeout();
    
      @Override void close() throws IOException;
    }
    

    接口定义的方法很简洁,
    read/write方法,读取和写入数据的接口方法,它们的第一个参数都是Buffer,关于这个类后面会详细介绍,这里我们暂且按照缓冲区理解它。byteCount就是读取或者写入的字节数。
    timeout方法,Okio新增的新特性,超时控制
    close方法,关闭输入输出流
    flush方法,将Buffer缓冲区中的数据写入目标流中。

    如何使用

    Okio已经帮我们定义了一个门面类,名字就叫Okio,通过它可以生成各种我们需要的对象。
    比如Okio.source(inputStream); 将inputStream包装成我们的Source对象,同样的
    Okio.sink(outputStream);将outputStream包装成Sink对象。

    所以Okio的底层操作的流对象还是Jdk里面定义的InputStream和OutputStream,作为一个轻量级的io框架它不可能跳出Jdk的框架去另外实现一套,它做的只是方便开发者的封装,但是它的封装设计足够优秀,这也是我还在这里跟你们吹牛x的原因,还是不想从大神那里学个一招半式。

    看个例子

            File file = new File("/Users/aliouswang/Documents/olympic/JavaArt//text.temp");
            Sink sink = Okio.sink(file);
    
            Buffer buffer = new Buffer();
            buffer.writeString("Hello okio!", Charset.forName("UTF-8"));
            buffer.writeInt(998);
            buffer.writeByte(1);
            buffer.writeLong(System.currentTimeMillis());
            buffer.writeUtf8("Hello end!");
    
            sink.write(buffer, buffer.size());
    
            sink.flush();
            sink.close();
    

    很简单的一个写文件的例子,前面说过Source和Sink的读和写的方法都需要一个Buffer对象,Buffer对象帮我们提供了类似BufferedInputStream和BufferedOutputStream的缓冲区功能(提高读写效率),同时还提供了DataInputStream和DataOutputStream中的大部分功能(比如写int,byte,long等),而且Buffer还提供了写String的方法,更是为我们经常使用的UTF-8编码格式,单独提供读写方法。

    有写就有读

            Source source = Okio.source(file);
            buffer.clear();
            source.read(buffer, 1024);
    
            String string = buffer.readString("Hello okio!".length(), Charset.forName("UTF-8"));
            int intValue = buffer.readInt();
            byte byteValue = buffer.readByte();
            long longValue = buffer.readLong();
            String utf8 = buffer.readUtf8();
    
            System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
                    "\nlong:" + longValue + "\nutf8:" + utf8);
    
            source.close();
    
        // 打印结果:
        str:Hello okio!;
        int:998;
        byte:1;
        long:1555325659665
        utf8:Hello end!
    

    但是每次都去new一个Buffer对象,是不是很麻烦,你我都能想到的,大神们肯定早就想到了,于是乎有了BufferedSink,BufferedSource。

    BufferedSource + BufferedSink

    BufferedSource 和 BufferedSink 也都是接口,里面定义的接口方法比较多,篇幅关系,这里只列出BufferedSink的定义,更细节的可以查看源码,源码中对很多方法的注释都举了例子来帮助我们理解,Okio的作者也是用心良苦,生怕我们广大的码农们看不懂,不会用啊!!!

    public interface BufferedSink extends Sink, WritableByteChannel {
      Buffer buffer();
      BufferedSink write(ByteString byteString) throws IOException;
      BufferedSink write(byte[] source) throws IOException;
      BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
      long writeAll(Source source) throws IOException;
      BufferedSink write(Source source, long byteCount) throws IOException;
      BufferedSink writeUtf8(String string) throws IOException;
      BufferedSink writeString(String string, Charset charset) throws IOException;
      BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
          throws IOException;
      BufferedSink writeByte(int b) throws IOException;
      BufferedSink writeShort(int s) throws IOException;
      BufferedSink writeShortLe(int s) throws IOException;
      BufferedSink writeInt(int i) throws IOException;
      BufferedSink writeIntLe(int i) throws IOException;
      BufferedSink writeLong(long v) throws IOException;
      BufferedSink writeLongLe(long v) throws IOException;
      BufferedSink writeDecimalLong(long v) throws IOException;
      BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
      @Override void flush() throws IOException;
      BufferedSink emit() throws IOException;
      BufferedSink emitCompleteSegments() throws IOException;
      OutputStream outputStream();
    }
    
    

    可以看到BufferedSink继承于Sink,同时还继承了WritableByteChannel,这个接口是nio接口,所以Okio同样实现了nio的相关功能,这里由于水平有限,关于nio的知识这篇文章不会涉及,有兴趣的同学可以自行查阅资料哦。

    BufferedSink定义了Buffer类中定义的全部方法,同时还定义了一个buffer()方法,返回一个Buffer对象,我们大概可以猜想到,这里应该是一个不太标准的代理模式,BufferedSink委托Buffer来干活。

    Okio同样提供了Buffer相关的方法方便我们使用。

      public static BufferedSink buffer(Sink sink) {
        return new RealBufferedSink(sink);
      }
    
      public static BufferedSource buffer(Source source) {
        return new RealBufferedSource(source);
      }
    

    返回的是BufferedSink 和 BufferedSource,Okio的默认实现类是RealBufferedSink和RealBufferedSource,我们可以通过BufferedSource和BufferedSink对上面读写文件的例子进行修改,

            File file = new File("/Users/aliouswang/Documents/java/JavaArt/text.temp");
            Sink sink = Okio.sink(file);
            BufferedSink bufferedSink = Okio.buffer(sink);
    
            bufferedSink.writeString("Hello okio!", Charset.forName("UTF-8"));
            bufferedSink.writeInt(998);
            bufferedSink.writeByte(1);
            bufferedSink.writeLong(System.currentTimeMillis());
            bufferedSink.writeUtf8("Hello end!");
    
            bufferedSink.close();
    
            Source source = Okio.source(file);
            BufferedSource bufferedSource = Okio.buffer(source);
    
            String string = bufferedSource.readString("Hello okio!".length(), Charset.forName("UTF-8"));
            int intValue = bufferedSource.readInt();
            byte byteValue = bufferedSource.readByte();
            long longValue = bufferedSource.readLong();
            String utf8 = bufferedSource.readUtf8();
    
            System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
                    "\nlong:" + longValue + "\nutf8:" + utf8);
    
            source.close();
    

    可以看到,BufferedSource和BufferedSink能够满足我们对io的日常绝大部分使用场景。

    Okio门面类的实现

    更一般的,我们会这样去写,链式调用,代码更简洁。

    BufferedSource bufferedSource = Okio.buffer(Okio.source(file));
    BufferedSink bufferedSink = Okio.buffer(Okio.sink(file));
    

    非常简洁的就能生成BufferedSource和BufferedSink,看一下Okio帮我们做了什么。

      public static Source source(File file) throws FileNotFoundException {
        if (file == null) throw new IllegalArgumentException("file == null");
        return source(new FileInputStream(file));
      }
    
      public static Source source(InputStream in) {
        // 生成一个默认的Timeout超时对象,默认实现是没有超时deadtime的
        return source(in, new Timeout());
      }
    
      private static Source source(final InputStream in, final Timeout timeout) {
        if (in == null) throw new IllegalArgumentException("in == null");
        if (timeout == null) throw new IllegalArgumentException("timeout == null");
    
        return new Source() {
          @Override public long read(Buffer sink, long byteCount) throws IOException {
            if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
            if (byteCount == 0) return 0;
            try {
              // 检查超时,
              timeout.throwIfReached();
              // 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
              Segment tail = sink.writableSegment(1);
              int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
              // 将最大能copy的字节写入Buffer,
              int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
              if (bytesRead == -1) return -1;
              tail.limit += bytesRead;
              sink.size += bytesRead;
              return bytesRead;
            } catch (AssertionError e) {
              if (isAndroidGetsocknameError(e)) throw new IOException(e);
              throw e;
            }
          }
    
          @Override public void close() throws IOException {
            in.close();
          }
    
          @Override public Timeout timeout() {
            return timeout;
          }
    
          @Override public String toString() {
            return "source(" + in + ")";
          }
        };
      }
    

    通过Okio.source的实现可以看到,在读取的时候,会从传入的InputStream in 对象中读取字节到Buffer sink中,前面我们提到过,RealBufferedSource和RealBufferedSink内部都持有一个Buffer对象,可以猜测,它们持有的buffer对象 会在读写的时候传入。我们进入源码验证一下, 这里我们以readString 方法为例。

      @Override public String readString(long byteCount, Charset charset) throws IOException {
        require(byteCount);
        if (charset == null) throw new IllegalArgumentException("charset == null");
        return buffer.readString(byteCount, charset);
      }
    
      @Override public void require(long byteCount) throws IOException {
        if (!request(byteCount)) throw new EOFException();
      }
    
      @Override public boolean request(long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (closed) throw new IllegalStateException("closed");
        while (buffer.size < byteCount) {
          if (source.read(buffer, Segment.SIZE) == -1) return false;
        }
        return true;
      }
    
      @Override public String readString(long byteCount, Charset charset) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        if (charset == null) throw new IllegalArgumentException("charset == null");
        if (byteCount > Integer.MAX_VALUE) {
          throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
        }
        if (byteCount == 0) return "";
    
        Segment s = head;
        if (s.pos + byteCount > s.limit) {
          // If the string spans multiple segments, delegate to readBytes().
          return new String(readByteArray(byteCount), charset);
        }
    
        String result = new String(s.data, s.pos, (int) byteCount, charset);
        s.pos += byteCount;
        size -= byteCount;
    
        if (s.pos == s.limit) {
          head = s.pop();
          SegmentPool.recycle(s);
        }
    
        return result;
      }
    

    代码比较清晰,先从source中读取要求的bytecount长度的String到buffer中,然后从buffer中读取String 返回。其他的读取方法跟readString大同小异,有兴趣同学可以自行查阅源码。

    说了这么久,我们的主角Buffer对象登场了。

    Buffer

    看一下Buffer类的申明,实现了BufferedSource, BufferedSink, Cloneable, ByteChannel 四个接口。

    public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {...}
    

    我们知道Buffer作为缓冲区,肯定底层需要有数据结构来存储暂存的数据,JDK的BuffedInputStream和BufferedOutputStream中是使用字节数组的,而这里Okio的Buffer不是,它使用的是Segment。

    public Segment head;
    
    Segment

    Segment 是一个双向循环链表,它的内部持有一个byte[] data,默认大小8192(与JDK的BufferedInputStream相同)。

    public final class Segment {
      /** The size of all segments in bytes. */
      static final int SIZE = 8192;
    
      /** 默认共享最小字节数*/
      static final int SHARE_MINIMUM = 1024;
    
      final byte[] data;
    
      /** 标识下一个读取字节的位置 */
      int pos;
    
      /** 标识下一个写入字节的位置 */
      int limit;
    
      /** 是否与其他Segment共享byte[] */
      boolean shared;
    
      /** 是否拥有这个byte[], 如果拥有可以写入 */
      boolean owner;
    
      /** Segment后继 */
      public Segment next;
    
      /** Segment前驱 */
      Segment prev;
      Segment() {
        this.data = new byte[SIZE];
        this.owner = true;
        this.shared = false;
      }
    
      ......
    }
    

    Sement关键的成员变量都加了注释,Okio为了优化性能,避免频繁的创建和回收对象,使用了对象池模式,设计了SegmentPool类来管理Segment。

    SegemntPool
    final class SegmentPool {
      /** The maximum number of bytes to pool. */
      // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
      static final long MAX_SIZE = 64 * 1024; // 64 KiB.
    
      /** Singly-linked list of segments. */
      static @Nullable Segment next;
    
      /** Total bytes in this pool. */
      static long byteCount;
    
      private SegmentPool() {
      }
    
      static Segment take() {
        synchronized (SegmentPool.class) {
          if (next != null) {
            Segment result = next;
            next = result.next;
            result.next = null;
            byteCount -= Segment.SIZE;
            return result;
          }
        }
        return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
      }
    
      static void recycle(Segment segment) {
        if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
        if (segment.shared) return; // This segment cannot be recycled.
        synchronized (SegmentPool.class) {
          if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
          byteCount += Segment.SIZE;
          segment.next = next;
          segment.pos = segment.limit = 0;
          next = segment;
        }
      }
    }
    

    SegmentPool代码很简洁,它的最大容量是8个Segment,如果超过调用take方法就会直接新建一个Segment对象,另外recycle回收方法负责回收闲置的Segment,将其加入链表,供其他buffer使用。

    有了Segment和SegmentPool的知识,就更容易理解Buffer类的实现了。
    比如Okio.source方法新建的Source对象的read方法,获取可以写入的Segment对象,便利Segment链表获取可以写入的Segment,如果head为null则新建一个Segment。

        // 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
      Segment tail = sink.writableSegment(1);
    
      Segment writableSegment(int minimumCapacity) {
        if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    
        if (head == null) {
          head = SegmentPool.take(); // Acquire a first segment.
          return head.next = head.prev = head;
        }
    
        Segment tail = head.prev;
        if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
          tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
        }
        return tail;
      }
    

    Buffer的其他方法就不一一分析了,有了我们前面的知识,相信看起来不会太难。接下来我们看一下Okio的另一个类ByteString。

    ByteString

    我们知道String是的内部是基于char[] 数组来实现的,Okio的ByteString内部是基于byte[] 数组来实现的。跟String类似,ByteString也被设计为不可变的,这样可以保证ByteString是线程安全的。

    public class ByteString implements Serializable, Comparable<ByteString> {
      final byte[] data;
      ByteString(byte[] data) {
        this.data = data; // Trusted internal constructor doesn't clone data.
      }
       ......
    }
    

    同时ByteString提供了很多方便的工具方法,比如base64,sha1加密等。

      public String base64() {
        return Base64.encode(data);
      }
    
      /** Returns the 128-bit MD5 hash of this byte string. */
      public ByteString md5() {
        return digest("MD5");
      }
    
      /** Returns the 160-bit SHA-1 hash of this byte string. */
      public ByteString sha1() {
        return digest("SHA-1");
      }
    
      /** Returns the 256-bit SHA-256 hash of this byte string. */
      public ByteString sha256() {
        return digest("SHA-256");
      }
    
      /** Returns the 512-bit SHA-512 hash of this byte string. */
      public ByteString sha512() {
        return digest("SHA-512");
      }
    

    同时ByteString也提供了静态方法,方便与String类型互转。

      /** Returns a new byte string containing the {@code UTF-8} bytes of {@code s}. */
      public static ByteString encodeUtf8(String s) {
        if (s == null) throw new IllegalArgumentException("s == null");
        ByteString byteString = new ByteString(s.getBytes(Util.UTF_8));
        byteString.utf8 = s;
        return byteString;
      }
    
      /** Returns a new byte string containing the {@code charset}-encoded bytes of {@code s}. */
      public static ByteString encodeString(String s, Charset charset) {
        if (s == null) throw new IllegalArgumentException("s == null");
        if (charset == null) throw new IllegalArgumentException("charset == null");
        return new ByteString(s.getBytes(charset));
      }
    
      /** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
      public String utf8() {
        String result = utf8;
        // We don't care if we double-allocate in racy code.
        return result != null ? result : (utf8 = new String(data, Util.UTF_8));
      }
    
      /** Constructs a new {@code String} by decoding the bytes using {@code charset}. */
      public String string(Charset charset) {
        if (charset == null) throw new IllegalArgumentException("charset == null");
        return new String(data, charset);
      }
    

    最后

    Okio并不是设计来代替Jdk io的,但是在某些重度io的场景,如果对性能优化追求极致的话,Okio不失是一种选择,关于Okio还有很多细节的知识由于篇幅关系没有涉及,有兴趣的同学可以去看源码中找答案,全文完。

    相关文章

      网友评论

        本文标题:深入理解Okio之旅

        本文链接:https://www.haomeiwen.com/subject/waokwqtx.html