本文基于版本
compile 'com.squareup.okio:okio:2.2.2'
前段时间看OKHttp,发现OKHttp是基于okio的,之前的时候写文件操作很多,当时也知道okio,但是并没有实际使用,现在也赶到这儿了,就啃一下okio这块骨头
二话不必说,看源码
Okio是okio开放出来的接口,理论上来说所有的调用的入口都是Okio这个类。Okio类的注释也很简单,提供了使用Okio必要的API。
/** Essential APIs for working with Okio. */
public final class Okio {
private Okio() {
}
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
/** Returns a sink that writes to {@code out}. */
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {out.flush();}
@Override public void close() throws IOException {out.close();}
@Override public Timeout timeout() {return timeout;}
@Override public String toString() { return "sink(" + out + ")";}
};
}
}
Okio类基本上是返回了两种类型,Sink跟Source,我们知道这两个东西在Okio中就相当于java io的OutputStream跟InputStream,最基础的IO接口。然后使用buffer方法得到BufferedSource或者BufferedSink。然后调用BufferedSource或者BufferedSink的方法,我们到BufferedSource中可以看到基本上所有的RealBufferedSource的功能都是通过Buffer类来实现的,我们来看下Buffer类和他的基础类Segment。
- Buffer
- Segment
- SegmentPool
这几个是看下去的先决条件,So,先分析这几个的作用。从简单的看起,先看SegmentPool:
final class SegmentPool {
/** The maximum number of bytes to pool. */
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
/** Singly-linked list of segments. */
static @Nullable Segment next;
/** Total bytes in this pool. */
static long byteCount;
private SegmentPool() {
}
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
/*segment加到单链表的最前面*/
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;//byteCount加上一个Segment的size
segment.next = next;//新加进来的segment的next赋值为当前pool的next
segment.pos = segment.limit = 0;//pos limit都赋值为0
next = segment;//当前pool的next赋值为加进来的segment,也就是加到最前面
}
}
}
SegmentPool是保存无用的Segments的单链表,用来避免内存抖动和零填充,是个线程安全的静态单例。两个方法:
- recycle(Segment segment) //加segment进来
- take() //取一个segment走
代码很简单,in的时候最新的放到链表最前面,out的时候把之前面的取走。用一个叫做next的Segment标记应该取走哪一个。最大容量为64 * 1024,而每个segment的大小是8 * 1024,所以最多能装8个Segment。
SegmentPool就是单纯的一个避免新建Segment的容器,需要Segment的时候从这里取一个就行了,就不用new一个Segment。
new Segment的坏处是当Java开辟一块新的空间的时候,runtime必须把这块空间的所有byte都置为零才能返回给你,而很多时候这是不必要的。SegmentPool就是Okio来避免这些操作的处理。
Segment是缓存的一个片段,buffer中的segment是一个循环链表,pool中的segment是单链表。
final class Segment {
/** The size of all segments in bytes. */
static final int SIZE = 8192;//segment的size
/** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
static final int SHARE_MINIMUM = 1024;//最小分割size
final byte[] data;//存储的数据
/** The next byte of application data byte to read in this segment. */
int pos;//能读取到的第一个byte,也就是byte数据起点
/** The first byte of available data ready to be written to. */
int limit;//能被写入的第一个byte的位置,也就是最后一个数据的位置
/** True if other segments or byte strings use the same byte array. */
boolean shared;//是否共享
/** True if this segment owns the byte array and can append to it, extending {@code limit}. */
boolean owner;//能否被修改
/** Next segment in a linked or circularly-linked list. */
Segment next;//下一个Segment
/** Previous segment in a circularly-linked list. */
Segment prev;//上一个Segment
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.shared = shared;
this.owner = owner;
}
final Segment sharedCopy() {//返回当前的数据拷贝,new的Segment是shared的
shared = true;
return new Segment(data, pos, limit, true, false);
}
/** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {//区别于sharedCopy。返回一份clone
return new Segment(data.clone(), pos, limit, false, true);
}
/**
* Removes this segment of a circularly-linked list and returns its successor.
* Returns null if the list is now empty.
*/
public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
public final Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
/**
* Splits this head of a circularly-linked list into two segments. The first
* segment contains the data in {@code [pos..pos+byteCount)}. The second
* segment contains the data in {@code [pos+byteCount..limit)}. This can be
* useful when moving partial segments from one buffer to another.
*
* <p>Returns the new head of the circularly-linked list.
*/
public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
} else {
prefix = SegmentPool.take();
//public static void arraycopy(Object src, int srcPos, Object dest, int destPos, int length)
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
...
}
我们捡重点说,Segment主要数据存储在data中,这个data是一个size是8192的byte数组。pos是有效数据开始位置,limit是结束位置:
data数据开始结束
我们看上面的几个主要方法来理解它的主要参数:
pop和push很简单就是简单的单链表操作。
pop把自己从链表中拆出去,并吧自己的pre和next置null。
push把新Segment放到自己后面。
pop push图解
我们看split(int byteCount),byteCount是想要分隔的数量,先if判断。小于零或者大于实际data数据肯定是不行的。
判断是否大于SHARE_MINIMUM,是的话就直接返回copy,share自己的数据。反之,从SegmentPool中取一个unused的Segment,copy byteCount以内的数据过去(把自己data的pos开始的数据拷贝到prefix.data的0开始的位置,数量是byteCount)。然后把prefix.limit置成pos+byteCount,自己的数据开始位置pos+=byteCount,然后把新的Segment放到自己的prev后面,也就是放到prev和自己之间。 spilt方法图解 点图片看大图
Segment大致就是这样。然后来看Buffer,Buffer是个大类,我们先从简单用法入手,假如我们有个txt文件,我们要把这里面的内容读出来并打印,我们可能会这样调用
private static void readSimpleString(File file) {
try {
Source source = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(source);
String read = bufferedSource.readString(Charset.forName("utf-8"));
LogUtil.d("read->" + read);
} catch (IOException e) {
e.printStackTrace();
}
}
我们实际上在拿着BufferedSource在做操作,我们去看BufferedSource的readString
@Override public String readString(Charset charset) throws IOException {
if (charset == null) throw new IllegalArgumentException("charset == null");
buffer.writeAll(source);
return buffer.readString(charset);
}
@Override public String readString(long byteCount, Charset charset) throws IOException {
require(byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
return buffer.readString(byteCount, charset);
}
我们看实际上是调了buffer的readString,我们看Buffer类:
@Override public String readString(Charset charset) {
try {
return readString(size, charset);
} catch (EOFException e) {
throw new AssertionError(e);
}
}
@Override public String readString(long byteCount, Charset charset) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
if (byteCount == 0) return "";
Segment s = head;
if (s.pos + byteCount > s.limit) {//如果大于head的limit说明包含多个segment,调用readByteArray
// If the string spans multiple segments, delegate to readBytes().
return new String(readByteArray(byteCount), charset);
}
//一个Segment就能搞定的情况
String result = new String(s.data, s.pos, (int) byteCount, charset);
s.pos += byteCount;
size -= byteCount;
//这个Segment已经读完了,就扔到SegmentPool里面并且得到新的head
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return result;
}
我们看判断了count,如果count小于head的limit,也就是一个segment就能搞定就完事儿了,当然大部分情况一个Segment是搞不定了,那我们继续往下看readByteArray:
@Override public byte[] readByteArray(long byteCount) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
byte[] result = new byte[(int) byteCount];
readFully(result);
return result;
}
new了一个count size的byte数组,调用了readFully
@Override public void readFully(byte[] sink) throws EOFException {
int offset = 0;
while (offset < sink.length) {
int read = read(sink, offset, sink.length - offset);
if (read == -1) throw new EOFException();
offset += read;
}
}
这里用了while循环,使用offest作为判断条件,offest不到length不罢休。这里我们继续看到read里面
@Override public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);
//找到当前的head Segment
Segment s = head;
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);//得到需要拷贝的数量,是head的有效数据容量还是byteCount
System.arraycopy(s.data, s.pos, sink, offset, toCopy);//数据拷贝
s.pos += toCopy;//head 的pos加上已经拷贝的数量
size -= toCopy;//需要读的总的数量减去已经读到的
if (s.pos == s.limit) {//读过的segment扔到SegmentPool缓存或者丢掉
head = s.pop();//s的next作为新的head
SegmentPool.recycle(s);
}
return toCopy;
}
read方法的三个参数(目标byte数组,已经读了多少,还要读多少),这里读取的逻辑就清晰了,当前head的数据拷贝到目标byte数组sink里面,使用上面讲过的Segment.pop()方法找到next作为新的head等while继续读,读完为止。
这样我们就看完了Okio读一个String的源码。
下面我们看下写:
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}
@Override public Buffer buffer() {
return buffer;
}
...
@Override public BufferedSink writeString(String string, Charset charset) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeString(string, charset);
return emitCompleteSegments();
}
@Override public BufferedSink writeByte(int b) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeByte(b);
return emitCompleteSegments();
}
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
...
}
我们看每个write都是调用到了emitCompleteSegments()这个方法,先获得要写入的count
/**
* Returns the number of bytes in segments that are not writable. This is the
* number of bytes that can be flushed immediately to an underlying sink
* without harming throughput.
*/
public final long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
再调用sink.write写入,我们应该还记得Okio里获得获取Sink的时候都是返回的一个匿名内部类,这里实现的write
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
还是使用了Segment,循环写入到os里面,完事儿调flush close就写进去了~
这里我们先总结一下看过的东西:
- Okio类是我们调用的总的入口,我们得到BufferedSource做读取,BufferedSource最终使用了Buffer类来做实际逻辑操作。
- Segment是实际存储byte数组的类,并提供了几个基本方法
2.1. pop:链表中弹出自己并返回下一个Segment
2.2. push:添加新的Segment到自己后面
2.3. spilt:根据SHARE_MINIMUM 切割一个新的Segment出来并放到链表里面(是否共享) - SegmentPool保存已经使用完的Segment来避免内存抖动和零填充(JVM开辟内存时置零的消耗),SegmentPool只能存8个Segment,多了直接丢掉。
3.1. recycle(Segment segment) //加新的segment进来
3.2. take() //取一个segment走 - Buffer类是实际做读写逻辑的类,readString的时候是使用readFully里面的while循环read,read里面使用Segment的pop方法不断获取next的Segment并读取data的内容一直到结束。
网友评论