okio 超时机制的原理(基于okio-2.7.0)
1.1 干什么用的
了解一项技术之前肯定要先知道它是解决什么问题的,也就是干什么用的。
我们先看一下实现超时机制的核心类 AsyncTimeout
的类注释,就知道超时机制解决了什么问题:
AsyncTimeout 使用了后台线程,能够在超时发生的时候准确的采取行动。使用它可以实现那些
本身不支持超时时间的情况。例如:Socket会在 write 方法执行的时候阻塞,且不支持超时时间。
总结:实现了一套超时机制,能够对于耗时、阻塞方法进行超时控制,其实是一套通用的机制,后边会演示如何使用。
把类 AsyncTimeout
的注释剩下部分补全:
子类应该事先 [timedOut]方法,当超时发生时去采取行动。这个方法会被共享的WatchDog后台线程调用。
所以不应该在[timedOut]中做耗时的操作,否则可能会影响其他超时时间的通知。
1.2 怎么使用
写一段代码,来演示下如何使用okio中的超时机制。
//定义一个普通业务类
open class SomeClass {
@Volatile var isInterrupted = false
//做一些耗时/阻塞的事情
open fun doSomethingBlock() {
println("start~")
while (true) {
if (isInterrupted) { //可以人为打断,如果被打断,则抛出异常提示上层使用者
throw InterruptedException("interrupted")
}
}
}
}
// 定义超时时间实现类
class SomeClassAsyncTimeout(val obj: SomeClass) : AsyncTimeout() {
//定义超时时间回调函数的处理逻辑。一般的情况是,如果超时了,打断方法调用
override fun timedOut() {
println("this operation is timed out")
println("timedOut() 所在的线程: ${Thread.currentThread().name}")
obj.isInterrupted = true //打断耗时方法
}
//生成一个 SomeClass的代理对象
fun delegate(): SomeClass {
return object : SomeClass() {
// 先执行代理方法
override fun doSomethingBlock() {
withTimeout { obj.doSomethingBlock() }
}
}
}
}
fun main() {
val someObj = SomeClassAsyncTimeout(SomeClass())
.apply { timeout(4, TimeUnit.SECONDS) } //设置超时时间
.delegate() //返回代理对象
try{
someObj.doSomethingBlock() //执行代理对象的耗时方法
}catch (e: Exception){
e.printStackTrace()
println("已经超时")
}
println("main - end")
}
我们看一下执行结果:
执行结果可以发现,原本没有超时机制的方法,可以动态添加超时的行为。
1.3 超时怎么实现的
1.3.1 使用排序后的单链表组织超时时间
open class AsyncTimeout : Timeout() {
//...
/** The next node in the linked list. */
private var next: AsyncTimeout? = null
//超时后的回调
protected open fun timedOut() {}
companion object {
private var head: AsyncTimeout? = null
}
//...
}
AsyncTimeout
有一个next
,其 Companion
对象中有一个 head
,WatchDog
后台线程 处理 一个由 AsyncTimeout
元素组成的一个单链表,这个单链表按照超时时间排序(链表最前边的最先超时)。对于单链表的操作 使用synchronized(syncTimeout.class)
来保证操作互斥同步。
head
的 next
节点是第一个元素。第一个元素是下一个将要超时的元素。 初始化情况下head = null
,watchDog
线程启动后,head会被创建(只是为了占位,真正数据节点是下一个元素),当watchDog
线程空闲了 AsyncTimeout.IDLE_TIMEOUT_MILLIS
后会结束,同时 head = null
1.3.2 后台线程WatchDog到底在干嘛
代码很短,我们把代码贴出来,一行一行分析。
private class Watchdog internal constructor() : Thread("Okio Watchdog") {
init {
isDaemon = true // 设置为后台线程
}
override fun run() {
while (true) {
try {
var timedOut: AsyncTimeout? = null
// 访问链表结构时统一上锁
synchronized(AsyncTimeout::class.java) {
// 从单链表中获取下一个超时的节点。
timedOut = awaitTimeout()
// 当前队列中(单链表)中已经空了,直接让后台线程退出
// 当下次有节点插入队列的时候,创建一个新的WatchDog吧
if (timedOut === head) {
head = null //置空头结点 (头结点和单链表是同时存在的)
return // 结束线程
}
}
// 如果节点存在则执行回调
timedOut?.timedOut()
} catch (ignored: InterruptedException) {
}
}
}
}
open class AsyncTimeout : Timeout() {
companion object {
// 从队列中(单链表)中获取一下一个超时的元素
@Throws(InterruptedException::class)
internal fun awaitTimeout(): AsyncTimeout? {
// 获取下一个元素(上边说了,head和 WatchDog同时存在的,只有WatchDog自己会把 head 置空,所以此时head一定不为空)
val node = head!!.next
// 如果下一个元素为空,wait 空闲的时间(60s默认)后关闭线程,又或者因为队列前边插入了新的元素,会被唤醒。
if (node == null) {
val startNanos = System.nanoTime()
(AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
head // 线程空闲等待的事件结束,线程后续会结束。
} else {
null // 有新的元素插入在队列的最前端了,结束本次循环
}
}
// 获取这个这个节点的剩余超时时间
var waitNanos = node.remainingNanos(System.nanoTime())
// 此时队列还不存在超时的元素
if (waitNanos > 0) {
// 线程wait 刚才算出来的剩余的时间后在醒来。
val waitMillis = waitNanos / 1000000L
waitNanos -= waitMillis * 1000000L
(AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
return null //醒来后直接结束本次循环
}
// 这个node节点超时了,从队列中删除此结点
head!!.next = node.next
node.next = null
return node
}
}
}
小结
:WatchDog 这个后台线程,持续从超时队列中获取下一个超时的元素(这个过程是做了多线程同步的),
-
如果队列中没有元素了,在等待了一段时间后,线程会结束,直到又有新的节点插入队列时才会创建新的WatchDog后台线程;如果在空闲等待的这一段时间里,有元素插入到队列中,WatchDog就会被唤醒,继续 持续从超时队列中获取下一个超时的元素
-
如果队列中有元素,获取第一个元素(最前边的),如果发现此结点没有超时,算出还有多久超时,则再
wait
这一小段的时间后再来处理,当然这个过程中也有可能被唤醒,因为也有可能有新的节点插入了最气边; 如果发现此结点已超时,则直接移除队列,然后进行timedOut
回调。
所以 WatchDog大部分时间都是在 waiting 在状态,不消耗 cpu 资源。
1.3.3 用户的方法和 AsyncTimeout
进行关联
讲了这么大一堆,到底如何把用户的方法扩展成带有超时机制的呢。看上边关于扩展 SomeClass
的例子中,通过 withTimeout
就会让原来的方法具有超时性。
分析一下 withTimeout
/**
block:就是具体耗时的操作
在执行 block 之前,先执行enter,block执行之后,执行exit,如果超时发生了,会抛出一个Exception,来自 [newTimeoutException]
*/
inline fun <T> withTimeout(block: () -> T): T {
var throwOnTimeout = false
enter() // *****
try {
val result = block()
throwOnTimeout = true
return result
} catch (e: IOException) { // 如果block()调用过程中抛出了Exception
//如果已经超时,会包装成新的超时Exception,如果没有超时,直接抛出方法本身的异常
throw if (!exit()) e else `access$newTimeoutException`(e)
} finally { // 即便block没有抛出异常
val timedOut = exit()
//判断超时后,依然会告知用户
if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null)
}
}
围绕 用户耗时方法的前后,有连个方法 enter
、exit
enter
: 会将耗时操作对应 AsyncTimeout
节点按照超时时间插入到队列的合适位置。
exit
: 如果此AsyncTimeout
节点还在队列中,则删除此节点(因为任务已完成,不需要在被监听)。同时会返回此 AsyncTimeout
节点是否已经超时(如果节点中已经没有此节点了,这说明此节点被WatchDog删除了,也就是超时了)。
着重看那一下入队列 和 出队列的方法,其它细节最好自己看源码。
入队列
:
private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
synchronized(AsyncTimeout::class.java) {
// 当head被创建的时候,启动WatchDog线程
if (head == null) {
head = AsyncTimeout()
Watchdog().start()
}
// 根据用户设置的 timeoutNanos、deadlineNanoTime 转化成 timeoutAt
val now = System.nanoTime()
if (timeoutNanos != 0L && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
// around, minOf() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
} else if (timeoutNanos != 0L) {
node.timeoutAt = now + timeoutNanos
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime()
} else {
throw AssertionError()
}
// Insert the node in sorted order.
val remainingNanos = node.remainingNanos(now)
var prev = head!!
while (true) {
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { // 找到合适的超时时间位置落座
//插入节点
node.next = prev.next
prev.next = node
if (prev === head) { // 如果此时是第一个元素(有可能是第一次放,也有可能是好几轮以后了)
// Wake up the watchdog when inserting at the front.
// 记得这时候唤醒WatchDog
(AsyncTimeout::class.java as Object).notify()
}
break
}
prev = prev.next!! //一直往后找
}
}
}
出队列
:
/** 返回true,如果超时已经发生 */
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
synchronized(AsyncTimeout::class.java) {
// Remove the node from the linked list.
// 从但链表里删除此节点
var prev = head
while (prev != null) {
if (prev.next === node) {
prev.next = node.next
node.next = null
return false
}
prev = prev.next
}
// The node wasn't found in the linked list: it must have timed out!
// 此节点没有找到,说明它对应的耗时方法已经超时了
return true
}
}
至此,整个处理超时的过程比较明朗了。
timedOut 这个方法会在超时后被WatchDog回调。在这个回调里,可以打断耗时方法的执行,从而让耗时方法抛出异常终止方法继续执行,告知使用者已超时;也可以不坐任何操作,等到方法执行完成以后,也会抛出异常告知使用者已经超时。 这里说的打断耗时方法的执行,前提是耗时方法支持被打断,否则也无法中断程序。 超时机制的设置本来是为了给okhttp用的,例如 Socket 的write 方法没有超时支持,可以通过 AsyncTimeout 监听超时之后调用 socket#close 方法继而中断 write 操作。
注意 WatchDog 是个 Daemon Thread。 Java的线程分为两种:User Thread(用户线程)、DaemonThread(守护线程)。只要当前JVM实例中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作,Daemon作用是为其他线程提供便利服务,守护线程最典型的应用就是GC(垃圾回收器),他就是一个很称职的守护者。 用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。
这里涉及到 wait 、 notify 、synchronized 的多线程知识,是很好的一次多线程学习机会。
从本篇文章中可以看到 等待/通知的经典范式。分为两部分:等待方(消费者) 和 通知方(生产者)
等待方遵循原则:
1、获取对象的锁
2、如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件。
3、条件满足则执行对应的逻辑。
通知方遵循原则:
1、获得对象的锁
2、改变条件
3、通知所有等待在对象上的线程。
网友评论