美文网首页
Okio源码 - 解析

Okio源码 - 解析

作者: YocnZhao | 来源:发表于2019-02-21 15:51 被阅读0次

    本文基于版本

    compile 'com.squareup.okio:okio:2.2.2'
    

    前段时间看OKHttp,发现OKHttp是基于okio的,之前的时候写文件操作很多,当时也知道okio,但是并没有实际使用,现在也赶到这儿了,就啃一下okio这块骨头

    二话不必说,看源码
    Okio是okio开放出来的接口,理论上来说所有的调用的入口都是Okio这个类。Okio类的注释也很简单,提供了使用Okio必要的API。

    /** Essential APIs for working with Okio. */
    public final class Okio {
      private Okio() {
      }
      public static BufferedSource buffer(Source source) {
        return new RealBufferedSource(source);
      }
     public static BufferedSink buffer(Sink sink) {
        return new RealBufferedSink(sink);
      }
    
      /** Returns a sink that writes to {@code out}. */
      public static Sink sink(OutputStream out) {
        return sink(out, new Timeout());
      }
    
      private static Sink sink(final OutputStream out, final Timeout timeout) {
        if (out == null) throw new IllegalArgumentException("out == null");
        if (timeout == null) throw new IllegalArgumentException("timeout == null");
    
        return new Sink() {
          @Override public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
              timeout.throwIfReached();
              Segment head = source.head;
              int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
              out.write(head.data, head.pos, toCopy);
    
              head.pos += toCopy;
              byteCount -= toCopy;
              source.size -= toCopy;
    
              if (head.pos == head.limit) {
                source.head = head.pop();
                SegmentPool.recycle(head);
              }
            }
          }
          @Override public void flush() throws IOException {out.flush();}
          @Override public void close() throws IOException {out.close();}
          @Override public Timeout timeout() {return timeout;}
          @Override public String toString() { return "sink(" + out + ")";}
        };
      }
    }
    

    Okio类基本上是返回了两种类型,Sink跟Source,我们知道这两个东西在Okio中就相当于java io的OutputStream跟InputStream,最基础的IO接口。然后使用buffer方法得到BufferedSource或者BufferedSink。然后调用BufferedSource或者BufferedSink的方法,我们到BufferedSource中可以看到基本上所有的RealBufferedSource的功能都是通过Buffer类来实现的,我们来看下Buffer类和他的基础类Segment。

    1. Buffer
    2. Segment
    3. SegmentPool

    这几个是看下去的先决条件,So,先分析这几个的作用。从简单的看起,先看SegmentPool

    final class SegmentPool {
      /** The maximum number of bytes to pool. */
      // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
      static final long MAX_SIZE = 64 * 1024; // 64 KiB.
      /** Singly-linked list of segments. */
      static @Nullable Segment next;
      /** Total bytes in this pool. */
      static long byteCount;
    
      private SegmentPool() {
      }
    
      static Segment take() {
        synchronized (SegmentPool.class) {
          if (next != null) {
            Segment result = next;
            next = result.next;
            result.next = null;
            byteCount -= Segment.SIZE;
            return result;
          }
        }
        return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
      }
    
    /*segment加到单链表的最前面*/
      static void recycle(Segment segment) {
        if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
        if (segment.shared) return; // This segment cannot be recycled.
        synchronized (SegmentPool.class) {
          if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
          byteCount += Segment.SIZE;//byteCount加上一个Segment的size
          segment.next = next;//新加进来的segment的next赋值为当前pool的next
          segment.pos = segment.limit = 0;//pos limit都赋值为0
          next = segment;//当前pool的next赋值为加进来的segment,也就是加到最前面
        }
      }
    }
    
    

    SegmentPool是保存无用的Segments的单链表,用来避免内存抖动和零填充,是个线程安全的静态单例。两个方法:

    1. recycle(Segment segment) //加segment进来
    2. take() //取一个segment走

    代码很简单,in的时候最新的放到链表最前面,out的时候把之前面的取走。用一个叫做next的Segment标记应该取走哪一个。最大容量为64 * 1024,而每个segment的大小是8 * 1024,所以最多能装8个Segment。
    SegmentPool就是单纯的一个避免新建Segment的容器,需要Segment的时候从这里取一个就行了,就不用new一个Segment。
    new Segment的坏处是当Java开辟一块新的空间的时候,runtime必须把这块空间的所有byte都置为零才能返回给你,而很多时候这是不必要的。SegmentPool就是Okio来避免这些操作的处理。

    Segment是缓存的一个片段,buffer中的segment是一个循环链表,pool中的segment是单链表。

    final class Segment {
      /** The size of all segments in bytes. */
      static final int SIZE = 8192;//segment的size
      /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
      static final int SHARE_MINIMUM = 1024;//最小分割size
      final byte[] data;//存储的数据
      /** The next byte of application data byte to read in this segment. */
      int pos;//能读取到的第一个byte,也就是byte数据起点
      /** The first byte of available data ready to be written to. */
      int limit;//能被写入的第一个byte的位置,也就是最后一个数据的位置
      /** True if other segments or byte strings use the same byte array. */
      boolean shared;//是否共享
      /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
      boolean owner;//能否被修改
      /** Next segment in a linked or circularly-linked list. */
      Segment next;//下一个Segment
      /** Previous segment in a circularly-linked list. */
      Segment prev;//上一个Segment
    
      Segment() {
        this.data = new byte[SIZE];
        this.owner = true;
        this.shared = false;
      }
    
      Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
        this.data = data;
        this.pos = pos;
        this.limit = limit;
        this.shared = shared;
        this.owner = owner;
      }
      final Segment sharedCopy() {//返回当前的数据拷贝,new的Segment是shared的
        shared = true;
        return new Segment(data, pos, limit, true, false);
      }
      /** Returns a new segment that its own private copy of the underlying byte array. */
      final Segment unsharedCopy() {//区别于sharedCopy。返回一份clone
        return new Segment(data.clone(), pos, limit, false, true);
      }
      /**
       * Removes this segment of a circularly-linked list and returns its successor.
       * Returns null if the list is now empty.
       */
      public final @Nullable Segment pop() {
        Segment result = next != this ? next : null;
        prev.next = next;
        next.prev = prev;
        next = null;
        prev = null;
        return result;
      }
    
    public final Segment push(Segment segment) {
        segment.prev = this;
        segment.next = next;
        next.prev = segment;
        next = segment;
        return segment;
      }
    /**
       * Splits this head of a circularly-linked list into two segments. The first
       * segment contains the data in {@code [pos..pos+byteCount)}. The second
       * segment contains the data in {@code [pos+byteCount..limit)}. This can be
       * useful when moving partial segments from one buffer to another.
       *
       * <p>Returns the new head of the circularly-linked list.
       */
      public final Segment split(int byteCount) {
        if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
        Segment prefix;
    
       // We have two competing performance goals:
        //  - Avoid copying data. We accomplish this by sharing segments.
        //  - Avoid short shared segments. These are bad for performance because they are readonly and
        //    may lead to long chains of short segments.
        // To balance these goals we only share segments when the copy will be large.
        if (byteCount >= SHARE_MINIMUM) {
          prefix = sharedCopy();
        } else {
          prefix = SegmentPool.take();
    //public static void arraycopy(Object src, int srcPos, Object dest, int destPos, int length)
          System.arraycopy(data, pos, prefix.data, 0, byteCount);
        }
    
        prefix.limit = prefix.pos + byteCount;
        pos += byteCount;
        prev.push(prefix);
        return prefix;
      }
    ...
    }
    

    我们捡重点说,Segment主要数据存储在data中,这个data是一个size是8192的byte数组。pos是有效数据开始位置,limit是结束位置:


    data数据开始结束

    我们看上面的几个主要方法来理解它的主要参数:
    pop和push很简单就是简单的单链表操作。
    pop把自己从链表中拆出去,并吧自己的pre和next置null。
    push把新Segment放到自己后面。


    pop push图解
    我们看split(int byteCount),byteCount是想要分隔的数量,先if判断。小于零或者大于实际data数据肯定是不行的。
    判断是否大于SHARE_MINIMUM,是的话就直接返回copy,share自己的数据。反之,从SegmentPool中取一个unused的Segment,copy byteCount以内的数据过去(把自己data的pos开始的数据拷贝到prefix.data的0开始的位置,数量是byteCount)。然后把prefix.limit置成pos+byteCount,自己的数据开始位置pos+=byteCount,然后把新的Segment放到自己的prev后面,也就是放到prev和自己之间。 spilt方法图解 点图片看大图

    Segment大致就是这样。然后来看Buffer,Buffer是个大类,我们先从简单用法入手,假如我们有个txt文件,我们要把这里面的内容读出来并打印,我们可能会这样调用

    private static void readSimpleString(File file) {
            try {
                Source source = Okio.source(file);
                BufferedSource bufferedSource = Okio.buffer(source);
                String read = bufferedSource.readString(Charset.forName("utf-8"));
                LogUtil.d("read->" + read);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    

    我们实际上在拿着BufferedSource在做操作,我们去看BufferedSource的readString

     @Override public String readString(Charset charset) throws IOException {
        if (charset == null) throw new IllegalArgumentException("charset == null");
    
        buffer.writeAll(source);
        return buffer.readString(charset);
      }
    
      @Override public String readString(long byteCount, Charset charset) throws IOException {
        require(byteCount);
        if (charset == null) throw new IllegalArgumentException("charset == null");
        return buffer.readString(byteCount, charset);
      }
    

    我们看实际上是调了buffer的readString,我们看Buffer类:

    @Override public String readString(Charset charset) {
        try {
          return readString(size, charset);
        } catch (EOFException e) {
          throw new AssertionError(e);
        }
      }
    
      @Override public String readString(long byteCount, Charset charset) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        if (charset == null) throw new IllegalArgumentException("charset == null");
        if (byteCount > Integer.MAX_VALUE) {
          throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
        }
        if (byteCount == 0) return "";
    
        Segment s = head;
        if (s.pos + byteCount > s.limit) {//如果大于head的limit说明包含多个segment,调用readByteArray
          // If the string spans multiple segments, delegate to readBytes().
          return new String(readByteArray(byteCount), charset);
        }
    //一个Segment就能搞定的情况
        String result = new String(s.data, s.pos, (int) byteCount, charset);
        s.pos += byteCount;
        size -= byteCount;
    //这个Segment已经读完了,就扔到SegmentPool里面并且得到新的head
        if (s.pos == s.limit) {
          head = s.pop();
          SegmentPool.recycle(s);
        }
    
        return result;
      }
    

    我们看判断了count,如果count小于head的limit,也就是一个segment就能搞定就完事儿了,当然大部分情况一个Segment是搞不定了,那我们继续往下看readByteArray:

    @Override public byte[] readByteArray(long byteCount) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        if (byteCount > Integer.MAX_VALUE) {
          throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
        }
    
        byte[] result = new byte[(int) byteCount];
        readFully(result);
        return result;
      }
    

    new了一个count size的byte数组,调用了readFully

     @Override public void readFully(byte[] sink) throws EOFException {
        int offset = 0;
        while (offset < sink.length) {
          int read = read(sink, offset, sink.length - offset);
          if (read == -1) throw new EOFException();
          offset += read;
        }
      }
    

    这里用了while循环,使用offest作为判断条件,offest不到length不罢休。这里我们继续看到read里面

     @Override public int read(byte[] sink, int offset, int byteCount) {
        checkOffsetAndCount(sink.length, offset, byteCount);
     //找到当前的head Segment
        Segment s = head;
        if (s == null) return -1;
        int toCopy = Math.min(byteCount, s.limit - s.pos);//得到需要拷贝的数量,是head的有效数据容量还是byteCount
        System.arraycopy(s.data, s.pos, sink, offset, toCopy);//数据拷贝
    
        s.pos += toCopy;//head 的pos加上已经拷贝的数量
        size -= toCopy;//需要读的总的数量减去已经读到的
    
        if (s.pos == s.limit) {//读过的segment扔到SegmentPool缓存或者丢掉
          head = s.pop();//s的next作为新的head
          SegmentPool.recycle(s);
        }
    
        return toCopy;
      }
    

    read方法的三个参数(目标byte数组,已经读了多少,还要读多少),这里读取的逻辑就清晰了,当前head的数据拷贝到目标byte数组sink里面,使用上面讲过的Segment.pop()方法找到next作为新的head等while继续读,读完为止。
    这样我们就看完了Okio读一个String的源码。
    下面我们看下写:

    final class RealBufferedSink implements BufferedSink {
      public final Buffer buffer = new Buffer();
      public final Sink sink;
      boolean closed;
    
      RealBufferedSink(Sink sink) {
        if (sink == null) throw new NullPointerException("sink == null");
        this.sink = sink;
      }
    
      @Override public Buffer buffer() {
        return buffer;
      }
    ...
      @Override public BufferedSink writeString(String string, Charset charset) throws IOException {
        if (closed) throw new IllegalStateException("closed");
        buffer.writeString(string, charset);
        return emitCompleteSegments();
      }
    
    @Override public BufferedSink writeByte(int b) throws IOException {
        if (closed) throw new IllegalStateException("closed");
        buffer.writeByte(b);
        return emitCompleteSegments();
      }
    
    @Override public BufferedSink emitCompleteSegments() throws IOException {
        if (closed) throw new IllegalStateException("closed");
        long byteCount = buffer.completeSegmentByteCount();
        if (byteCount > 0) sink.write(buffer, byteCount);
        return this;
      }
    ...
    }
    

    我们看每个write都是调用到了emitCompleteSegments()这个方法,先获得要写入的count

    /**
       * Returns the number of bytes in segments that are not writable. This is the
       * number of bytes that can be flushed immediately to an underlying sink
       * without harming throughput.
       */
      public final long completeSegmentByteCount() {
        long result = size;
        if (result == 0) return 0;
    
        // Omit the tail if it's still writable.
        Segment tail = head.prev;
        if (tail.limit < Segment.SIZE && tail.owner) {
          result -= tail.limit - tail.pos;
        }
    
        return result;
      }
    

    再调用sink.write写入,我们应该还记得Okio里获得获取Sink的时候都是返回的一个匿名内部类,这里实现的write

    @Override public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
              timeout.throwIfReached();
              Segment head = source.head;
              int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
              out.write(head.data, head.pos, toCopy);
    
              head.pos += toCopy;
              byteCount -= toCopy;
              source.size -= toCopy;
    
              if (head.pos == head.limit) {
                source.head = head.pop();
                SegmentPool.recycle(head);
              }
            }
    

    还是使用了Segment,循环写入到os里面,完事儿调flush close就写进去了~

    这里我们先总结一下看过的东西:

    1. Okio类是我们调用的总的入口,我们得到BufferedSource做读取,BufferedSource最终使用了Buffer类来做实际逻辑操作。
    2. Segment是实际存储byte数组的类,并提供了几个基本方法
      2.1. pop:链表中弹出自己并返回下一个Segment
      2.2. push:添加新的Segment到自己后面
      2.3. spilt:根据SHARE_MINIMUM 切割一个新的Segment出来并放到链表里面(是否共享)
    3. SegmentPool保存已经使用完的Segment来避免内存抖动和零填充(JVM开辟内存时置零的消耗),SegmentPool只能存8个Segment,多了直接丢掉。
      3.1. recycle(Segment segment) //加新的segment进来
      3.2. take() //取一个segment走
    4. Buffer类是实际做读写逻辑的类,readString的时候是使用readFully里面的while循环read,read里面使用Segment的pop方法不断获取next的Segment并读取data的内容一直到结束。

    相关文章

      网友评论

          本文标题:Okio源码 - 解析

          本文链接:https://www.haomeiwen.com/subject/qvfkyqtx.html