美文网首页
OkHttp阅读笔记(二)

OkHttp阅读笔记(二)

作者: 范锦浩 | 来源:发表于2017-08-21 15:12 被阅读24次

    第一篇入口
    本文基于上一篇的前提,着重介绍其中的默认拦截器的意义
    这里稍微回顾一下拦截器的调用顺序

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        //尝试建立一个拦截器列表,之后会进行一次链式调用
        //简单说就是从上到下执行获取response之前的过程,
        //然后获取到response之后,再从下到上执行
        List<Interceptor> interceptors = new ArrayList<>();
        //首先执行自定义的拦截器
        interceptors.addAll(client.interceptors());
        //处理重试逻辑
        interceptors.add(retryAndFollowUpInterceptor);
        //添加一些预定义头部之类的数据
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //处理http协议的缓存逻辑
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //处理socket建立连接或者复用连接过程,总之这里会建立连接
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        //在已经建立的链接上进行参数发送和获取响应封装等操作
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    

    接下来会按照拦截器的执行顺序进行介绍,稍微留意一下所有拦截器都是在子线程中执行,使用中不应该出现线程错误的问题

    自定义拦截器

    在子线程中,请求先执行的是OkHttpClient中的拦截器列表,这个实际上是自定义拦截器

        public Builder addInterceptor(Interceptor interceptor) {
          interceptors.add(interceptor);
          return this;
        }
    

    实际上OkHttpClient通过Builder模式构建,然后可以在Builder中通过上述方法添加自定义拦截器,然后自定义拦截器会按照添加的顺序来执行。
    作为优先执行的拦截器,实际上主要有两个好处,第一就是可以监听请求的开始,其次就是可以监听请求的结束,那么平时这里至少可以有两种常用的拦截器
    1.打印日志的拦截器:用于在请求开始前打印请求参数等数据,请求结束时打印请求结果数据等
    2.追踪请求的拦截器:这个一般可以用于测量一个请求从发起到接收的时间
    当然上面只是我想到的两种,实际使用中根据自身需求添加就是了。

    RetryAndFollowUpInterceptor

    顾名思义,重试和追随拦截器,其实就是用于在请求完成之后,如果当前请求失败但是允许重试,那么会重新发起请求。如果当前请求成功,但是要求重定向,那么请求重定向地址的请求。
    当然上面的描述比较粗浅,接下来看一下细节

    @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        //流的分配者,这里重点是通过当前请求连接构建Address和RouteSelector
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    
        int followUpCount = 0;//重定向的次数,最大20
        Response priorResponse = null;//用于记录重定向之前上一次的响应成果,这个响应是会清空响应体的
        while (true) {//用于重连或者重定向
          if (canceled) {//每一次请求之前先判断当前Call(Request)是否被要求取消
            streamAllocation.release();//清理连接池的资源,并且关闭Socket
            throw new IOException("Canceled");//直接在此处抛出IOException即可,会在AsyncCall中回调onFailure
          }
    
          Response response = null;
          //用于标记每一次请求是否应该释放套接字连接
          boolean releaseConnection = true;
          try {
            //进行请求
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            //正常来说此时服务端已经返回结果,单次请求完成
            releaseConnection = false;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            // 尝试连接到指定的节点时候失败
            // 1.在通过host去dns查找ip地址的时候出现异常
            // 2.进行socket连接过程中出现异常,实际上socket在连接的时候会尝试一直进行连接
            // 除非retryOnConnectionFailure要求不能重连,或者socket连接过程中出现不可重新连接相关的异常的时候会抛出
            // 后续会进行异常回调
            if (!recover(e.getLastConnectException(), false, request)) {
              throw e.getLastConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            // 这个异常可能更多的出现在网络传输数据的时候
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {// 当前连接中出现异常,并且不可尝试重新连接
              streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
              streamAllocation.release();//标记当前流分配者后续不再可用
            }
          }
    
          // Attach the prior response if it exists. Such responses never have a body.
          // 在重试之前记录上一次请求的结果
          if (priorResponse != null) {
            //这里记录了上一次请求返回的响应,不过上一次请求的正文体在这里被置空
            //因为之前的响应的不应该被关心的,应该关心的是当前响应的正文体
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
          //判断是否存在代理、重定向和一些异常情况,可能需要尝试发出新的请求
          //这里可能要进行新的请求的构建
          Request followUp = followUpRequest(response);
    
          if (followUp == null) {//不需要重定向
            if (!forWebSocket) {//App连接非WebSocket
              //标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
              //通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
              //否则会待在复用池中,等待复用或者在指定的时间后被清理
              streamAllocation.release();
            }
            return response;//返回成功的响应结果
          }
          //如果到这里,说明要重定向
          //先关闭之前服务端响应的输入流
          closeQuietly(response.body());
    
          if (++followUpCount > MAX_FOLLOW_UPS) {//最大重定向次数20
            streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
    
          if (followUp.body() instanceof UnrepeatableRequestBody) {
            streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
          }
          //判断当前是否可以重用连接,主要是Route的问题,包括ip地址等数据
          //如果不能则需要通过新的请求地址来新建连接
          if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
            //通过新的Address构建新的流分配者
            streamAllocation = new StreamAllocation(
                client.connectionPool(), createAddress(followUp.url()), callStackTrace);
          } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
          }
          //准备进行下一次重定向的请求
          request = followUp;
          priorResponse = response;
        }
      }
    

    代码相对抽象,通过流程相对好阐述这个过程:
    1.正常的一次请求成功:

    try {
            //进行请求
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            //正常来说此时服务端已经返回结果,单次请求完成
            releaseConnection = false;
        }finally {
            if (releaseConnection) {//正常请求成功不释放当前连接,主要是为了连接的复用
                streamAllocation.streamFailed(null);
                streamAllocation.release();
            }
        }
        if (followUp == null) {//正常请求不会返回重定向
            if (!forWebSocket) {//App连接非WebSocket
                //标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
                //通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
                //否则会待在复用池中,等待复用或者在指定的时间后被清理
                //总之就是当前流在后续一段时间内可以复用
                streamAllocation.release();
            }
            return response;//返回成功的响应结果
        }
    

    非常简单,主要就是请求成功之后释放连接,让连接可以在连接池中被后续复用。
    2.第一次连接失败:

    while(true){
            try {
                //进行请求
                response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
                //抛出异常
            }catch (IOException e) {
                boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
                //判断当前是否可以重试,如果不能,这里抛出异常会回调onFailed,此时releaseConnection = true,走finally
                if (!recover(e, requestSendStarted, request)) throw e;
                //可以重试,先不释放连接
                releaseConnection = false;
                continue;
            }finally {
                if (releaseConnection) {//当前不可尝试重新连接
                    streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
                    streamAllocation.release();//标记当前流分配者后续不再可用
                }
            }
            //连接成功之后,走之前的流程
        }
    
    private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
        streamAllocation.streamFailed(e);//这里会关闭之前的socket
    
        // 应用层是否允许在连接失败之后重新尝试连接
        if (!client.retryOnConnectionFailure()) return false;
    
        // We can't send the request body again.
        // 这个是Http2协议中的情况,这里先不考虑
        if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
    
        // This exception is fatal.
        // 检查当前异常类型,因为有的异常是无法再次进行重试连接的
        if (!isRecoverable(e, requestSendStarted)) return false;
    
        // 当前没有连接节点可以去尝试
        // 一般来说就是当前节点
        if (!streamAllocation.hasMoreRoutes()) return false;
    
        // For failure recovery, use the same route selector with a new connection.
        return true;
      }
    
      private boolean isRecoverable(IOException e, boolean requestSendStarted) {
        // If there was a protocol problem, don't recover.
        // 协议异常,比方说协议规定的报文格式不符之类的情况
        // 总之就是一些不满足当前协议的条件
        if (e instanceof ProtocolException) {
          return false;
        }
    
        // If there was an interruption don't recover, but if there was a timeout connecting to a route
        // we should try the next route (if there is one).
        // 这个一般是Okio的异常,会在流操作超时之后抛出
        // SocketTimeoutException一般可以认为是socket连接超时或者读写超时
        // 这种时候可以尝试重连
        if (e instanceof InterruptedIOException) {
          return e instanceof SocketTimeoutException && !requestSendStarted;
        }
    
        // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
        // again with a different route.
        // 当前是Https握手失败异常
        if (e instanceof SSLHandshakeException) {
          // 如果是证书异常,这样没有必要重试,因为重试了也会失败
          // If the problem was a CertificateException from the X509TrustManager,
          // do not retry.
          if (e.getCause() instanceof CertificateException) {
            return false;
          }
        }
        if (e instanceof SSLPeerUnverifiedException) {
          // e.g. a certificate pinning error.
          return false;
        }
    
        // An example of one we might want to retry with a different route is a problem connecting to a
        // proxy and would manifest as a standard IOException. Unless it is one we know we should not
        // retry, we return true and try a new route.
        return true;
      }
    
    

    当连接失败之后,会进行重新连接的判断,首先先标记当前流失败
    1.当前应用层是否允许重新连接
    2.当前异常是否是重连无意义异常,比方说Http证书校验异常,这种就算重连也会失败
    3.检查当前是否有节点可用
    在判断条件之前,会先标记当前流失败,看一下细节实现,具体注释会对应当前场景

    public void streamFailed(IOException e) {
            Socket socket;
            boolean noNewStreams = false;
    
            synchronized (connectionPool) {
                if (e instanceof StreamResetException) {//这个是Http2协议的,先忽略
                    //...
                } else if (connection != null
                        && (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {//当前连接不为空,且不是http2协议
                    noNewStreams = true;//标记之后不会新建连接
                    // 当前连接没有成功,标记当前连接节点失败
                    // 当前连接被标记成功的条件是要求完成一次请求,并且从服务端成功读取响应体中的正文部分
                    if (connection.successCount == 0) {
                        if (route != null && e != null) {
                            routeSelector.connectFailed(route, e);
                        }
                        route = null;//会导致当前节点失效,那么如果要重新连接,要求必须有下一个备用的节点
                    }
                }
                //不新建流、当前流完成、不释放流分配者
                //当前socket连接会被关闭
                socket = deallocate(noNewStreams, false, true);
            }
    
            closeQuietly(socket);
        }
    
        private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
            assert (Thread.holdsLock(connectionPool));
            //连接完成之后,无论成功失败,都清理当前codec
            if (streamFinished) {
                this.codec = null;
            }
            //当前流分配者是否继续可用
            if (released) {
                this.released = true;
            }
            Socket socket = null;
            if (connection != null) {
                if (noNewStreams) {//当前连接是否可以重用
                    //一旦被标记,会导致连接池移除当前连接
                    //后续会关闭socket连接
                    connection.noNewStreams = true;
                }
                //在一次正常的请求完成之后,要进行连接的释放
                //包括socket的关闭
                if (this.codec == null && (this.released || connection.noNewStreams)) {
                    release(connection);
                    if (connection.allocations.isEmpty()) {
                        connection.idleAtNanos = System.nanoTime();
                        if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
                            socket = connection.socket();
                        }
                    }
                    connection = null;
                }
            }
            return socket;
        }
    
        private void release(RealConnection connection) {
            //这里实际上就是将连接和流分配者的关联解除
            //如果允许连接复用,那么不应该关闭socket连接,会在一定时间内等待复用
            //否则后面应该主动关闭socket连接
            for (int i = 0, size = connection.allocations.size(); i < size; i++) {
                Reference<StreamAllocation> reference = connection.allocations.get(i);
                if (reference.get() == this) {
                    connection.allocations.remove(i);
                    return;
                }
            }
            throw new IllegalStateException();
        }
    

    因为当前请求没有完全成功,那么是没有办法进行复用的,所以说这里会关闭当前的socket连接并且从复用池中移除。
    除此之外,还进行了节点是否有效的判断,因为复用的时候必须要有指定的连接节点,当前连接出现异常的请求会有一个对应的节点,如果当前节点连接成功,那么当前节点是有效的,重连的时候可以连接这个节点,否则就应该标记当前节点无效,重连的时候必须找寻其他节点。

    连接复用池

    上面既然提到了连接复用池,那么这里就看一下ConnectionPool的实现
    首先是为什么需要使用复用池?
    在Http1.1的协议中,通过"Connection: Keep-Alive"这个头部报文,可以指定当前socket连接为长连接,这意味者如果服务端/客户端没有主动终止当前连接,那么这个连接会一直保持
    (实际中可能会因为很多原因导致长连接中断,这个不是重点)
    此外在一次socket连接的过程中,至少会有3次握手这个流程的开销,可能还会有SSL证书/域名校验这一块,那么如果每一次连接都重复上述操作,显然不是什么很好的选择。
    那么复用之前已经建立的长连接就是最好的一个选择

    public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
    

    OkHttp中默认允许最多5个可以复用的连接,连接的最大等待时间是5分钟。
    上面在将重试拦截器的时候提到,如果一个连接完成之后会进行连接释放,其中

      if (connection.allocations.isEmpty()) {
              connection.idleAtNanos = System.nanoTime();
              if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
                socket = connection.socket();
              }
            }
    
    boolean connectionBecameIdle(RealConnection connection) {
        assert (Thread.holdsLock(this));
        //noNewStreams实际上是标记当前流不可复用
        if (connection.noNewStreams || maxIdleConnections == 0) {
          connections.remove(connection);//直接从连接池中移除
          return true;
        } else {
          //这里是唤醒cleaup线程
          notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
          return false;
        }
      }
    
    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {//计算下一次唤醒进行清理任务的间隔
              long waitMillis = waitNanos / 1000000L;//毫秒
              waitNanos -= (waitMillis * 1000000L);//纳秒
              synchronized (ConnectionPool.this) {
                try {//线程挂起指定时间
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
          //遍历当前连接池中的所有连接
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
            // 如果当前连接正在使用中,不回收,继续查找下一个
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
    
            idleConnectionCount++;//空闲连接数+1
    
            // If the connection is ready to be evicted, we're done.
            // 当前连接空闲时间 = 当前时间 - 当前连接被释放允许参与复用的时间
            long idleDurationNs = now - connection.idleAtNanos;
            // 这里是记录当前连接池中所有空闲连接中空闲的最久的连接
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;//当前最大的空闲的连接到现在所经过的纳秒数
              longestIdleConnection = connection;//当前空闲最久的可复用的连接
            }
          }
    
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {//当前连接超过最大的空闲时间或者数量大于最大的空闲连接数量
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            // 从连接池中移除,不再复用
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {//当前存在空闲连接,并且空闲连接没有超时
            // A connection will be ready to evict soon.
            // 当前清理线程挂起最大的空闲时间,下一次唤醒可能会清理当前最久的空闲连接
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            return keepAliveDurationNs;
          } else {
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
          }
        }
        //每一次清理的对象为当前空闲连接列表中最久的空闲连接
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
      }
    
    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        //往连接池中添加连接
        if (!cleanupRunning) {//当前清理线程没有运行
          cleanupRunning = true;//执行清理线程
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    
    private final Deque<RealConnection> connections = new ArrayDeque<>();
    

    连接池中核心就是一个连接队列和一个清理线程,在每一次添加连接到连接队列的时候尝试开启清理线程,清理线程会尝试清理当前在链接队列中空闲最久的连接。
    这里只是讲了逻辑,实际上在OkHttp中Http1.1要想复用连接,那么必须读取完ResponseBody中的数据,这样才会释放连接

    总结

    这一篇主要是了解重连和复用策略,下一篇会着重讲解Http缓存相关

    相关文章

      网友评论

          本文标题:OkHttp阅读笔记(二)

          本文链接:https://www.haomeiwen.com/subject/bmmorxtx.html