美文网首页
okio 超时机制(很好的一次多线程编程学习机会)

okio 超时机制(很好的一次多线程编程学习机会)

作者: 一个追寻者的故事 | 来源:发表于2020-08-27 15:01 被阅读0次

okio 超时机制的原理(基于okio-2.7.0)

1.1 干什么用的

了解一项技术之前肯定要先知道它是解决什么问题的,也就是干什么用的。

我们先看一下实现超时机制的核心类 AsyncTimeout 的类注释,就知道超时机制解决了什么问题:

AsyncTimeout 使用了后台线程,能够在超时发生的时候准确的采取行动。使用它可以实现那些
本身不支持超时时间的情况。例如:Socket会在 write 方法执行的时候阻塞,且不支持超时时间。

总结:实现了一套超时机制,能够对于耗时、阻塞方法进行超时控制,其实是一套通用的机制,后边会演示如何使用。

把类 AsyncTimeout 的注释剩下部分补全:

子类应该事先 [timedOut]方法,当超时发生时去采取行动。这个方法会被共享的WatchDog后台线程调用。
所以不应该在[timedOut]中做耗时的操作,否则可能会影响其他超时时间的通知。

1.2 怎么使用

写一段代码,来演示下如何使用okio中的超时机制。

//定义一个普通业务类
open class SomeClass {
    @Volatile var isInterrupted = false

    //做一些耗时/阻塞的事情
    open fun doSomethingBlock() {
        println("start~")
        while (true) {
            if (isInterrupted) {  //可以人为打断,如果被打断,则抛出异常提示上层使用者
                throw InterruptedException("interrupted")
            }
        }
    }
}

// 定义超时时间实现类
class SomeClassAsyncTimeout(val obj: SomeClass) : AsyncTimeout() {
    //定义超时时间回调函数的处理逻辑。一般的情况是,如果超时了,打断方法调用
    override fun timedOut() {
        println("this operation is timed out")
        println("timedOut() 所在的线程: ${Thread.currentThread().name}")
        obj.isInterrupted = true    //打断耗时方法
    }
    
   //生成一个 SomeClass的代理对象
    fun delegate(): SomeClass {
        return object : SomeClass() {
            // 先执行代理方法
            override fun doSomethingBlock() {
                withTimeout { obj.doSomethingBlock() }
            }
        }
    }
}

fun main() {
    val someObj = SomeClassAsyncTimeout(SomeClass())
        .apply { timeout(4, TimeUnit.SECONDS) }  //设置超时时间
        .delegate()  //返回代理对象

    try{
        someObj.doSomethingBlock()    //执行代理对象的耗时方法
    }catch (e: Exception){
        e.printStackTrace()
        println("已经超时")
    }
    println("main - end")
}

我们看一下执行结果:

执行结果

可以发现,原本没有超时机制的方法,可以动态添加超时的行为。

1.3 超时怎么实现的

1.3.1 使用排序后的单链表组织超时时间
open class AsyncTimeout : Timeout() {
     //...
     /** The next node in the linked list.  */
     private var next: AsyncTimeout? = null

    //超时后的回调
     protected open fun timedOut() {}

     companion object {
          private var head: AsyncTimeout? = null
     }
     //...
}

AsyncTimeout 有一个next,其 Companion 对象中有一个 headWatchDog后台线程 处理 一个由 AsyncTimeout 元素组成的一个单链表,这个单链表按照超时时间排序(链表最前边的最先超时)。对于单链表的操作 使用synchronized(syncTimeout.class) 来保证操作互斥同步。

headnext 节点是第一个元素。第一个元素是下一个将要超时的元素。 初始化情况下head = nullwatchDog线程启动后,head会被创建(只是为了占位,真正数据节点是下一个元素),当watchDog线程空闲了 AsyncTimeout.IDLE_TIMEOUT_MILLIS 后会结束,同时 head = null

超时时间单链表示意图
1.3.2 后台线程WatchDog到底在干嘛

代码很短,我们把代码贴出来,一行一行分析。

private class Watchdog internal constructor() : Thread("Okio Watchdog") {
    init {
      isDaemon = true  // 设置为后台线程
    }

    override fun run() {
      while (true) {
        try {
          var timedOut: AsyncTimeout? = null
          // 访问链表结构时统一上锁
          synchronized(AsyncTimeout::class.java) {
            // 从单链表中获取下一个超时的节点。
            timedOut = awaitTimeout()

            // 当前队列中(单链表)中已经空了,直接让后台线程退出
            // 当下次有节点插入队列的时候,创建一个新的WatchDog吧
            if (timedOut === head) {
              head = null    //置空头结点 (头结点和单链表是同时存在的)
              return    // 结束线程
            }
          }

          // 如果节点存在则执行回调
          timedOut?.timedOut()
        } catch (ignored: InterruptedException) {
        }
      }
    }
  }


open class AsyncTimeout : Timeout() {

  companion object {
    // 从队列中(单链表)中获取一下一个超时的元素
    @Throws(InterruptedException::class)
    internal fun awaitTimeout(): AsyncTimeout? {
      // 获取下一个元素(上边说了,head和 WatchDog同时存在的,只有WatchDog自己会把 head 置空,所以此时head一定不为空)
      val node = head!!.next

       // 如果下一个元素为空,wait 空闲的时间(60s默认)后关闭线程,又或者因为队列前边插入了新的元素,会被唤醒。
      if (node == null) {
        val startNanos = System.nanoTime()
        (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
        return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
          head // 线程空闲等待的事件结束,线程后续会结束。
        } else {
          null // 有新的元素插入在队列的最前端了,结束本次循环
        }
      }
      
      // 获取这个这个节点的剩余超时时间
      var waitNanos = node.remainingNanos(System.nanoTime())

      // 此时队列还不存在超时的元素
      if (waitNanos > 0) {
        // 线程wait 刚才算出来的剩余的时间后在醒来。
        val waitMillis = waitNanos / 1000000L
        waitNanos -= waitMillis * 1000000L
        (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
        return null    //醒来后直接结束本次循环
      }

      // 这个node节点超时了,从队列中删除此结点
      head!!.next = node.next
      node.next = null
      return node
    }
  }
}

小结:WatchDog 这个后台线程,持续从超时队列中获取下一个超时的元素(这个过程是做了多线程同步的),

  1. 如果队列中没有元素了,在等待了一段时间后,线程会结束,直到又有新的节点插入队列时才会创建新的WatchDog后台线程;如果在空闲等待的这一段时间里,有元素插入到队列中,WatchDog就会被唤醒,继续 持续从超时队列中获取下一个超时的元素

  2. 如果队列中有元素,获取第一个元素(最前边的),如果发现此结点没有超时,算出还有多久超时,则再 wait 这一小段的时间后再来处理,当然这个过程中也有可能被唤醒,因为也有可能有新的节点插入了最气边; 如果发现此结点已超时,则直接移除队列,然后进行 timedOut 回调。

所以 WatchDog大部分时间都是在 waiting 在状态,不消耗 cpu 资源。

1.3.3 用户的方法和 AsyncTimeout 进行关联

讲了这么大一堆,到底如何把用户的方法扩展成带有超时机制的呢。看上边关于扩展 SomeClass 的例子中,通过 withTimeout 就会让原来的方法具有超时性。

分析一下 withTimeout

/**
block:就是具体耗时的操作
在执行 block 之前,先执行enter,block执行之后,执行exit,如果超时发生了,会抛出一个Exception,来自 [newTimeoutException] 
   */
  inline fun <T> withTimeout(block: () -> T): T {
    var throwOnTimeout = false
    enter()    // *****
    try {
      val result = block()
      throwOnTimeout = true
      return result
    } catch (e: IOException) {  // 如果block()调用过程中抛出了Exception
      //如果已经超时,会包装成新的超时Exception,如果没有超时,直接抛出方法本身的异常
      throw if (!exit()) e else `access$newTimeoutException`(e)
    } finally {  // 即便block没有抛出异常
      val timedOut = exit()      
      //判断超时后,依然会告知用户
      if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null)
    }
  }

围绕 用户耗时方法的前后,有连个方法 enterexit
enter : 会将耗时操作对应 AsyncTimeout节点按照超时时间插入到队列的合适位置。
exit: 如果此AsyncTimeout 节点还在队列中,则删除此节点(因为任务已完成,不需要在被监听)。同时会返回此 AsyncTimeout 节点是否已经超时(如果节点中已经没有此节点了,这说明此节点被WatchDog删除了,也就是超时了)。

着重看那一下入队列 和 出队列的方法,其它细节最好自己看源码。
入队列

private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
      synchronized(AsyncTimeout::class.java) {

        // 当head被创建的时候,启动WatchDog线程
        if (head == null) {
          head = AsyncTimeout()
          Watchdog().start()
        }

        // 根据用户设置的 timeoutNanos、deadlineNanoTime 转化成 timeoutAt
        val now = System.nanoTime()
        if (timeoutNanos != 0L && hasDeadline) {
          // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
          // around, minOf() is undefined for absolute values, but meaningful for relative ones.
          node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
        } else if (timeoutNanos != 0L) {
          node.timeoutAt = now + timeoutNanos
        } else if (hasDeadline) {
          node.timeoutAt = node.deadlineNanoTime()
        } else {
          throw AssertionError()
        }

        // Insert the node in sorted order.
        val remainingNanos = node.remainingNanos(now)
        var prev = head!!
        while (true) {
          if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {  // 找到合适的超时时间位置落座
            //插入节点
            node.next = prev.next    
            prev.next = node
            if (prev === head) {  // 如果此时是第一个元素(有可能是第一次放,也有可能是好几轮以后了)
              // Wake up the watchdog when inserting at the front.
             // 记得这时候唤醒WatchDog
              (AsyncTimeout::class.java as Object).notify()
            }
            break
          }
          prev = prev.next!!    //一直往后找
        }
      }
    }

出队列

 /** 返回true,如果超时已经发生 */
    private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
      synchronized(AsyncTimeout::class.java) {
        // Remove the node from the linked list.
       // 从但链表里删除此节点
        var prev = head
        while (prev != null) {
          if (prev.next === node) {
            prev.next = node.next
            node.next = null
            return false
          }
          prev = prev.next
        }

        // The node wasn't found in the linked list: it must have timed out!
        // 此节点没有找到,说明它对应的耗时方法已经超时了
        return true
      }
    }

至此,整个处理超时的过程比较明朗了。

timedOut 这个方法会在超时后被WatchDog回调。在这个回调里,可以打断耗时方法的执行,从而让耗时方法抛出异常终止方法继续执行,告知使用者已超时;也可以不坐任何操作,等到方法执行完成以后,也会抛出异常告知使用者已经超时。 这里说的打断耗时方法的执行,前提是耗时方法支持被打断,否则也无法中断程序。 超时机制的设置本来是为了给okhttp用的,例如 Socket 的write 方法没有超时支持,可以通过 AsyncTimeout 监听超时之后调用 socket#close 方法继而中断 write 操作。

注意 WatchDog 是个 Daemon Thread。 Java的线程分为两种:User Thread(用户线程)、DaemonThread(守护线程)。只要当前JVM实例中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作,Daemon作用是为其他线程提供便利服务,守护线程最典型的应用就是GC(垃圾回收器),他就是一个很称职的守护者。 用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。

这里涉及到 wait 、 notify 、synchronized 的多线程知识,是很好的一次多线程学习机会。

从本篇文章中可以看到 等待/通知的经典范式。分为两部分:等待方(消费者) 和 通知方(生产者)
等待方遵循原则:
1、获取对象的锁
2、如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件。
3、条件满足则执行对应的逻辑。
通知方遵循原则:
1、获得对象的锁
2、改变条件
3、通知所有等待在对象上的线程。

相关文章

网友评论

      本文标题:okio 超时机制(很好的一次多线程编程学习机会)

      本文链接:https://www.haomeiwen.com/subject/cflujktx.html