美文网首页面试题网络
Okio从懵逼到掌握

Okio从懵逼到掌握

作者: mandypig | 来源:发表于2019-11-04 17:58 被阅读0次

okio作为java io流的开源处理库,以其短小精悍,性能高效而著称。和大多数人一样,接触okio都是从okhttp开始,okhttp源码实现上底层的流处理方式正是使用自家的okio库来操作。这也可能就是为什么okhttp会以ok开头的原因吧。看到okhttp内部关于okio的操作,一开始我的内心的拒绝的,直到看到公司项目内部有同事在使用,扶我起来我还能继续学。再熟悉了相关api调用,理解了内部原理之后,嗯 真香,有机会我也用用。
和原生java io流操作相比,okio的操作显得相对简洁。既然要去了解okio,那么首先就需要先去明白java自带的io有什么缺点。

原生java io流

原生的java io流涉及到的类繁多,当初学习java,相信大家一定都被io流各种包装类给震撼到。io流的设计使用了典型的装饰者模式,各种大套嵌小套,一种俄罗斯套娃的既视感,为了实现某种操作往往需要配合不同功能的io增加类,比如比较典型的按行读取文件操作

        FileInputStream fis = new FileInputStream(file);
        InputStreamReader isr = new InputStreamReader(new FileInputStream(file), "UTF-8");
        BufferedReader br = new BufferedReader(isr);
        String line ;
        while ((line = br.readLine()) != null) {
            Log.e("mandy","line=="+line);
        }
当然这只是io流比较常规的操作,一些另类操作还需要借助其他的io类,放一张io流的全家福让大家再次感受下原生io流家族的庞大。 1186236-20170628084745102-439177264.png

如果说原生io操作比较繁琐没有伤及到根本,那么另一个缺点在读写转换的时候,涉及到的内存拷贝问题才是io流最大的性能问题。io流类多,操作比较繁琐这个点我们完全可以通过封装工具类来解决,毕竟公司项目内部对于一些io操作都会封装出对应的类来操作,不可能每次都会自己来做io操作,从这个层面来说,原生io流这个缺点还算可以接受。但是io流读写涉及到的冗余拷贝问题才是io流中的根本问题,okio正是通过自有的一套机制来巧妙解决掉这个问题。

java io流中的冗余拷贝问题,比较常见的一个使用场景就是复制文件,demo代码如下

File original = new File("hello.txt");
    try (
      InputStream in = new BufferedInputStream(new FileInputStream(original));
      OutputStream out = new BufferedOutputStream(new FileOutputStream(copied))) {
        byte[] buffer = new byte[1024];
        int lengthRead;
        while ((lengthRead = in.read(buffer)) > 0) {
            out.write(buffer, 0, lengthRead);
            out.flush();
        }
    }

为了将内容复制到另一个文件中,需要一个中间缓存区buffer用来保存读取到的数据,也就是说数据的流向一个完整的过程是这样的,BufferedInputStream 自带buffer---->自定义buffer---->BufferedOutputStream 自带buffer,使用java io流这个过程无法避免,我们没有办法去除中间的自定义buffer,实现高效的
BufferedInputStream 自带buffer---->BufferedOutputStream 自带buffer,okio的出现正是为了解决这个核心问题,少一次拷贝过程对于读写操作是有性能提升的。当然okio不仅仅解决了这个问题,还自带其他特性,比如超时机制,封装了大量方便的api来满足我们的日常开发,而不必像传统io流需要借助各种类来实现相应功能。

okio 常见类

okio比较简洁,通过几个类就能实现各种操作,以常见的读取文件来说

BufferedSource bufferedSource = Okio.buffer(Okio.source(file));
while (!bufferedSource.exhausted()) {
                String line = bufferedSource.readUtf8Line();
                Log.e("mandy", "line==" + line);
}

source可以看成是io流中的inputstream,buffer方法类似io流中的bufferedinputstream,让inputstream具有buffer的功能,readUtf8Line就是读行操作,这里有几个点主要注意一下
(1)okio本质是还是使用的java中的io流来操作,看下source的源码就能看出来

public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }

内部通过FileInputStream来实现最终的读操作,和我们平时的操作没有太大的区别
(2)okio的流的读取使用原生的行为,但是在缓存使用方面却是使用自己的一套机制,抛弃了原生的bufferedinputstream,这也是okio读写高效的一个根本原因,简单看下buffer的内部实现

public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

封装了RealBufferedSource,看下RealBufferedSource的继承

final class RealBufferedSource implements BufferedSource,
public interface BufferedSource extends Source 

BufferedSource内部封装了大量和读相关的方法比如exhausted,readByte,readUtf8Line,readAll等等,okio源码注释中对于这些方法都有大量详细的说明,感兴趣的可以进源码了解使用方法。Source就一个接口类内部3个方法read,close,timeout,timeout用来实现超时机制,可以先不关注。

和source对应的一个类就是sink,类似于outputstream,用来实现写操作,同样的也有BufferedSink和RealBufferedSink,感受下如何使用

 BufferedSink bufferedSink = Okio.buffer(Okio.sink(file1));
 bufferedSink.writeUtf8("yes okio");
 bufferedSink.close();

不管是RealBufferedSource还是RealBufferedSink,内部本质上都是通过Buffer这个类来实现缓存功能,而Buffer内部又是通过Segment这个核心类来缓存读取到的数据,Buffer就是一个管理者用来调度Segment。总体来说和okio核心相关的类就这么几个,是不是看着还是比较简单的。

RealBufferedSource

final class RealBufferedSource implements BufferedSource {
  public final Buffer buffer = new Buffer();
  public final Source source;
  boolean closed;

  ......
}

RealBufferedSource的成员变量就这么几个,结合上面的示例代码可以知道source表示的就是传入的读取数据源,closed表示文件是否被关闭,读取关闭的文件会抛出异常,比较关键的成员就是这个Buffer了,数据的缓存就是通过这个类完成的。

Buffer

先看下它的继承结构以及成员变量

public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  private static final byte[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  static final int REPLACEMENT_CHARACTER = '\ufffd';

  Segment head;
  long size;

  public Buffer() {
  }
}

可以看到Buffer实现了BufferedSource,BufferedSink两个接口,也就是说Buffer同时具备读和写的功能。成员变量中重要的就这么两个,head表示链表头,通过head可以找到它的前驱以及后继节点,Segment是一个双向链表结构,具体等下说明。Buffer就是通过head找到缓存的读写数据。size表示读取文件的总的字节数量。

Segment

Buffer内部通过head指向Segment,它是数据存储的真正类,看下它的结构

final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 8192;

  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024;

  final byte[] data;

  /** The next byte of application data byte to read in this segment. */
  int pos;

  /** The first byte of available data ready to be written to. */
  int limit;

  /** True if other segments or byte strings use the same byte array. */
  boolean shared;

  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;

  /** Next segment in a linked or circularly-linked list. */
  Segment next;

  /** Previous segment in a circularly-linked list. */
  Segment prev;

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }
}

SIZE: Segment内部维护着一个固定的缓存池,大小为8192,也就是说最多缓存8K的数据,如果文件大小超过8K如何缓存,就需要新建一个Segment链接到上一个Segment的后面。
SHARE_MINIMUM: Segment内部优化的一个阈值大小,比如一个Segment内部有5K的数据,如果有其中部分数据需要给其他Segment共享,这个SHARE_MINIMUM就是最小可共享的数据大小,关于SHARE_MINIMUM设计的意图官方已经给出了解释

// We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.

SHARE_MINIMUM就是为了避免共享太小的数据造成Segment链表过长。
data:缓存池,存储读写的数据
pos:已经读取到的位置,比如5K的数据读取了2K,那么pos所在位置就是2K处
limit:已经写到的位置,比如已经写入了5K的数据,那么limit就是5K
shared:表示Segment是否是共享的,配合SHARE_MINIMUM使用
owner:拥有者?,不太清楚没太仔细看,不影响正常代码理解
next,pre:后继前驱指针

Segmentpool

Segment池管理类,每次okio想要获取Segment都会从pool中获取,使用完毕后又会放回到pool中复用

介绍完毕,和okio相关的核心类就这么几个,关键就是看okio是如何去使用它们,以完成一次文件拷贝为例,跟踪下内部源码

okio拷贝文件源码分析

使用okio进行文件拷贝代码如下:

            BufferedSource bufferedSource = Okio.buffer(Okio.source(src));
            BufferedSink bufferedSink = Okio.buffer(Okio.sink(dest));
            bufferedSink.writeAll(bufferedSource);
            bufferedSink.close();

close方法调用必须加上,否则会出现拷贝文件内部不全或者没有数据的问题,具体原因在下面跟踪源码给出原因。

直接从writeAll开始分析,代码如下

@Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
      emitCompleteSegments();
    }
    return totalBytesRead;
  }

通过一个for循环将src中的数据全部都读取到buffer中,该buffer就是RealBufferedSink的成员变量。存储读到的全部数据,重点就来看下source.read(buffer, Segment.SIZE)是如何实现的,这里的source就是传入的Okio.buffer(Okio.source(src))对象,它是一个RealBufferedSource对象。查看下read方法,源码如下

@Override public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }

这里又有一个buffer成员变量,不过它是RealBufferedSource的,表示已经读取到的数据,初始情况下size==0进入到if分支内部执行 source.read(buffer, Segment.SIZE);,这里的source就是Okio.buffer(Okio.source(src));中传入的Okio.source(src)对象,跟踪下该对象的read方法

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

source方法内部通过new Source的形式返回一个source对象给RealBufferedSource,定位到read方法去看看源码是如何实现的。
timeout.throwIfReached();超时机制判断,如果io操作在一定时间没有响应会抛出异常,okio内部有两种超时机制,分别为同步超时机制,异步超时机制,这里就属于同步超时机制,但是在okio库中并没有具体的同步超时机制的实现,这部分内容感兴趣的可以去看看okhttp源码,内部就有关于超时机制的实现源码,这里就不展开分析,并不影响整体源码的理解。

Segment tail = sink.writableSegment(1);这里的sink就是RealBufferedSource传递过来的buffer对象,该代码的作用就是从buffer对象中拿到链表尾的Segment对象,代码也比较简单

Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

回到read方法继续分析

int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);

Segment.SIZE - tail.limit表示还能写入的最大数据量,和byteCount比较获取一个较小值,这里的byteCount为8k即Segment的最大存储值,接下来的代码我们就非常熟悉了,调用in的read方法,in就是fileinputstream,就是我们非常熟悉的java io操作了,从这里也能看到okio本质上还是使用的java io操作。

回到RealBufferedSource的read方法,这里再添一下

@Override public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }

执行完 long read = source.read(buffer, Segment.SIZE);后最终会调用到return buffer.read(sink, toRead);,注意这里的sink对象
是RealBufferedSink中的buffer,这句代码的意思就是将RealBufferedSource中的数据转移到RealBufferedSink中,很关键的一句代码,看看okio是如何去实现的。

 @Override public long read(Buffer sink, long byteCount) {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (size == 0) return -1L;
    if (byteCount > size) byteCount = size;
    sink.write(this, byteCount);
    return byteCount;
  }

通过调用sink.write(this, byteCount);来完成具体实现,实现原理会根据Source和Sink中Segment的不同状态实现策略会相应不同。有这种几种情况
(1)Source中需要传递的数据是"满"的情况,也就是8k都是有效数据,这种情况直接从source的buffer中拿到Segment,然后添加到sink的buffer上即可,和java io流相比,省去了中间的一次临时buffer拷贝,从而提高的读写效率
(2)Source中需要传递的数据不"满"的情况,通过pos和limit可以定位到有效数据区间,和Sink中buffer的尾Segment有效数据进行对比,如果两个Segment中的有效数据可以合并到一个Segment中那么会进行数据整理,多余的Segment会被回收到。
如果两个Segment的有效数据总和超过8k,那么直接将Source中的Segment链接到Sink中buffer的尾部即可。
(3)Source的buffer中的Segment只是传递部分数据,如5K的数据值传递其中2K,okio内部会通过split方法将Segment分成2K和3K两个Segment,然后将2K的Segment参照第二种情况和Sink中的Segment进行合并。

以上就是Segment的合并规则,可以看出okio在缓存处理方面还是下了一番功夫。执行完上述流程之后数据就已经从Source成功转移到了Sink中,接下来就看Sink是如何将数据写入到文件,完成最终的拷贝操作。回到writeAll源码中

@Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
      emitCompleteSegments();
    }
    return totalBytesRead;
  }

饶了一圈终于执行完了read方法,再来看下emitCompleteSegments做了什么

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

public long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
      result -= tail.limit - tail.pos;
    }

    return result;
  }

重点就是这个completeSegmentByteCount,它的作用就是计算出所有"满"的segment的总字节数。
代码中的size是buffer的全部字节数,唯一可能不"满"的segment就是最后一个segment,所以直接得到tail即可,而tail.limit - tail.pos则是尾Segment的有效数据大小,(result-尾segment有效字节数)即得到所有"满"的segment及字节数。

可以看到if分支,当Segment是"满"的情况下是直接返回result,显然是大于0的情况,调用sink.write(buffer, byteCount)方法完成最终的数据写入文件。当Segment不"满"的情况下,比如一个文件只有5K数据量,此时result计算结果为0,导致buffer中的数据并不会立即写入到文件中,因此需要调用close或者flush方法强制将缓存数据写入到文件。也就是为什么开头的拷贝demo代码需要调用close的原因。

异步超时机制

okio相对于java io流一个完善的地方在于引入了超时机制,io的读写由于对于文件访问是互斥的可能造成访问资源无法获取,从而导致超时问题,okio实现了两种超时机制,同步超时和异步超时。同步超时就是每次在读写之前调用 timeout.throwIfReached();这里有一个问题就是如果这次io阻塞了,必须等到下一次io读写才会触发throwIfReached,所以更加合理的方式还是使用异步超时机制。这里添一下异步超时的大致原理

         enter();
          try {
            sink.write(source, toWrite);
            byteCount -= toWrite;
            throwOnTimeout = true;
          } catch (IOException e) {
            throw exit(e);
          } finally {
            exit(throwOnTimeout);
          }

每次在读写之前先调用enter方法将AsyncTimeout挂载到等待队列中,等待队列存放着多个AsyncTimeout,按照AsyncTimeout预计超时的时间,从小到大排列,也就是说排在队头的AsyncTimeout是接下来最先要被执行的,一旦超时时间到来就会执行AsyncTimeout的timeout方法,具体方法的执行需要开发者自己去实现,如果在规定的时间完成了io读写,那么就会触发exit方法,该方法就是将AsyncTimeout从等待队列移除。这种实现方式和android中的ANR原理几乎没什么区别。具体enter中的源码就不继续分析了,就是相关将AsyncTimeout添加到等待队列相应位置的操作,感兴趣的自己去看下。

其他

okio中其实自带了好用的工具类,比如btyestring,gzipsource,gzipsink都是非常好用的工具,这些工具在okhttp的源码都可以找到一些出处,比如将url转换成md5的16进制表示,文件存储时使用gzip压缩后大幅减少文件的大小

            ByteString byteString = ByteString.encodeString("https://www.jianshu.com", Charset.forName("UTF-8"));
            String hex = byteString.md5().hex();

            BufferedSource buffer = Okio.buffer(Okio.source(src));
            GzipSink gzipSink = new GzipSink(Okio.sink(dest));
            BufferedSink sink = Okio.buffer(gzipSink);
            sink.writeAll(buffer);
            sink.close();

okio原理总结

看完上述的分析应该对okio的实现原理有了一定的理解,这里再简单总结下:
okio抛弃了jdk中自带buffer的io流类,转而使用自己实现的缓存类来提高io性能。关键的类RealBufferedSource,RealBufferedSink。两者内部都是通过Buffer来实现数据的缓存。Buffer内部通过一个个Segment来真正缓存数据,每个Segment容量为8K,当数据存储不下就会从segment池中获取并链接到尾部。

RealBufferedSource从文件中获取数据存储到buffer中,如果涉及到文件拷贝的操作,还需要使用到RealBufferedSink,将RealBufferedSource存储的数据转移到RealBufferedSink的Buffer中,这个转移过程有别于传统io流操作,传统io需要自定义一个数组,将read buffer中数据转移到自定义数组,然后再转移到write buffer。而okio直接就可以完成RealBufferedSink Buffer向RealBufferedSink Buffer转移的过程,从而提高效率。

总结

到此关于okio的源码分析就结束了,总体来说实现上还是非常简洁巧妙的,在上述文章中是重点分析了拷贝文件的一个完整过程,如果只是单纯的读或者写操作,逻辑是相对简单点的,因为不会涉及到Segment之间的数据传递问题,数据的流向就是简单的从file到buffer缓存或者buffer缓存到file的过程,那么关于okio的Segment精髓就感受不到它的真正用途。
如果能把读写的流程搞明白,那么okio内部的一些其他操作看懂就只是时间问题了,然后再结合okhttp源码阅读,再看到okio的一些操作时就不会显得那么懵逼了。

相关文章

网友评论

    本文标题:Okio从懵逼到掌握

    本文链接:https://www.haomeiwen.com/subject/uxbbdqtx.html