美文网首页
okio 超时机制(很好的一次多线程编程学习机会)

okio 超时机制(很好的一次多线程编程学习机会)

作者: 一个追寻者的故事 | 来源:发表于2020-08-27 15:01 被阅读0次

    okio 超时机制的原理(基于okio-2.7.0)

    1.1 干什么用的

    了解一项技术之前肯定要先知道它是解决什么问题的,也就是干什么用的。

    我们先看一下实现超时机制的核心类 AsyncTimeout 的类注释,就知道超时机制解决了什么问题:

    AsyncTimeout 使用了后台线程,能够在超时发生的时候准确的采取行动。使用它可以实现那些
    本身不支持超时时间的情况。例如:Socket会在 write 方法执行的时候阻塞,且不支持超时时间。
    

    总结:实现了一套超时机制,能够对于耗时、阻塞方法进行超时控制,其实是一套通用的机制,后边会演示如何使用。

    把类 AsyncTimeout 的注释剩下部分补全:

    子类应该事先 [timedOut]方法,当超时发生时去采取行动。这个方法会被共享的WatchDog后台线程调用。
    所以不应该在[timedOut]中做耗时的操作,否则可能会影响其他超时时间的通知。
    

    1.2 怎么使用

    写一段代码,来演示下如何使用okio中的超时机制。

    //定义一个普通业务类
    open class SomeClass {
        @Volatile var isInterrupted = false
    
        //做一些耗时/阻塞的事情
        open fun doSomethingBlock() {
            println("start~")
            while (true) {
                if (isInterrupted) {  //可以人为打断,如果被打断,则抛出异常提示上层使用者
                    throw InterruptedException("interrupted")
                }
            }
        }
    }
    
    // 定义超时时间实现类
    class SomeClassAsyncTimeout(val obj: SomeClass) : AsyncTimeout() {
        //定义超时时间回调函数的处理逻辑。一般的情况是,如果超时了,打断方法调用
        override fun timedOut() {
            println("this operation is timed out")
            println("timedOut() 所在的线程: ${Thread.currentThread().name}")
            obj.isInterrupted = true    //打断耗时方法
        }
        
       //生成一个 SomeClass的代理对象
        fun delegate(): SomeClass {
            return object : SomeClass() {
                // 先执行代理方法
                override fun doSomethingBlock() {
                    withTimeout { obj.doSomethingBlock() }
                }
            }
        }
    }
    
    fun main() {
        val someObj = SomeClassAsyncTimeout(SomeClass())
            .apply { timeout(4, TimeUnit.SECONDS) }  //设置超时时间
            .delegate()  //返回代理对象
    
        try{
            someObj.doSomethingBlock()    //执行代理对象的耗时方法
        }catch (e: Exception){
            e.printStackTrace()
            println("已经超时")
        }
        println("main - end")
    }
    

    我们看一下执行结果:

    执行结果

    可以发现,原本没有超时机制的方法,可以动态添加超时的行为。

    1.3 超时怎么实现的

    1.3.1 使用排序后的单链表组织超时时间
    open class AsyncTimeout : Timeout() {
         //...
         /** The next node in the linked list.  */
         private var next: AsyncTimeout? = null
    
        //超时后的回调
         protected open fun timedOut() {}
    
         companion object {
              private var head: AsyncTimeout? = null
         }
         //...
    }
    

    AsyncTimeout 有一个next,其 Companion 对象中有一个 headWatchDog后台线程 处理 一个由 AsyncTimeout 元素组成的一个单链表,这个单链表按照超时时间排序(链表最前边的最先超时)。对于单链表的操作 使用synchronized(syncTimeout.class) 来保证操作互斥同步。

    headnext 节点是第一个元素。第一个元素是下一个将要超时的元素。 初始化情况下head = nullwatchDog线程启动后,head会被创建(只是为了占位,真正数据节点是下一个元素),当watchDog线程空闲了 AsyncTimeout.IDLE_TIMEOUT_MILLIS 后会结束,同时 head = null

    超时时间单链表示意图
    1.3.2 后台线程WatchDog到底在干嘛

    代码很短,我们把代码贴出来,一行一行分析。

    private class Watchdog internal constructor() : Thread("Okio Watchdog") {
        init {
          isDaemon = true  // 设置为后台线程
        }
    
        override fun run() {
          while (true) {
            try {
              var timedOut: AsyncTimeout? = null
              // 访问链表结构时统一上锁
              synchronized(AsyncTimeout::class.java) {
                // 从单链表中获取下一个超时的节点。
                timedOut = awaitTimeout()
    
                // 当前队列中(单链表)中已经空了,直接让后台线程退出
                // 当下次有节点插入队列的时候,创建一个新的WatchDog吧
                if (timedOut === head) {
                  head = null    //置空头结点 (头结点和单链表是同时存在的)
                  return    // 结束线程
                }
              }
    
              // 如果节点存在则执行回调
              timedOut?.timedOut()
            } catch (ignored: InterruptedException) {
            }
          }
        }
      }
    
    
    open class AsyncTimeout : Timeout() {
    
      companion object {
        // 从队列中(单链表)中获取一下一个超时的元素
        @Throws(InterruptedException::class)
        internal fun awaitTimeout(): AsyncTimeout? {
          // 获取下一个元素(上边说了,head和 WatchDog同时存在的,只有WatchDog自己会把 head 置空,所以此时head一定不为空)
          val node = head!!.next
    
           // 如果下一个元素为空,wait 空闲的时间(60s默认)后关闭线程,又或者因为队列前边插入了新的元素,会被唤醒。
          if (node == null) {
            val startNanos = System.nanoTime()
            (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
            return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
              head // 线程空闲等待的事件结束,线程后续会结束。
            } else {
              null // 有新的元素插入在队列的最前端了,结束本次循环
            }
          }
          
          // 获取这个这个节点的剩余超时时间
          var waitNanos = node.remainingNanos(System.nanoTime())
    
          // 此时队列还不存在超时的元素
          if (waitNanos > 0) {
            // 线程wait 刚才算出来的剩余的时间后在醒来。
            val waitMillis = waitNanos / 1000000L
            waitNanos -= waitMillis * 1000000L
            (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
            return null    //醒来后直接结束本次循环
          }
    
          // 这个node节点超时了,从队列中删除此结点
          head!!.next = node.next
          node.next = null
          return node
        }
      }
    }
    

    小结:WatchDog 这个后台线程,持续从超时队列中获取下一个超时的元素(这个过程是做了多线程同步的),

    1. 如果队列中没有元素了,在等待了一段时间后,线程会结束,直到又有新的节点插入队列时才会创建新的WatchDog后台线程;如果在空闲等待的这一段时间里,有元素插入到队列中,WatchDog就会被唤醒,继续 持续从超时队列中获取下一个超时的元素

    2. 如果队列中有元素,获取第一个元素(最前边的),如果发现此结点没有超时,算出还有多久超时,则再 wait 这一小段的时间后再来处理,当然这个过程中也有可能被唤醒,因为也有可能有新的节点插入了最气边; 如果发现此结点已超时,则直接移除队列,然后进行 timedOut 回调。

    所以 WatchDog大部分时间都是在 waiting 在状态,不消耗 cpu 资源。

    1.3.3 用户的方法和 AsyncTimeout 进行关联

    讲了这么大一堆,到底如何把用户的方法扩展成带有超时机制的呢。看上边关于扩展 SomeClass 的例子中,通过 withTimeout 就会让原来的方法具有超时性。

    分析一下 withTimeout

    /**
    block:就是具体耗时的操作
    在执行 block 之前,先执行enter,block执行之后,执行exit,如果超时发生了,会抛出一个Exception,来自 [newTimeoutException] 
       */
      inline fun <T> withTimeout(block: () -> T): T {
        var throwOnTimeout = false
        enter()    // *****
        try {
          val result = block()
          throwOnTimeout = true
          return result
        } catch (e: IOException) {  // 如果block()调用过程中抛出了Exception
          //如果已经超时,会包装成新的超时Exception,如果没有超时,直接抛出方法本身的异常
          throw if (!exit()) e else `access$newTimeoutException`(e)
        } finally {  // 即便block没有抛出异常
          val timedOut = exit()      
          //判断超时后,依然会告知用户
          if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null)
        }
      }
    

    围绕 用户耗时方法的前后,有连个方法 enterexit
    enter : 会将耗时操作对应 AsyncTimeout节点按照超时时间插入到队列的合适位置。
    exit: 如果此AsyncTimeout 节点还在队列中,则删除此节点(因为任务已完成,不需要在被监听)。同时会返回此 AsyncTimeout 节点是否已经超时(如果节点中已经没有此节点了,这说明此节点被WatchDog删除了,也就是超时了)。

    着重看那一下入队列 和 出队列的方法,其它细节最好自己看源码。
    入队列

    private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
          synchronized(AsyncTimeout::class.java) {
    
            // 当head被创建的时候,启动WatchDog线程
            if (head == null) {
              head = AsyncTimeout()
              Watchdog().start()
            }
    
            // 根据用户设置的 timeoutNanos、deadlineNanoTime 转化成 timeoutAt
            val now = System.nanoTime()
            if (timeoutNanos != 0L && hasDeadline) {
              // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
              // around, minOf() is undefined for absolute values, but meaningful for relative ones.
              node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
            } else if (timeoutNanos != 0L) {
              node.timeoutAt = now + timeoutNanos
            } else if (hasDeadline) {
              node.timeoutAt = node.deadlineNanoTime()
            } else {
              throw AssertionError()
            }
    
            // Insert the node in sorted order.
            val remainingNanos = node.remainingNanos(now)
            var prev = head!!
            while (true) {
              if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {  // 找到合适的超时时间位置落座
                //插入节点
                node.next = prev.next    
                prev.next = node
                if (prev === head) {  // 如果此时是第一个元素(有可能是第一次放,也有可能是好几轮以后了)
                  // Wake up the watchdog when inserting at the front.
                 // 记得这时候唤醒WatchDog
                  (AsyncTimeout::class.java as Object).notify()
                }
                break
              }
              prev = prev.next!!    //一直往后找
            }
          }
        }
    

    出队列

     /** 返回true,如果超时已经发生 */
        private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
          synchronized(AsyncTimeout::class.java) {
            // Remove the node from the linked list.
           // 从但链表里删除此节点
            var prev = head
            while (prev != null) {
              if (prev.next === node) {
                prev.next = node.next
                node.next = null
                return false
              }
              prev = prev.next
            }
    
            // The node wasn't found in the linked list: it must have timed out!
            // 此节点没有找到,说明它对应的耗时方法已经超时了
            return true
          }
        }
    

    至此,整个处理超时的过程比较明朗了。

    timedOut 这个方法会在超时后被WatchDog回调。在这个回调里,可以打断耗时方法的执行,从而让耗时方法抛出异常终止方法继续执行,告知使用者已超时;也可以不坐任何操作,等到方法执行完成以后,也会抛出异常告知使用者已经超时。 这里说的打断耗时方法的执行,前提是耗时方法支持被打断,否则也无法中断程序。 超时机制的设置本来是为了给okhttp用的,例如 Socket 的write 方法没有超时支持,可以通过 AsyncTimeout 监听超时之后调用 socket#close 方法继而中断 write 操作。

    注意 WatchDog 是个 Daemon Thread。 Java的线程分为两种:User Thread(用户线程)、DaemonThread(守护线程)。只要当前JVM实例中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作,Daemon作用是为其他线程提供便利服务,守护线程最典型的应用就是GC(垃圾回收器),他就是一个很称职的守护者。 用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。

    这里涉及到 wait 、 notify 、synchronized 的多线程知识,是很好的一次多线程学习机会。

    从本篇文章中可以看到 等待/通知的经典范式。分为两部分:等待方(消费者) 和 通知方(生产者)
    等待方遵循原则:
    1、获取对象的锁
    2、如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件。
    3、条件满足则执行对应的逻辑。
    通知方遵循原则:
    1、获得对象的锁
    2、改变条件
    3、通知所有等待在对象上的线程。

    相关文章

      网友评论

          本文标题:okio 超时机制(很好的一次多线程编程学习机会)

          本文链接:https://www.haomeiwen.com/subject/cflujktx.html