美文网首页APP & program
Android IO 框架 Okio 的实现原理,到底哪里 OK

Android IO 框架 Okio 的实现原理,到底哪里 OK

作者: 彭旭锐 | 来源:发表于2023-02-09 20:33 被阅读0次

    前言

    大家好,我是小彭。

    今天,我们来讨论一个 Square 开源的 I/O 框架 Okio,我们最开始接触到 Okio 框架还是源于 Square 家的 OkHttp 网络框架。那么,OkHttp 为什么要使用 Okio,它相比于 Java 原生 IO 有什么区别和优势?今天我们就围绕这些问题展开。

    本文源码基于 Okio v3.2.0。


    思维导图


    1. 说一下 Okio 的优势?

    相比于 Java 原生 IO 框架,我认为 Okio 的优势主要体现在 3 个方面:

    • 1、精简且全面的 API: 原生 IO 使用装饰模式,例如使用 BufferedInputStream 装饰 FileInputStream 文件输入流,可以增强流的缓冲功能。但是原生 IO 的装饰器过于庞大,需要区分字节、字符流、字节数组、字符数组、缓冲等多种装饰器,而这些恰恰又是最常用的基础装饰器。相较之下,Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中所有基础的装饰器,使得框架更加精简;

    • 2、基于共享的缓冲区设计: 由于 IO 系统调用存在上下文切换的性能损耗,为了减少系统调用次数,应用层往往会采用缓冲区策略。但是缓冲区又会存在副作用,当数据从一个缓冲区转移到另一个缓冲区时需要拷贝数据,这种内存中的拷贝显得没有必要。而 Okio 采用了基于共享的缓冲区设计,在缓冲区间转移数据只是共享 Segment 的引用,而减少了内存拷贝。同时 Segment 也采用了对象池设计,减少了内存分配和回收的开销;

    • 3、超时机制: Okio 弥补了部分 IO 操作不支持超时检测的缺陷,而且 Okio 不仅支持单次 IO 操作的超时检测,还支持包含多次 IO 操作的复合任务超时检测。

    下面,我们将从这三个优势展开分析:


    2. 精简的 Okio 框架

    先用一个表格总结 Okio 框架中主要的类型:

    类型 描述
    Source 输入流
    Sink 输出流
    BufferedSource 缓存输入流接口,实现类是 RealBufferedSource
    BufferedSink 缓冲输出流接口,实现类是 RealBufferedSink
    Buffer 缓冲区,由 Segment 链表组成
    Segment 数据片段,多个片段组成逻辑上连续数据
    ByteString String 类
    Timeout 超时控制

    2.1 Source 输入流 与 Sink 输出流

    在 Java 原生 IO 中有四个基础接口,分别是:

    • 字节流: InputStream 输入流和 OutputStream 输出流;
    • 字符流: Reader 输入流和 Writer 输出流。

    而在 Okio 更加精简,只有两个基础接口,分别是:

    • 流: Source 输入流和 Sink 输出流。

    Source.kt

    interface Source : Closeable {
    
        // 从输入流读取数据到 Buffer 中(Buffer 等价于 byte[] 字节数组)
        // 返回值:-1:输入内容结束
        @Throws(IOException::class)
        fun read(sink: Buffer, byteCount: Long): Long
    
        // 超时控制(详细分析见后续文章)
        fun timeout(): Timeout
    
        // 关闭流
        @Throws(IOException::class)
        override fun close()
    }
    

    Sink.java

    actual interface Sink : Closeable, Flushable {
    
        // 将 Buffer 的数据写入到输出流中(Buffer 等价于 byte[] 字节数组)
        @Throws(IOException::class)
        actual fun write(source: Buffer, byteCount: Long)
    
        // 清空输出缓冲区
        @Throws(IOException::class)
        actual override fun flush()
    
        // 超时控制(详细分析见后续文章)
        actual fun timeout(): Timeout
    
        // 关闭流
        @Throws(IOException::class)
        actual override fun close()
    }
    

    2.2 InputStream / OutputStream 与 Source / Sink 互转

    在功能上,InputStream - Source 和 OutputStream - Sink 分别是等价的,而且是相互兼容的。结合 Kotlin 扩展函数,两种接口之间的转换会非常方便:

    • source(): InputStream 转 Source,实现类是 InputStreamSource;
    • sink(): OutputStream 转 Sink,实现类是 OutputStreamSink;

    比较不理解的是: Okio 没有提供 InputStreamSource 和 OutputStreamSink 转回 InputStream 和 OutputStream 的方法,而是需要先转换为 BufferSource 与 BufferSink,再转回 InputStream 和 OutputStream。

    • buffer(): Source 转 BufferedSource,Sink 转 BufferedSink,实现类分别是 RealBufferedSource 和 RealBufferedSink。

    示例代码

    // 原生 IO -> Okio
    val source = FileInputStream(File("")).source()
    val bufferSource = FileInputStream(File("")).source().buffer()
    
    val sink = FileOutputStream(File("")).sink()
    val bufferSink = FileOutputStream(File("")).sink().buffer()
    
    // Okio -> 原生 IO
    val inputStream = bufferSource.inputStream()
    val outputStream = bufferSink.outputStream()
    

    JvmOkio.kt

    // InputStream -> Source
    fun InputStream.source(): Source = InputStreamSource(this, Timeout())
    
    // OutputStream -> Sink
    fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout())
    
    private class InputStreamSource(
        private val input: InputStream,
        private val timeout: Timeout
    ) : Source {
    
        override fun read(sink: Buffer, byteCount: Long): Long {
            if (byteCount == 0L) return 0
            require(byteCount >= 0) { "byteCount < 0: $byteCount" }
            try {
                // 同步超时监控(详细分析见后续文章)
                timeout.throwIfReached()
                // 读入 Buffer
                val tail = sink.writableSegment(1)
                val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()
                val bytesRead = input.read(tail.data, tail.limit, maxToCopy)
                if (bytesRead == -1) {
                    if (tail.pos == tail.limit) {
                        // We allocated a tail segment, but didn't end up needing it. Recycle!
                        sink.head = tail.pop()
                        SegmentPool.recycle(tail)
                    }
                    return -1
                }
                tail.limit += bytesRead
                sink.size += bytesRead
                return bytesRead.toLong()
            } catch (e: AssertionError) {
                if (e.isAndroidGetsocknameError) throw IOException(e)
                throw e
            }
      }
    
      override fun close() = input.close()
    
      override fun timeout() = timeout
    
      override fun toString() = "source($input)"
    }
    
    private class OutputStreamSink(
        private val out: OutputStream,
        private val timeout: Timeout
    ) : Sink {
    
        override fun write(source: Buffer, byteCount: Long) {
            checkOffsetAndCount(source.size, 0, byteCount)
            var remaining = byteCount
            // 写出 Buffer
            while (remaining > 0) {
                // 同步超时监控(详细分析见后续文章)
                timeout.throwIfReached()
                // 取有效数据量和剩余输出量的较小值
                val head = source.head!!
                val toCopy = minOf(remaining, head.limit - head.pos).toInt()
                out.write(head.data, head.pos, toCopy)
    
                head.pos += toCopy
                remaining -= toCopy
                source.size -= toCopy
    
                // 指向下一个 Segment
                if (head.pos == head.limit) {
                    source.head = head.pop()
                    SegmentPool.recycle(head)
                }
            }
        }
    
        override fun flush() = out.flush()
    
        override fun close() = out.close()
    
        override fun timeout() = timeout
    
        override fun toString() = "sink($out)"
    }
    

    Okio.kt

    // Source -> BufferedSource
    fun Source.buffer(): BufferedSource = RealBufferedSource(this)
    
    // Sink -> BufferedSink
    fun Sink.buffer(): BufferedSink = RealBufferedSink(this)
    

    2.3 BufferSource 与 BufferSink

    在 Java 原生 IO 中,为了减少系统调用次数,我们一般不会直接调用 InputStream 和 OutputStream,而是会使用 BufferedInputStreamBufferedOutputStream 包装类增加缓冲功能。

    例如,我们希望采用带缓冲的方式读取字符格式的文件,则需要先将文件输入流包装为字符流,再包装为缓冲流:

    Java 原生 IO 示例

    // 第一层包装
    FileInputStream fis = new FileInputStream(file);
    // 第二层包装
    InputStreamReader isr = new InputStreamReader(new FileInputStream(file), "UTF-8");
    // 第三层包装
    BufferedReader br = new BufferedReader(isr);
    String line;
    while ((line = br.readLine()) != null) {
        ...
    }
    // 省略 close
    

    同理,我们在 Okio 中一般也不会直接调用 Source 和 Sink,而是会使用 BufferedSourceBufferedSink 包装类增加缓冲功能:

    Okio 示例

    val bufferedSource = file.source()/*第一层包装*/.buffer()/*第二层包装*/
    while (!bufferedSource.exhausted()) {
        val line = bufferedSource.readUtf8Line();
        ...
    }
    // 省略 close
    

    网上有资料说 Okio 没有使用装饰器模式,所以类结构更简单。 这么说其实不太准确,装饰器模式本身并不是缺点,而且从 BufferedSource 和 BufferSink 可以看出 Okio 也使用了装饰器模式。 严格来说是原生 IO 的装饰器过于庞大,而 Okio 的装饰器更加精简。

    比如原生 IO 常用的流就有这么多:

    • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;

    • 基础接口(区分字节流和字符流): InputStream / OutputStream 与 Reader / Writer;

    • 缓存流: BufferedInputStream / BufferedOutputStream 与 BufferedReader / BufferedWriter;

    • 基本类型: DataInputStream / DataOutputStream;

    • 字节数组和字符数组: ByteArrayInputStream / ByteArrayOutputStream 与 CharArrayReader / CharArrayWriter;

    • 此处省略一万个字。

    原生 IO 框架

    而这么多种流在 Okio 里还剩下多少呢?

    • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;
    • 基础接口: Source / Sink;
    • 缓存流: BufferedSource / BufferedSink。

    Okio 框架

    就问你服不服?

    而且你看哈,这些都是平时业务开发中最常见的基本类型,原生 IO 把它们都拆分开了,让问题复杂化了。反观 Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中基本的功能,而不再需要区分字节、字符、字节数组、字符数组、基础类型等等装饰器,确实让框架更加精简。

    BufferedSource.kt

    actual interface BufferedSource : Source, ReadableByteChannel {
    
        actual val buffer: Buffer
    
        // 读取 Int
        @Throws(IOException::class)
        actual fun readInt(): Int
    
        // 读取 String
        @Throws(IOException::class)
        fun readString(charset: Charset): String
    
        ...
    
        fun inputStream(): InputStream
    }
    

    BufferedSink.kt

    actual interface BufferedSink : Sink, WritableByteChannel {
    
        actual val buffer: Buffer
    
        // 写入 Int
        @Throws(IOException::class)
        actual fun writeInt(i: Int): BufferedSink
    
        // 写入 String
        @Throws(IOException::class)
        fun writeString(string: String, charset: Charset): BufferedSink
    
        ...
    
        fun outputStream(): OutputStream
    }
    

    2.4 RealBufferedSink 与 RealBufferedSource

    BufferedSource 和 BufferedSink 还是接口,它们的真正的实现类是 RealBufferedSource 和 RealBufferedSink。可以看到,在实现类中会创建一个 Buffer 缓冲区,在输入和输出的时候,都会借助 “Buffer 缓冲区” 减少系统调用次数。

    RealBufferedSource.kt

    internal actual class RealBufferedSource actual constructor(
        // 装饰器模式
        @JvmField actual val source: Source
    ) : BufferedSource {
    
        // 创建输入缓冲区
        @JvmField val bufferField = Buffer()
    
        // 带缓冲地读取(全部数据)
        override fun readString(charset: Charset): String {
            buffer.writeAll(source)
            return buffer.readString(charset)
        }
    
        // 带缓冲地读取(byteCount)
        override fun readString(byteCount: Long, charset: Charset): String {
            require(byteCount)
            return buffer.readString(byteCount, charset)
        }
    }
    

    RealBufferedSink.kt

    internal actual class RealBufferedSink actual constructor(
        // 装饰器模式
        @JvmField actual val sink: Sink
    ) : BufferedSink {
    
        // 创建输出缓冲区
        @JvmField val bufferField = Buffer()
    
        // 带缓冲地写入(全部数据)
        override fun writeString(string: String, charset: Charset): BufferedSink {
            buffer.writeString(string, charset)
            return emitCompleteSegments()
        }
    
        // 带缓冲地写入(beginIndex - endIndex)
        override fun writeString(
            string: String,
            beginIndex: Int,
            endIndex: Int,
            charset: Charset
        ): BufferedSink {
            buffer.writeString(string, beginIndex, endIndex, charset)
            return emitCompleteSegments()
        }
    }
    

    至此,Okio 基本框架分析结束,用一张图总结:

    Okio 框架


    3. Okio 的缓冲区设计

    3.1 使用缓冲区减少系统调用次数

    在操作系统中,访问磁盘和网卡等 IO 操作需要通过系统调用来执行。系统调用本质上是一种软中断,进程会从用户态陷入内核态执行中断处理程序,完成 IO 操作后再从内核态切换回用户态。

    可以看到,系统调用存在上下文切换的性能损耗。为了减少系统调用次数,应用层往往会采用缓冲区策略:

    以 Java 原生 IO BufferedInputStream 为例,会通过一个 byte[] 数组作为数据源的输入缓冲,每次读取数据时会读取更多数据到缓冲区中:

    • 如果缓冲区中存在有效数据,则直接从缓冲区数据读取;
    • 如果缓冲区不存在有效数据,则先执行系统调用填充缓冲区(fill),再从缓冲区读取数据;
    • 如果要读取的数据量大于缓冲区容量,就会跳过缓冲区直接执行系统调用。

    输出流 BufferedOutputStream 也类似,输出数据时会优先写到缓冲区,当缓冲区满或者手动调用 flush() 时,再执行系统调用写出数据。

    伪代码

    // 1. 输入
    fun read(byte[] dst, int len) : Int {
        // 缓冲区有效数据量
        int avail = count - pos
        if(avail <= 0) {
            if(len >= 缓冲区容量) {
                // 直接从输入流读取
                read(输入流 in, dst, len)
            }
            // 填充缓冲区
            fill(数据源 in, 缓冲区)
        }
        // 本次读取数据量,不超过可用容量
        int cnt = (avail < len) ? avail : len?
        read(缓冲区, dst, cnt)
        // 更新缓冲区索引
        pos += cnt
        return cnt
    }
    
    // 2. 输出
    fun write(byte[] src, len) {
        if(len > 缓冲区容量) {
            // 先将缓冲区写出
            flush(缓冲区)
            // 直接写出数据
            write(输出流 out, src, len)
        }
        // 缓冲区剩余容量
        int left = 缓冲区容量 - count
        if(len > 缓冲区剩余容量) {
            // 先将缓冲区写出
            flush(缓冲区)
        }
        // 将数据写入缓冲区
        write(缓冲区, src, len)
        // 更新缓冲区已添加数据容量
        count += len
    }
    

    3.2 缓冲区的副作用

    的确,缓冲区策略能有效地减少系统调用次数,不至于读取一个字节都需要执行一次系统调用,大多数情况下表现良好。 但考虑一种 “双流操作” 场景,即从一个输入流读取,再写入到一个输出流。回顾刚才讲的缓存策略,此时的数据转移过程为:

    • 1、从输入流读取到缓冲区;
    • 2、从输入流缓冲区拷贝到 byte[](拷贝)
    • 3、将 byte[] copy 到输出流缓冲区(拷贝);
    • 4、将输出流缓冲区写入到输出流。

    如果这两个流都使用了缓冲区设计,那么数据在这两个内存缓冲区之间相互拷贝,就显得没有必要。

    3.3 Okio 的 Buffer 缓冲区

    Okio 当然也有缓冲区策略,如果没有就会存在频繁系统调用的问题。

    Buffer 是 RealBufferedSource 和 RealBufferedSink 的数据缓冲区。虽然在实现上与原生 BufferedInputStream 和 BufferedOutputStream 不一样,但在功能上是一样的。区别在于:

    • 1、BufferedInputStream 中的缓冲区是 “一个固定长度的字节数组” ,数据从一个缓冲区转移到另一个缓冲区需要拷贝;

    • 2、Buffer 中的缓冲区是 “一个 Segment 双向循环链表” ,每个 Segment 对象是一小段字节数组,依靠 Segment 链表的顺序组成逻辑上的连续数据。这个 Segment 片段是 Okio 高效的关键。

    Buffer.kt

    actual class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {
    
        // 缓冲区(Segment 双向链表)
        @JvmField internal actual var head: Segment? = null
    
        // 缓冲区数据量
        @get:JvmName("size")
        actual var size: Long = 0L
            internal set
    
        override fun buffer() = this
    
        actual override val buffer get() = this
    }
    

    对比 BufferedInputStream:

    BufferedInputStream.java

    public class BufferedInputStream extends FilterInputStream {
    
        // 缓冲区的默认大小(8KB)
        private static int DEFAULT_BUFFER_SIZE = 8192;
    
        // 输入缓冲区(固定长度的数组)
        protected volatile byte buf[];
    
        // 有效数据起始位,也是读数据的起始位
        protected int pos;
    
        // 有效数据量,pos + count 是写数据的起始位
        protected int count;
    
        ...
    }
    

    3.4 Segment 片段与 SegmentPool 对象池

    Segment 中的字节数组是可以 “共享” 的,当数据从一个缓冲区转移到另一个缓冲区时,可以共享数据引用,而不一定需要拷贝数据。

    Segment.kt

    internal class Segment {
    
        companion object {
            // 片段的默认大小(8KB)
            const val SIZE = 8192
            // 最小共享阈值,超过 1KB 的数据才会共享
            const val SHARE_MINIMUM = 1024
        }
    
        // 底层数组
        @JvmField val data: ByteArra
        // 有效数据的起始位,也是读数据的起始位
        @JvmField var pos: Int = 0
        // 有效数据的结束位,也是写数据的起始位
        @JvmField var limit: Int = 0
        // 共享标记位
        @JvmField var shared: Boolean = false
        // 宿主标记位
        @JvmField var owner: Boolean = false
        // 后续指针
        @JvmField var next: Segment? = null
        // 前驱指针
        @JvmField var prev: Segment? = null
    
        constructor() {
            // 默认构造 8KB 数组(为什么默认长度是 8KB)
            this.data = ByteArray(SIZE)
            // 宿主标记位
            this.owner = true
            // 共享标记位
            this.shared = false
        }
    }
    

    另外,Segment 还使用了对象池设计,被回收的 Segment 对象会缓存在 SegmentPool 中。SegmentPool 内部维护了一个被回收的 Segment 对象单链表,缓存容量的最大值是 MAX_SIZE = 64 * 1024,也就相当于 8 个默认 Segment 的长度:

    SegmentPool.kt

    // object:全局单例
    internal actual object SegmentPool {
    
        // 缓存容量
        actual val MAX_SIZE = 64 * 1024
    
        // 头节点
        private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)
    
        ...
    }
    

    Segment 示意图


    4. 总结

    • 1、Okio 将原生 IO 多种基础装饰器聚合在 BufferedSource 和 BufferedSink,使得框架更加精简;
    • 2、为了减少系统调用次数的同时,应用层 IO 框架会使用缓存区设计。而 Okio 使用了基于共享 Segment 的缓冲区设计,减少了在缓冲区间转移数据的内存拷贝;
    • 3、Okio 弥补了部分 IO 操作不支持超时检测的缺陷,而且 Okio 不仅支持单次 IO 操作的超时检测,还支持包含多次 IO 操作的复合任务超时检测。

    关于 Okio 超时机制的详细分析,我们在 下一篇文章 里讨论。请关注。


    参考资料

    相关文章

      网友评论

        本文标题:Android IO 框架 Okio 的实现原理,到底哪里 OK

        本文链接:https://www.haomeiwen.com/subject/djehkdtx.html