美文网首页Android程序员
[搞定开源] 第二篇 okio 1.14原理

[搞定开源] 第二篇 okio 1.14原理

作者: 展翅而飞 | 来源:发表于2018-07-13 14:48 被阅读0次

    Android开源项目原理系列

    [搞定开源] 第一篇 okhttp 3.10原理

    okio是okhttp的io部分,可以单独使用。知道它的存在后,我就爱不释手,因为它真的十分精彩。

    按官方的介绍,okio是对java io和nio的补充,以便更加容易处理数据输入输出。

    demo

    private void write(File file) throws IOException {
        try (Sink sink = Okio.sink(file);
             BufferedSink bufferedSink = Okio.buffer(sink)) {
    
            bufferedSink.writeUtf8("寥落古行宫").writeUtf8(System.lineSeparator());
            bufferedSink.writeUtf8("宫花寂寞红").writeUtf8(System.lineSeparator());
            bufferedSink.writeUtf8("白头宫女在").writeUtf8(System.lineSeparator());
            bufferedSink.writeUtf8("闲坐说玄宗").writeUtf8(System.lineSeparator());
        }
    }
    
    private void read(File file) throws IOException {
        try (Source source = Okio.source(file);
             BufferedSource bufferedSource = Okio.buffer(source)) {
    
            for (String line; (line = bufferedSource.readUtf8Line()) != null; ) {
                System.out.println(line);
            }
        }
    }
    

    先来看okio的基本使用,例子是一个写方法、一个读方法,Sink和Source对应java io的OutputStream和InputStream。

    try后面加了个括号,这是java7对try-catch的改进。结束try里的代码时,自动执行括号对象的close方法(对象需要实现Closeable),io代码的福音。

    okio的代码不多,借助idea的show diagrams功能,将okio全部类和继承关系打印出来:

    okio类

    ByteString

    ByteString是一个不可变的字节串,引用官方对ByteString诙谐的介绍:

    ByteString is String's long-lost brother, making it easy to treat binary data as a value.

    final byte[] data;
    transient String utf8; // Lazily computed.
    

    数据在ByteString里同时保存为byte[]和String,所以两者的转换易如反掌(都存成两份了,哈),典型的空间换时间。注意到String标记成transient,所以在序列化中,只处理byte[],无必要浪费功夫多理String。

    有了ByteString,各种字符转换和处理就很容易了(一些工具类可以干掉了),挑几个方法看看:

    public String utf8() {
      String result = utf8;
      return result != null ? result : (utf8 = new String(data, Util.UTF_8));
    }
    public String base64() {
      return Base64.encode(data);
    }
    public ByteString md5() {
      return digest("MD5");
    }
    public ByteString sha1() {
      return digest("SHA-1");
    }
    

    Source和Sink

    okio对数据的处理和java io类似,使用了流的概念。Source是输入流,Sink是输出流,分别定义了基础的read/write方法。

    我们知道,java io是“装饰者模式”的经典实现,在InputStream/OutputStream的基础上,有很多装饰类,动态地为流添加各种各样的功能。这是个很好的设计模式,但个人感觉类多了点,而Source/Sink的实现类只有几个,用得最多是buffer相关的类,提供大部分方法,简单些又不失功能。

    网络上偷张《Head First设计模式》讲装饰者模式的图:


    java io装饰者

    Buffer

    和ByteString处理不可变字节串相对,Buffer处理的是可变字节串。定义在BufferedSource/BufferedSink,对应的实现类是RealBufferedSource/RealBufferedSink。回顾上图okio所有类,Buffer在Source/Sink的中间,打通两者的操作。

    每个BufferedSource/BufferedSink都会创建自己的Buffer,一一对应:

    public final Buffer buffer = new Buffer();
    

    segment

    缓存操作的数据结构是segment,可以想象成一块块片段组成了整个buffer。

    final byte[] data;
    int pos;
    int limit;
    boolean shared;
    boolean owner;
    Segment next;
    Segment prev;
    

    segment其实就是长度固定的数组(SIZE=8192),两个指针pos和limit分别指示开始和结束的index,next和prev将segment构成一个双向链表。segmen还有两个布尔标记共享:

    • shared:是否和其他segment共享数据;
    • owner:是否独享数据。

    okio对缓存操作的核心思想就是:

    在buffer之间共享segment。

    在java io中,流与流的缓存是无联系的,如果输入流的数据要写入输出流,需要将数据复制过去。在okio中,相同的动作,只需要将segment从一个buffer分配给另一个buffer。好处,显而易见。


    • push/pop
    • compact
    • spilt
    • writeTo

    上面几个是操作segment的函数,在后文会用到,早点准备。函数的逻辑实质是链表的操作,其中push/pop将segment加入或移出链表,小学水平,路过。

    public void writeTo(Segment sink, int byteCount) {
      if (!sink.owner) throw new IllegalArgumentException();
      if (sink.limit + byteCount > SIZE) {
        // We can’t fit byteCount bytes at the sink’s current position. Shift sink first.
        if (sink.shared) throw new IllegalArgumentException();
        if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
        System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
        sink.limit -= sink.pos;
        sink.pos = 0;
      }
    
      System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
      sink.limit += byteCount;
      pos += byteCount;
    }
    

    writeTo将当前segment中的数据写入另一个segment,写入前需要判断owner和sheared。目标segment的空间需要足够,但有可能是read导致pos向后移,这个时候pos会大于0。已读完的数据可以覆盖,所以先移一移数据。剩下的,就是当前segment数组内容复制到目标segment数组。

    public void compact() {
      if (prev == this) throw new IllegalStateException();
      if (!prev.owner) return; // Cannot compact: prev isn‘t writable.
      int byteCount = limit - pos;
      int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
      if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
      writeTo(prev, byteCount);
      pop();
      SegmentPool.recycle(this);
    }
    

    为了节约内存空间,空闲的segment会尝试合并前一个segment。如果空间足够容纳,当前segment的内容writeTo前一个segment,然后回收当前segment。

    public Segment split(int byteCount) {
      if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
      Segment prefix;
    
      // We have two competing performance goals:
      //  - Avoid copying data. We accomplish this by sharing segments.
      //  - Avoid short shared segments. These are bad for performance because they are readonly and
      //    may lead to long chains of short segments.
      // To balance these goals we only share segments when the copy will be large.
      if (byteCount >= SHARE_MINIMUM) {
        prefix = sharedCopy();
      } else {
        prefix = SegmentPool.take();
        System.arraycopy(data, pos, prefix.data, 0, byteCount);
      }
    
      prefix.limit = prefix.pos + byteCount;
      pos += byteCount;
      prev.push(prefix);
      return prefix;
    }
    

    split方法将一个segment分割成两个segment,前者保存的数据范围:pos..pos+byteCount,后者保存的数据范围:pos+byteCount..limit。在创建新segment时,会根据内容多少控制segment是否shared(size>=1024),避免共享太细的segment。

    SegmentPool

    为了提高性能,无用的segment不会白白丢弃,SegmentPool提供了回收机制(享元模式,另一个例子是Handler发送的Message)。

    static void recycle(Segment segment) {
      if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
      if (segment.shared) return; // This segment cannot be recycled.
      synchronized (SegmentPool.class) {
        if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
        byteCount += Segment.SIZE;
        segment.next = next;
        segment.pos = segment.limit = 0;
        next = segment;
      }
    }
    

    recycle方法将无用的segment放入SegmentPool的单链表存着,下次重复使用时,调用take取出来:

    static Segment take() {
      synchronized (SegmentPool.class) {
        if (next != null) {
          Segment result = next;
          next = result.next;
          result.next = null;
          byteCount -= Segment.SIZE;
          return result;
        }
      }
      return new Segment(); // Pool is empty. Don’t zero-fill while holding a lock.
    }
    

    有存货就从链表里取出来,无就新建。

    write

    okio的缓存实质就是对segment读写,停一分钟想想对segment的write/read需要考虑什么问题。

    好了,segment里数组的长度固定,我们读写的数据可能是各种格式:Int、Long、String等。有可能读写发生在一个segment,也可能需要跨多个segment。

    具体来看例子里调用BufferedSink的writeUtf8,里面调用buffer的writeUtf8。写入一段string会比较长,writeUtf8进行分拆处理,核心是对分拆的字符调用writeByte方法。

    @Override public Buffer writeByte(int b) {
      Segment tail = writableSegment(1);
      tail.data[tail.limit++] = (byte) b;
      size += 1;
      return this;
    }
    

    写入操作就是两步:1、获取一个剩余空间足够的segment;2、数据写入segment里的数组。

    Segment writableSegment(int minimumCapacity) {
      if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    
      if (head == null) {
        head = SegmentPool.take(); // Acquire a first segment.
        return head.next = head.prev = head;
      }
    
      Segment tail = head.prev;
      if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
        tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
      }
      return tail;
    }
    

    writableSegment入参是最低要求的空间长度,如果head不存在,直接新建;否则通过head.prev取得链表尾,检查长度并返回。其他一堆write方法,根据数据类型,控制写入长度,大同小异。

    最精华的地方来了,来看write(Buffer source, long byteCount) ,入参是另一个buffer:

    @Override public void write(Buffer source, long byteCount) {
      if (source == null) throw new IllegalArgumentException("source == null");
      if (source == this) throw new IllegalArgumentException("source == this");
      checkOffsetAndCount(source.size, 0, byteCount);
    
      while (byteCount > 0) {
        //1
        if (byteCount < (source.head.limit - source.head.pos)) {
          Segment tail = head != null ? head.prev : null;
          if (tail != null && tail.owner
              && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
            //2
            source.head.writeTo(tail, (int) byteCount);
            source.size -= byteCount;
            size += byteCount;
            return;
          } else {
            //3
            source.head = source.head.split((int) byteCount);
          }
        }
    
        // 4
        Segment segmentToMove = source.head;
        long movedByteCount = segmentToMove.limit - segmentToMove.pos;
        source.head = segmentToMove.pop();
        if (head == null) {
          head = segmentToMove;
          head.next = head.prev = head;
        } else {
          Segment tail = head.prev;
          tail = tail.push(segmentToMove);
          tail.compact();
        }
        source.size -= movedByteCount;
        size += movedByteCount;
        byteCount -= movedByteCount;
      }
    }
    

    从别的buffer读取内容写入当前buffer,正是前文提到的,okio利用segment共享提高性能的体现。来源buffer需要考虑跨segment,目标buffer也需要考虑跨segment。

    进入循环,每次操作一段数据,从byteCount减去本次操作数据的长度,直到byteCount为零。mark1判断是否source.head包括所有要操作的数据,是的话就比较简单:

    source:[aabbbb] -> [bbbb]
    sink:[head]-[10%]  -> [head]-[10%+aa]
    

    mark2,source.head剩余[aa]需要输入,尾segment能够容纳,直接调用writeTo,最终[aa]复制进尾segment。

    source:[aabbbb] -> [aa]-[bbbb]
    sink:[head]-[80%] -> [head]-[80%]-[aa]
    

    mark3,source[aabbbb]还剩一段[aa]需要写入,尾segment不能容纳,无法使用writeTo。这个时候,需要将source分割成[aa]-[bbbb]两个,在mark4中将[aa]接入尾segment。

    mark4就是source的segment接入当前buffer的过程,当操作完成后,会尝试compact节约空间。

    纵观上述的过程,只有mark2有复制数据的动作,而且也是必须的,节约了内存。其他情况都是将segment指向不同的buffer,非常节约性能,真是666。

    read

    再来看read方法,例子里调用的readUtf8Line最终调用了buffer的readString:

    @Override public String readString(long byteCount, Charset charset) throws EOFException {
       checkOffsetAndCount(size, 0, byteCount);
       if (charset == null) throw new IllegalArgumentException("charset == null");
       if (byteCount > Integer.MAX_VALUE) {
         throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
       }
       if (byteCount == 0) return "";
    
       Segment s = head;
       if (s.pos + byteCount > s.limit) {
         // If the string spans multiple segments, delegate to readBytes().
         return new String(readByteArray(byteCount), charset);
       }
    
       String result = new String(s.data, s.pos, (int) byteCount, charset);
       s.pos += byteCount;
       size -= byteCount;
    
       if (s.pos == s.limit) {
         head = s.pop();
         SegmentPool.recycle(s);
       }
    
       return result;
     }
    

    首先获取head所在的segment,判断读取的长度是否大于limit:

    • segment范围内:直接可以读完,如果刚好读完segment,将它移出链表并放入SegmentPool回收;
    • 跨segment读取:委托readByteArray处理。
    @Override public void readFully(byte[] sink) throws EOFException {
      int offset = 0;
      while (offset < sink.length) {
        int read = read(sink, offset, sink.length - offset);
        if (read == -1) throw new EOFException();
        offset += read;
      }
    }
    

    readByteArray创建了个字节数组存放结果,交由readFully处理。既然需要跨segment,少不了循环逐段读取,readFully的循环里调用了read方法。

    @Override public int read(byte[] sink, int offset, int byteCount) {
      checkOffsetAndCount(sink.length, offset, byteCount);
    
      Segment s = head;
      if (s == null) return -1;
      int toCopy = Math.min(byteCount, s.limit - s.pos);
      System.arraycopy(s.data, s.pos, sink, offset, toCopy);
    
      s.pos += toCopy;
      size -= toCopy;
    
      if (s.pos == s.limit) {
        head = s.pop();
        SegmentPool.recycle(s);
      }
    
      return toCopy;
    }
    

    read方法和上面说的readString方法类似,从segment读取的结果复制到目标数组。

    okhttp和okio

    okhttp连接上的流Http1Codec的输入输出用的就是okio:

    final BufferedSource source;
    final BufferedSink sink;
    

    Timeout

    okio对数据的读写提供超时机制,例如从InputSream获取Source(适配器,Sink/Source和OutputStream/InputSteam可以随意切换),有个入参Timeout,表示对读取数据有时间要求。具体Timeout的代码不打算细说,讲个大概。

    超时的指定有两种:

    • Timeouts:最大等待时间,比如从网络读取数据,通过指定最大等待时间判断网络是否可用;
    • Deadlines:截止时间,在这个时间之前要完成数据的操作。

    Timeout类很常规,是个同步超时类,除此之外,AsyncTimeout类继承Timeout,提供异步超时机制。在适配Socket时用的是AsyncTimeout,因为socket在等待数据时会阻塞,需要异步处理。

    AsyncTimeout互相之间形成链表,根据超时时间有序排序。有个唯一守护线程WatchDog,实现超时到达马上反馈。io阻塞时线程停顿,需要通过别人告知,这就是异步超时的原理。

    第一次知道异步超时处理,wonderful。

    总结

    okio说:“java io很好,也有不足的地方,由我来优化”。

    小而美,精巧强大,缓存部分对segment的操作非常值得学习。只言片语不能尽述,RTFSC。

    不care有无人看我写的笔记,反正我会了,哈哈。

    相关文章

      网友评论

        本文标题:[搞定开源] 第二篇 okio 1.14原理

        本文链接:https://www.haomeiwen.com/subject/ydxbuftx.html