美文网首页源码解析
Okio源码分析与性能优化思想

Okio源码分析与性能优化思想

作者: 龙儿筝 | 来源:发表于2018-02-24 13:31 被阅读24次

    Okio的优势

    private static final ByteString PNG_HEADER = ByteString.decodeHex("89504e470d0a1a0a");
    
    public void decodePng(InputStream in) throws IOException {
      BufferedSource pngSource = Okio.buffer(Okio.source(in));
    
      ByteString header = pngSource.readByteString(PNG_HEADER.size());
      if (!header.equals(PNG_HEADER)) {
        throw new IOException("Not a PNG.");
      }
    }
    

    这个是Okio官方提供了一个png图片的解码的例子,我们知道一般判断一个文件的格式就是依靠前面的校验码,比如class文件中前面的16进制代码就是以 cafebabe 开头,同样的常规的png,jpg,gif之类的都可以通过前面的魔数来进行判断文件类型,这里就以一个图片输入流转换成一个BufferedSource,并且通过 readByteString 方法拿到一个字节串 ByteString 这样就能验证这个文件是不是一个png的图片,同样的方法也能用在其他文件的校验上。
    Okio除了这些外还有很多额外的功能,而且官方也提供了许多包括对于zip文件的处理,各种MD5,SHA-1.SHA256,Base64之类编码的处理,如果需要额外的一些操作,也可以自己实现Sink,Source对应的方法。

    读写文件

    Okio.buffer(Okio.sink(file)).writeUtf8("龙儿筝").flush();
    String txt = Okio.buffer(Okio.source(file)).readUtf8();
    

    我们在创建BufferSink和BufferSource时,先得创建Sink和Source。Sink代表输出流,Source代表输入流,所有只用一个来进行分析。
    我们先看Sink代码,Sink是一个接口

    public interface Sink extends Closeable, Flushable {
    
      void write(Buffer source, long byteCount) throws IOException;
    
      @Override void flush() throws IOException;
    
      Timeout timeout();
    
      @Override void close() throws IOException;
    }
    

    Sink只包含一些最简单的方法,以及一个Timeout超时,找到Sink的子类BufferedSink,发现也是一个接口,里面提供了更多的方法,再找到BufferedSick的子类Buffer和RealBufferedSink。

    public static BufferedSink buffer(Sink sink) {
        return new RealBufferedSink(sink);
      }
    

    由buffer方法可知真正创建的是RealBufferedSink。RealBufferedSink类有两个属性buffer和sink以及一些方法。

    @Override public BufferedSink writeUtf8(String string, int beginIndex, int endIndex)
          throws IOException {
        if (closed) throw new IllegalStateException("closed");
        buffer.writeUtf8(string, beginIndex, endIndex);
        return emitCompleteSegments();
      }
    

    进入writeUtf8方法可知,虽然RealBufferedSink是BufferedSink的真正实现,但写数据代理交给buffer属性来完成。buffer并不是直接将数据写入文件中,而是调用flush方法才写入文件中。

    @Override public void flush() throws IOException {
        if (closed) throw new IllegalStateException("closed");
        if (buffer.size > 0) {
          sink.write(buffer, buffer.size);
        }
        sink.flush();
      }
    

    flush方法中是由sink来完成的。由前面创建BufferedSink可知sink是Okio的sink方法创建的

    Okio中的超时机制

    Okio的超时机制让io不会因为异常阻塞在某个未知的错误上,Okio的基础超时机制是采用同步超时

    private static Sink sink(final OutputStream out, final Timeout timeout) {
        if (out == null) throw new IllegalArgumentException("out == null");
        if (timeout == null) throw new IllegalArgumentException("timeout == null");
    
        return new Sink() {
          @Override public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
              timeout.throwIfReached();
              Segment head = source.head;
              int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
              out.write(head.data, head.pos, toCopy);
    
              head.pos += toCopy;
              byteCount -= toCopy;
              source.size -= toCopy;
    
              if (head.pos == head.limit) {
                source.head = head.pop();
                SegmentPool.recycle(head);
              }
            }
          }
    
          @Override public void flush() throws IOException {
            out.flush();
          }
    
          @Override public void close() throws IOException {
            out.close();
          }
    
          @Override public Timeout timeout() {
            return timeout;
          }
    
          @Override public String toString() {
            return "sink(" + out + ")";
          }
        };
      }
    

    这里创建Sink时传入了OutputStream和Timeout,可以看到write方法中实际上有一个while循环,在每个开始写的时候就调用了timeout.throwIfReached()方法,这个方法里面去判断时间是否超时,按序执行,同样的Source也是一样的操作。
    当我们创建Sink时传入的是一个Socket对象时,会发现这里使用的是异步超时机制

    public static Sink sink(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        AsyncTimeout timeout = timeout(socket);
        Sink sink = sink(socket.getOutputStream(), timeout);
        return timeout.sink(sink);
      }
    

    AsyncTimeout继承于TimeOut使用WatchDog机制来实现异步超时

    Segment和SegmentPool解析

    前面在while循环中写数据时会先创建一个Segment。Okio将数据分割成一块块的片段,同时segment拥有前置节点和后置节点,构成一个双向循环链表。这样采取分片使用链表,片中使用数组存储,兼具读的连续性和写的可插入性,是一种折中的方案,读写更快,而且可以根据需求改动分片的大小来权衡读写的业务操作。另外,segment也有一些内置的优化操作,综合这些Okio才能大话异彩。

    Segment属性分析

    final class Segment {
      static final int SIZE = 8192;
    
      static final int SHARE_MINIMUM = 1024;
    
      final byte[] data;
    
      int pos;
    
      int limit;
    
      boolean shared;
    
      boolean owner;
    
      Segment next;
    
      Segment prev;
    
      ...
      
    }
    

    SIZE就是一个segment的最大字节数,其中还有一个SHARE_MINIMUM,这个涉及到segment优化中的另一个技巧,共享内存,然后data就是保存的字节数组,pos,limit就是开始和结束点的index,shared和owner用来设置状态判断是否可写,一个有共享内存的segment是不能写入的,pre,next就是前置后置节点。

    Segment方法分析

    public void writeTo(Segment sink, int byteCount) {
        if (!sink.owner) throw new IllegalArgumentException();
        if (sink.limit + byteCount > SIZE) {
          // We can't fit byteCount bytes at the sink's current position. Shift sink first.
          if (sink.shared) throw new IllegalArgumentException();
          if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
          System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
          sink.limit -= sink.pos;
          sink.pos = 0;
        }
    
        System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
        sink.limit += byteCount;
        pos += byteCount;
      }
    

    owner和Shared这两个状态目前看来是完全相反的,赋值都是同步赋值的,这里有点不明白存在两个参数的意义,现在的功能主要是用来判断如果是共享就无法写,以免污染数据,会抛出异常。当然,如果要写的字节大小加上原来的字节数大于单个segment的最大值也是会抛出异常,也存在一种情况就是虽然尾节点索引和写入字节大小加起来超过,但是由于前面的pos索引可能因为read方法取出数据,pos索引后移这样导致可以容纳数据,这时就先执行移动操作,使用系统的 System.arraycopy 方法来移动到pos为0的状态,更改pos和limit索引后再在尾部写入byteCount数的数据,写完之后实际上原segment读了byteCount的数据,所以pos需要后移这么多。过程十分的清晰,比较好理解。
    除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据千疮百孔,可能十分短的数据却占据了一整个segment,所以有了一个压缩机制

    public void compact() {
        if (prev == this) throw new IllegalStateException();
        if (!prev.owner) return; // Cannot compact: prev isn't writable.
        int byteCount = limit - pos;
        int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
        if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
        writeTo(prev, byteCount);
        pop();
        SegmentPool.recycle(this);
      }
    

    照例如果前面是共享的那么不可写,也就不能压缩了,然后判断前一个的剩余大小是否比当前的大,有足够的空间来容纳数据,调用前面的 writeTo 方法来写数据,写完后移除当前segment,然后通过 SegmentPool 来回收。
    另一个技巧就是共享机制,为了减少数据复制带来的性能开销,segment存在一个共享机制

    public Segment split(int byteCount) {
        if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
        Segment prefix;
    
        // We have two competing performance goals:
        //  - Avoid copying data. We accomplish this by sharing segments.
        //  - Avoid short shared segments. These are bad for performance because they are readonly and
        //    may lead to long chains of short segments.
        // To balance these goals we only share segments when the copy will be large.
        if (byteCount >= SHARE_MINIMUM) {
          prefix = new Segment(this);
        } else {
          prefix = SegmentPool.take();
          System.arraycopy(data, pos, prefix.data, 0, byteCount);
        }
    
        prefix.limit = prefix.pos + byteCount;
        pos += byteCount;
        prev.push(prefix);
        return prefix;
      }
    

    为了防止一个很小的片段就进行共享,我们知道共享之后为了防止数据污染就无法写了,如果存在大片的共享小片段,实际上是很浪费资源的,所以通过这个对比可以看出这个最小数的意义。为了效率在移动大数据的时候直接移动整个segment而不是data,这样在写数据上能达到很高的效率

    SegmentPool属性分析

    final class SegmentPool {
      /** The maximum number of bytes to pool. */
      // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
      static final long MAX_SIZE = 64 * 1024; // 64 KiB.
    
      /** Singly-linked list of segments. */
      static @Nullable Segment next;
    
      /** Total bytes in this pool. */
      static long byteCount;
    
      ...
      
    }
    

    SegmentPool实际上是Segment的对象池,这个池子的上限是64K,相当于8个segment,next这个节点可以看出这个SegmentPool是按照单链表的方式进行存储的,byteCount则是已有的大小。SegmentPool采用栈的方式来管理Segment对象。

    ByteString分析

    在BufferedSink中可以直接写入ByteString,ByteString是不可变的。不可变的对象有许多的好处,首先本质是线程安全的,不要求同步处理,也就是没有锁之类的性能问题,而且可以被自由的共享内部信息。

    public class ByteString implements Serializable, Comparable<ByteString> {
      static final char[] HEX_DIGITS =
          { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
      private static final long serialVersionUID = 1L;
    
      /** A singleton empty {@code ByteString}. */
      public static final ByteString EMPTY = ByteString.of();
    
      final byte[] data;
      transient int hashCode; // Lazily computed; 0 if unknown.
      transient String utf8; // Lazily computed.
    
      ...
      
    }
    

    ByteString内部有两个属性,byte数组和String,这样能够让这个类在Byte和String转换上基本没有开销,同样的也需要保存两份引用,这是明显的空间换时间的方式,为了性能Okio做了很多的事情。

    Buffer分析

    前面提到RealBufferedSink的操作实际上是由Buffer代理完成的。
    整个Buffer持有了一个Segment的引用,通过这个引用能拿到整个链表中所有的数据。
    Buffer一共实现了三个接口,读,写,以及clone。先从最简单的clone说起,clone是一种对象生成的方式,是除了常规的new·关键字以及反序列化之外的一种方式,主要分为深拷贝和浅拷贝两种,Buffer采用的是深拷贝的方式

    @Override public Buffer clone() {
        Buffer result = new Buffer();
        if (size == 0) return result;
    
        result.head = new Segment(head);
        result.head.next = result.head.prev = result.head;
        for (Segment s = head.next; s != head; s = s.next) {
          result.head.prev.push(new Segment(s));
        }
        result.size = size;
        return result;
      }
    

    对应实现的clone方法,如果整个Buffer的size为0,也就是没有数据,那么就返回一个新建的Buffer对象,如果不为空就是遍历所有的segment并且都创建一个对应的Segment,这样clone出来的对象就是一个全新的毫无关系的对象。前面分析segment的时候有讲到是一个双向循环链表,但是segment自身构造的时候却没有形成闭环,其实就是在Buffer中产生的。
    除了clone接口外,同时还有两个接口BufferedSink,BufferedSource。Buffer实现了这两个接口的所有方法,所以既然读也有写的方法。

    @Override public Buffer writeShort(int s) {
        Segment tail = writableSegment(2);
        byte[] data = tail.data;
        int limit = tail.limit;
        data[limit++] = (byte) ((s >>> 8) & 0xff);
        data[limit++] = (byte)  (s        & 0xff);
        tail.limit = limit;
        size += 2;
        return this;
      }
    

    writeShort用来给Buffer中写入一个short的数据,首先通过writableSegment拿到一个能够有2个字节空间的segment,tail中的data就是字节数组,limit则是数据的尾部索引,写数据就是在尾部继续往后写,直接设置在data通过limit自增后的index,然后重置尾部索引,并且buffer的size大小加2。

    @Override public short readShort() {
        if (size < 2) throw new IllegalStateException("size < 2: " + size);
    
        Segment segment = head;
        int pos = segment.pos;
        int limit = segment.limit;
    
        // If the short is split across multiple segments, delegate to readByte().
        if (limit - pos < 2) {
          int s = (readByte() & 0xff) << 8
              |   (readByte() & 0xff);
          return (short) s;
        }
    
        byte[] data = segment.data;
        int s = (data[pos++] & 0xff) << 8
            |   (data[pos++] & 0xff);
        size -= 2;
    
        if (pos == limit) {
          head = segment.pop();
          SegmentPool.recycle(segment);
        } else {
          segment.pos = pos;
        }
    
        return (short) s;
      }
    

    读的方法相对于写的方法就复杂一些,因为buffer是分块的,读数据的过程就有可能是跨segment的,比如前面一个字节,下一个segment一个字节,这种情况就转化为readbyte,读两个字节后合成一个short对象,对于连续的读可以直接通过pos索引自增达到目的,读完后Buffer的size减2。并且会有当前的segment会出现读完后数据为null的情况,此时头部索引pos和尾部索引limit就重合了,通过pop方法可以把这个segment分离出来,并且将下一个segment设置为Buffer的head,然后将分离出来的segment回收到对象池中。

    总结

    Okio这个库的精髓,第一就是快,Okio采取了空间换时间的方式比如Segment和ByteString之类的存储来让IO操作尽可能不成为整个系统的瓶颈,虽然采取这种方式但是在内存上也是极致的优化,使用的片段共享以及整体的读写共享来加快大字节数组的读写,第二就是稳定,Okio提供了超时机制,不仅在IO操作上加上超时的判定,包括close,flush之类的方法中都有超时机制,这让上层不会错过一个可能导致系统崩溃的超时异常,第三就是方便,Sink,Source两个包装了写和读,区别于传统的IO各种不同的输入输出流,这里只有一种而且支持socket,十分的方便。当然Okio还有很多其他的好处,易于扩展,代码量小易于阅读,我想这就是许多上层库选择Okio来作为IO操作的原因。

    参考地址

    大概是最完全的Okio源码解析文章

    相关文章

      网友评论

      本文标题:Okio源码分析与性能优化思想

      本文链接:https://www.haomeiwen.com/subject/pslqxftx.html