Okio 超时机制

作者: yqyzxd | 来源:发表于2018-07-05 11:09 被阅读16次

    Okio中有同步超时和异步超时。

    同步超时Timeout类

    Timeout类提供了两个互补的控制机制来定义超时策略。

    timeoutNanos 指定一个单一操作完成的最大时间。超时通常用于检测网络之类的问题。例如,如果远程对等点在10秒内没有返回任何数据,我们可以假定对等点不可用。

    deadlineNanoTime 截止日期指定了在工作上花费的最大时间,由一个或多个操作组成。用最后期限来设定工作投入时间的上限。

    Okio.source方法中返回的Source的read方法中用到了timeout

    private static Source source(final InputStream in, final Timeout timeout) {
              ...
        return new Source() {
          @Override public long read(Buffer sink, long byteCount) throws IOException {
              ...
              //检测是否到达截止时间
              timeout.throwIfReached();
              ...
            }
          }
        };
      }
    
      /**
        *如果已经到达最后期限或当前线程被中断,则抛出一个{@link InterruptedIOException}。该方法不检测timeoutNanos,这应该实现为异步中止正在进行的操作。
        */
      public void throwIfReached() throws IOException {
        if (Thread.interrupted()) {
          throw new InterruptedIOException("thread interrupted");
        }
    
        if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
          throw new InterruptedIOException("deadline reached");
        }
      }
    

    异步超时AsyncTimeout类

    注意:AsyncTimeout以链表数据结构组织每个AsyncTimeout对象是链表中的一个结点
    这个类使用一个后台线程在超时发生时执行操作。使用它来实现原生不支持超时的情况,例如在写入时阻塞的套接字。
    子类应该覆盖timeout方法以便在超时发生时执行必要的逻辑。这个方法会被共享的watchdog线程调用,所以不应该执行时间比较长的操作。否则就会影响其他timeout类的timeout方法的调用时机。
    使用sink方法或source方法来应用AsyncTimeout,流的操作会被应用超时机制。
    AsyncTimeout类的source方法

     public final Source source(final Source source) {
        return new Source() {
          @Override public long read(Buffer sink, long byteCount) throws IOException {
            boolean throwOnTimeout = false;
            enter();//①
            try {
              long result = source.read(sink, byteCount);//②
              throwOnTimeout = true;
              return result;
            } catch (IOException e) {
              throw exit(e);//③
            } finally {
              exit(throwOnTimeout);//④
            }
          }
    
          @Override public void close() throws IOException {
            boolean throwOnTimeout = false;
            try {
              source.close();
              throwOnTimeout = true;
            } catch (IOException e) {
              throw exit(e);
            } finally {
              exit(throwOnTimeout);
            }
          }
    
          @Override public Timeout timeout() {
            return AsyncTimeout.this;
          }
    
          @Override public String toString() {
            return "AsyncTimeout.source(" + source + ")";
          }
        };
      }
    

    看的出source方法内部其实用了装饰者模式,增强了read方法。
    代码中标出了4处。

    ① enter方法

    作用是把当前AsyncTimeout对象加入链表中,启动守护进程(如果还没启动的话),守护进程的作用的是找出超时的AsyncTimeout结点,并调用该结点的timeout方法。

     public final void enter() {
        if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
        long timeoutNanos = timeoutNanos();
        boolean hasDeadline = hasDeadline();
        if (timeoutNanos == 0 && !hasDeadline) {
          return; // 没有设置超时时间和截止时间,无需操作
        }
        inQueue = true;
    //当前结点加入AsyncTimeout组成的链表(以超时时间排序)
        scheduleTimeout(this, timeoutNanos, hasDeadline);
      }
    
    
    private static synchronized void scheduleTimeout(
          AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
        // head头结点为null,创建头结点并启动watchdog守护线程
        if (head == null) {
          head = new AsyncTimeout();
          new Watchdog().start();
        }
    
        long now = System.nanoTime();
        //设置超时发生的时间点
        if (timeoutNanos != 0 && hasDeadline) {
          node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
        } else if (timeoutNanos != 0) {
          node.timeoutAt = now + timeoutNanos;
        } else if (hasDeadline) {
          node.timeoutAt = node.deadlineNanoTime();
        } else {
          throw new AssertionError();
        }
    
        //将当前结点插入以超时时间排序的链表
        long remainingNanos = node.remainingNanos(now);
        for (AsyncTimeout prev = head; true; prev = prev.next) {
          if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;//常规链表插入结点操作
            prev.next = node;
            if (prev == head) {
              AsyncTimeout.class.notify(); //唤醒watchdog守护线程
            }
            break;
          }
        }
      }
    

    接下来看一下Watchdog守护线程

    private static final class Watchdog extends Thread {
        Watchdog() {
          super("Okio Watchdog");
          //设置为守护线程
          setDaemon(true);
        }
    
        public void run() {
          while (true) {
            try {
              AsyncTimeout timedOut;
              synchronized (AsyncTimeout.class) {
                //寻找超时的结点
                timedOut = awaitTimeout();
    
                // 还没有结点超时,继续寻找
                if (timedOut == null) continue;
    
                // 链接中没有除了头结点没有其他结点并且守护线程达到最大等待时间,清空链表,退出线程
                if (timedOut == head) {
                  head = null;
                  return;
                }
              }
    
              //找到超时的结点,调用该结点timeout方法
              timedOut.timedOut();
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    

    awaitTimeout方法的作用是寻找超时的结点,如果链表中只有head头结点,则等待最大60s,等待过程中若有结点插入,则线程被notify(scheduleTimeout中调用了notify方法)。此时链表中除了头结点还有刚加入的结点,awaitTimeout返回null,线程继续循环。第二次进入awaitTimeout方法head.next不为null,检查该结点是否超时,如果还没有超时,则继续等待该结点的超时时间,若超时了,则返回该结点,并将该结点从链表中删除。

    static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
        // Get the next eligible node.
        AsyncTimeout node = head.next;
    
        // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
        if (node == null) {
          long startNanos = System.nanoTime();
          AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
          return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
              ? head  
              : null; //返回null,说明提前被notify,有新结点加入了
        }
    
        long waitNanos = node.remainingNanos(System.nanoTime());
    
        // waitNanos>0表示还没有超时
        if (waitNanos > 0) {
      
          //等待超时
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
          return null;
        }
    
        //超时了,从链表中删除该结点
        head.next = node.next;
        node.next = null;
        return node;
      }
    

    回到AsyncTimeout类的source方法的2处

    ② read方法

    调用被装饰者的read方法

    read方法如果正常结束则接着会调用④处exit(throwOnTimeout)方法,如果发生了异常则调用③处抛出异常。

    ④ exit(boolean)方法

    exit方法参数throwOnTimeout 为true。方法内又调用了无参数的exit方法返回值为boolean类型的timedOut,意思是是否超时了。
    如果throwOnTimeout和timedOut都为true,则抛出超时异常。

    final void exit(boolean throwOnTimeout) throws IOException {
      //是否超时了    
    boolean timedOut = exit();
        
        if (timedOut && throwOnTimeout) throw newTimeoutException(null);
      }
      //无参的exit方法,inQueue表示是否在链表中,内部又调用了cancelScheduledTimeout方法
     public final boolean exit() {
        if (!inQueue) return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
      }
    //若参数node表示的结点还在链表中,则将其中链表中移除,并且返回false;否则返回true,表示已经超时。
      private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
        
        for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
          if (prev.next == node) {//链表中找到了node,将其移除,说明还没超时
            prev.next = node.next;
            node.next = null;
            return false;
          }
        }
    
        // 结点没有在链表中,说明已经被超时移除了(watchdog线程中的awaitTimeout方法会将超时的结点从链表中移除)
        return true;
      }
    

    ③ exit(IOException)

    如果②处read方法抛出了IOException,则来到exit(IOException)。该方法首先检测本结点是否已经超时,如果没有直接抛出原IOException,如果是因为超时抛出的IOException,则抛出一个包装后的Exception。

      final IOException exit(IOException cause) throws IOException {
        if (!exit()) return cause;
        return newTimeoutException(cause);
      }
    
     protected IOException newTimeoutException(@Nullable IOException cause) {
        InterruptedIOException e = new InterruptedIOException("timeout");
        if (cause != null) {
          e.initCause(cause);
        }
        return e;
      }
    

    相关文章

      网友评论

        本文标题:Okio 超时机制

        本文链接:https://www.haomeiwen.com/subject/wioeuftx.html