美文网首页Android进阶之路Android收藏集Android技术知识
OkHttp3源码解析(二)——网络连接的管理(多路复用,连接池

OkHttp3源码解析(二)——网络连接的管理(多路复用,连接池

作者: 码农翻身记 | 来源:发表于2019-05-16 14:47 被阅读261次

    目录


    目录

    一、提出问题

    1.OkHttp底层也是通过Socket发送和接收请求,是如何支持http/https请求的?
    2.连接池的实现原理,如何支持多路复用?怎样从连接池选择复用连接?
    3.如何处理代理?
    4.Route、ConnectionPool、RealConnection、steamAllocation、HttpCodec分别的作用,如何协作?
    5.重定向请求或重试的处理流程?
    6.如何支持http2协议?

    如果刚开始学习OkHttp源码或对代理不了解的,可以先忽略代理部分的逻辑,先搞清楚直连请求的流程。OkHttp源码之所以复杂一部分原因是处理了代理和路由,但代理部分实际项目可能用不上。如果想深入了解OkHttp的代理,可以阅读:OkHttp源码解析 (三)——代理和路由(https://www.jianshu.com/p/63ba15d8877a)。

    二、网络管理涉及的角色及作用

    网络管理涉及的角色及作用

    外部发起的一次请求封装为一个RealCall,一个RealCall可能对应多个Request,如初始请求及后续的重定向请求,而每一个Request会创建一个StreamAllocation来管理连接,寻找合适的RealConnection,一个Call的所有Request偏向用同一个RealConnection,对于HTTP/1.x的请求,RealConnection同时只持有一个StreamAllocation,对于HTTP/2可以同时持有多个StreamAllocation。角色的对应关系如下图。


    角色对应关系.png

    三、各个角色的协作

    各个角色如何协作完成网络请求

    1、在RetryAndFollowUpInterceptor拦截器中,为新请求创建流StreamAllocation,如果请求返回需要重定向,创建重定向Request及新的StreamAllocation,继续上面的逻辑。

    2、在ConnectInterceptor拦截器中,StreamAllocation选择连接RealConnection:
    第一步:优先从连接池(connectionPool)中寻找,有合适的则直接复用;
    第二步:如果没有则创建新的RealConnection,并加入到连接池中,新创建的连接通过connect方法完成socket的三次握手,与服务器建立连接;
    第三步:建立连接后获得网络写入流(BufferedSink,封装了InputStream)和读取流(BufferedSource,封装了outputStream);
    第四步:最后创建HttpCodec,持有BufferedSink和BufferedSource,后续写入请求和读取响应通过HttpCodec操作。

    3、在CallServerInterceptor拦截器中,通过HttpCodec实现真正发送请求和读取服务器响应,最后构造Response并沿链路返回给上一级的拦截器。

    各拦截器的关键代码:
    RetryAndFollowUpInteceptor:

    public final class RetryAndFollowUpInterceptor implements Interceptor {
     @Override public Response intercept(Chain chain) throws IOException {
        ...
        //原始请求
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(request.url()), call, eventListener, callStackTrace);
        ...
        while (true) {
                 ...
                 //原始请求返回
                 response = realChain.proceed(request, streamAllocation, null, null);
                 ...
                 //创建重定向请求
                 Request followUp = followUpRequest(response, streamAllocation.route());
                 ... 
                 //重定向StreamAllocation
                 streamAllocation = new StreamAllocation(client.connectionPool(),createAddress(followUp.url()), call, eventListener, callStackTrace);
        }
    }
    

    ConnectInterceptor:

    public final class ConnectInterceptor implements Interceptor {
      public Response intercept(Chain chain) throws IOException {
          ...
         //从拦截器链里得到StreamAllocation对象
         StreamAllocation streamAllocation = realChain.streamAllocation();
         //寻找合适的连接并返回读写流
         HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
         //获取realConnetion
         RealConnection connection = streamAllocation.connection();
         //执行下一个拦截器
         return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    }
    

    CallServerInterceptor:

    public final class CallServerInterceptor implements Interceptor {
      public Response intercept(Chain chain) throws IOException {
             HttpCodec httpCodec = realChain.httpStream();
             Request request = realChain.request();
             //发送请求头和请求行(写入到缓冲区)
             httpCodec.writeRequestHeaders(request);
              if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
                    ...//发送body部分,post请求和"100-continue"请求
              }
              //flush正在发送
              httpCodec.finishRequest();
              //读取响应
              if (responseBuilder == null) {
                       realChain.eventListener().responseHeadersStart(realChain.call());
                       //读取头部
                        responseBuilder = httpCodec.readResponseHeaders(false);
               }
              //构造响应Response
              Response response = responseBuilder.request(request)
                                                  .handshake(streamAllocation.connection().handshake())
                                                  .sentRequestAtMillis(sentRequestMillis)
                                                  .receivedResponseAtMillis(System.currentTimeMillis())
                                                  .build();
                ...
               //返回Response 
               return response;
      }
    }
    

    四、详解各个角色的逻辑

    (一) StreamAllocation

    解释:流分配器,主要功能是管理一次连接上的流,为Request寻找合适的Realconnection,并获取网络读写流。重定向会创建新的StreamAllocation。每个Connection 有个变量allocationLimit,用于定义可以承载的并发的 streams 的数量。HTTP/1.x 的 Connection 一次只能有一个stream, HTTP/2 一般可以有多个。

    public final class StreamAllocation {
       public final Address address;//请求地址
       private RouteSelector.Selection routeSelection;//可选路由列表
       private Route route;//选中的路由
       private final ConnectionPool connectionPool;//连接池
       public final Call call;//请求call
       // State guarded by connectionPool.
       private final RouteSelector routeSelector;//路由选择器
       private HttpCodec codec;//编码网络请求和响应
    
       public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
             EventListener eventListener, Object callStackTrace) {
          this.connectionPool = connectionPool;
          this.address = address;
          this.call = call;
          this.eventListener = eventListener;
          this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
          this.callStackTrace = callStackTrace;
          }
          /*
           * 获取流,通过findConnection得到连接,再获取读写流
           */
          public HttpCodec newStream(...) {}
          /*
           * 寻找可复用连接及判断是否"健康",如果不“健康”则继续循环直至找到“健康”连接
           */
          private RealConnection findHealthyConnection(...) throws IOException {}
          /*
           * 为新stream寻找可复用连接,可能来自连接池,如果没有则新建
           */
          private RealConnection findConnection(...){}
          /*
           * 释放当前持有的连接,如果连接是限制分配给新流的(noNewSteam为true),则返回socket进行关闭。
           * 对于HTTP/2,多个请求共享一个连接,所以对于follow-up请求期间可能被限制分配新流
           */
          private Socket releaseIfNoNewStreams(){}
          /*
           * 请求已经完成,从连接中移除正在当前执行的流,只有移除了连接才能被复用
           */
          public void streamFinished(){}
          /*
           * 禁止在承载此分配的连接上创建新流
           */
          public void noNewStreams(){} 
          /*
           * 取消或抛异常,释放连接
           */
          public void release() {}
          /*
           * 释放此流所持有的资源。如果分配了足够的资源,连接将被分离或关闭。调用者必须在连接池上同步。
           */
          private Socket deallocate(){}
    }
    

    (1)newStream方法
    下面看下为新请求寻找连接获取读写流的逻辑。

     public HttpCodec newStream(
             OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
          int connectTimeout = chain.connectTimeoutMillis();
          int readTimeout = chain.readTimeoutMillis();
          int writeTimeout = chain.writeTimeoutMillis();
          int pingIntervalMillis = client.pingIntervalMillis();
          boolean connectionRetryEnabled = client.retryOnConnectionFailure();
          try {
             RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                   writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
             HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
             synchronized (connectionPool) {
                codec = resultCodec;
                return resultCodec;
             }
          } catch (IOException e) {
             throw new RouteException(e);
          }
       }
    

    方法比较简单,第一步寻找连接RealConnection,第二步获取网络读写流HttpCodec,有点小疑问,为什么要先同步连接池再返回HttpCodec。

    (2)findHealthyConnection方法

    /**
        * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
        * until a healthy connection is found.
        * 寻找连接,如果是“健康”的则返回,如果不是继续循环寻找。
        */
       private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
             int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
             boolean doExtensiveHealthChecks) throws IOException {
          while (true) {
             RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                   pingIntervalMillis, connectionRetryEnabled);
             // If this is a brand new connection, we can skip the extensive health checks.
             //如果是新创建的连接,则不需判断是否“健康”,直接返回
             synchronized (connectionPool) {
                if (candidate.successCount == 0) {
                   return candidate;
                }
             }
             // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
             // isn't, take it out of the pool and start again.
             //检查连接是否良好,如果不是,则从连接池移除,然后继续寻找
             if (!candidate.isHealthy(doExtensiveHealthChecks)) {
                noNewStreams();
                continue;
             }
             return candidate;
          }
       }
    

    1、先调用findConnectoion方法返回连接对象RealConnection;
    2、根据RealConnection的属性successCount=0判断连接是新创建的,新创建的连接不需要判断是否“健康”,直接返回;
    3、如果successCount大于0,表示连接早已经创建,是从连接池中获取得到,这时需要判断连接是否“健康”;
    4、如果非“健康”连接,则设置该连接不允许承载新的流,继续第一步;
    findConnection及isHealthy的逻辑后面会分析。

    (3)findConnection方法

    /**
        * Returns a connection to host a new stream. This prefers the existing connection if it exists,
        * then the pool, finally building a new connection.
        * 为新流返回连接,优先从连接池中寻找复用,如果没有最终会创建新的连接
        */
       private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
             int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
          boolean foundPooledConnection = false;//是否从连接池中找到连接
          RealConnection result = null;//需要返回的连接
          Route selectedRoute = null;//找到路由
          Connection releasedConnection;//可释放的连接
          Socket toClose;//需要关闭的socket
          synchronized (connectionPool) {
             if (released) throw new IllegalStateException("released");
             if (codec != null) throw new IllegalStateException("codec != null");
             if (canceled) throw new IOException("Canceled");
    
             // Attempt to use an already-allocated connection. We need to be careful here because our
             // already-allocated connection may have been restricted from creating new streams.
           //??没想懂什么场景这个地方已经分配了连接
             releasedConnection = this.connection;
             toClose = releaseIfNoNewStreams();
             if (this.connection != null) {
                // We had an already-allocated connection and it's good.
                result = this.connection;
                releasedConnection = null;
             }
             if (!reportedAcquired) {
                // If the connection was never reported acquired, don't report it as released!
                releasedConnection = null;
             }
    
             if (result == null) {
                // Attempt to get a connection from the pool.
                //从连接池中获取一个连接,通过传入this对象,寻找到合适连接会赋值this.connection
                Internal.instance.get(connectionPool, address, this, null);
                if (connection != null) {
                   //寻找到合适的连接
                   foundPooledConnection = true;
                   result = connection;
                } else {
                   //连接池没有合适的连接,可能已经有路由信息(什么场景)
                   selectedRoute = route;
                }
             }
          }
          //??什么场景
          closeQuietly(toClose);
          ...
          if (result != null) {
             // If we found an already-allocated or pooled connection, we're done.
           //如果已经分配连接或者从连接池中寻找到合适的,则返回
             return result;
          }
    
          // If we need a route selection, make one. This is a blocking operation.
        
          boolean newRouteSelection = false;
          if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
             newRouteSelection = true;//需要尝试新的路由
             routeSelection = routeSelector.next();//路由集合
          }
    
          synchronized (connectionPool) {
             if (canceled) throw new IOException("Canceled");
    
             if (newRouteSelection) {
                // Now that we have a set of IP addresses, make another attempt at getting a connection from
                // the pool. This could match due to connection coalescing.
               // 现在已经有一个ip地址集合,再次尝试重连接池中寻找可复用连接
                List routes = routeSelection.getAll();
                for (int i = 0, size = routes.size(); i < size; i++) {
                   Route route = routes.get(i);
                   Internal.instance.get(connectionPool, address, this, route);
                   if (connection != null) {
                      foundPooledConnection = true;
                      result = connection;
                      this.route = route;
                      break;
                   }
                }
             }
    
             if (!foundPooledConnection) {//连接池中没有找到合适的连接
                if (selectedRoute == null) {
                   selectedRoute = routeSelection.next();//即将根据该路由创建新的连接
                }
    
                // Create a connection and assign it to this allocation immediately. This makes it possible
                // for an asynchronous cancel() to interrupt the handshake we're about to do.
              //创建新的连接并立即分配给当前的流,这样允许异步调用取消方法来打断即将进行的握手。
                route = selectedRoute;
                refusedStreamCount = 0;
                result = new RealConnection(connectionPool, selectedRoute);
                acquire(result, false);//分配连接给当前的流
             }
          }
    
          // If we found a pooled connection on the 2nd time around, we're done.    
        // 如果从连接池找到合适的连接则返回。
          if (foundPooledConnection) {
             eventListener.connectionAcquired(call, result);
             return result;
          }
    
          // Do TCP + TLS handshakes. This is a blocking operation.
          //新创建的连接,建立与服务器的连接,方法内会根据平台调用socket.connnect()
          result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
          //路由库记录新连接的路由
          routeDatabase().connected(result.route());
    
          Socket socket = null;
          synchronized (connectionPool) {
             reportedAcquired = true;
    
             // Pool the connection.
             //添加到连接池,添加前需要加锁
             Internal.instance.put(connectionPool, result);
    
             // If another multiplexed connection to the same address was created concurrently, then
             // release this connection and acquire that one.
             //
             if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
             }
          }
          closeQuietly(socket);
    
          eventListener.connectionAcquired(call, result);
          return result;
       }
    

    总结findConnection的流程如下:
    1、先判断是否已经分配了连接,有则返回(没想懂是什么场景);
    2、没有则根据address从连接池中找可重用的连接Internal.intance.get(connectionPool,address,this,null),找到则返回;
    3、如果没有确定路由,则需要尝试新的路由,通过路由选择器返回路由集合,这时得到一个ip地址集合;
    4、遍历路由集合再次从连接池中寻找可复用的连接,有则设为待返回的连接;
    5、如最终从连接池中没有找到合适的连接,则新建连接new RealConnection(connectionPool,selectedRoute),并马上分配给当前的流;
    6、新建立的连接,建立与服务器连接result.connect(即sockect.connect),并把路由记录到路由库中,把创建的连接添加到连接池Internal.intance.put(connectionPool ,result).

    (二) RealConnection

    解析:建立在Socket之上的物理通信信道,持有StreamAllocation队列。

    public final class RealConnection extends Http2Connection.Listenerimplements Connection {
    private final ConnectionPool connectionPool;//连接池
    private final Route route; //当前连接到路由
    private Socket rawSocket; //底层socket
    private Socket socket; //应用层socket
    private Handshake handshake; //https的握手
    private Protocol protocol; //协议
    private Http2Connection http2Connection; //HTTP/2的链接
    private BufferedSource source; //网络读取流
    private BufferedSink sink; //网络写入流
    public boolean noNewStreams;  //标识是否能继续添加流,一但设为true,则一直为true,不能再添加流
    public int allocationLimit =1; //承载流(allocationStream)的最大的并发数
    public final List> allocations =new ArrayList<>(); //当前承载流的集合
    
    public RealConnection(ConnectionPool connectionPool, Route route){}
    /*
     * 连接,中创建新RealConnection对象后调用,完成socket连接
     */
    public void connect(int connectTimeout, int readTimeout, int writeTimeout,int pingIntervalMillis, boolean connectionRetryEnabled, Call call,EventListener eventListener){}
    /*
     * 连接隧道
     */
    private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,EventListener eventListener)throws IOException{}
    /*
     * socket连接,完成tcp三次握手
     */
    private void connectSocket(int connectTimeout, int readTimeout, Call call,EventListener eventListener)throws IOException{}
    /*
     * 建立协议
     */
    private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,int pingIntervalMillis, Call call, EventListener eventListener)throws IOException{}
    /*
     *https请求,建立tls连接
     */
    private void connectTls(ConnectionSpecSelector connectionSpecSelector)throws IOException{}
    /*
     * 创建通道
     */
    private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,HttpUrl url)throws IOException{}
    /*
     * 构造创建通道的请求
     */
    private Request createTunnelRequest(){}
    /*
     *  是否符合条件,如果能分配新流则返回true,
     * /
    public boolean isEligible(Address address, @Nullable Route route){}
    /*
     * 将io流BufferedSource,BufferedSink封装为HttpCodec 
     */
    public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,StreamAllocation streamAllocation)throws SocketException{}
    /*
     * 是否“健康”,如果准备好建立新流则返回true
     */
    public boolean isHealthy(boolean doExtensiveChecks){}
    }
    

    (1)connect方法
    新创建RealConnection后,通过connect方法服务器建立连接,完成tcp三次握手,下面介绍下connect的方法。

    public void connect(int connectTimeout, int readTimeout, int writeTimeout,int pingIntervalMillis, boolean connectionRetryEnabled, Call call,EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");//只能调用一次
    
          RouteException routeException = null;
          List connectionSpecs = route.address().connectionSpecs();
          ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    
          if (route.address().sslSocketFactory() == null) {
             if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
                throw new RouteException(new UnknownServiceException(
                      "CLEARTEXT communication not enabled for client"));
             }
             String host = route.address().url().host();
             if (!Platform.get().isCleartextTrafficPermitted(host)) {
                throw new RouteException(new UnknownServiceException(
                      "CLEARTEXT communication to " + host + " not permitted by network security policy"));
             }
          }
    
          while (true) { //对应SSLHandshakeException/SSLProtocolException的抛错,会重试
             try {
                //判断是否需要隧道,如果是通过HTTP代理完成https请求,则返回true
                //true的条件:address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP
                if (route.requiresTunnel()) {
                   //建立隧道,与http代理之间建立socket连接
                   connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
                   if (rawSocket == null) {
                      // We were unable to connect the tunnel but properly closed down our resources.
                      //无法与代理建立连接
                      break;
                   }
                } else {
                   //建立socket连接,不需要代理
                   connectSocket(connectTimeout, readTimeout, call, eventListener);
                }
               //创建协议,完成连接
                establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
                eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
                break;
             } catch (IOException e) {
                ...
                //connectionRetryEnabled :是否运行重试连接,在okhttpclient的builder设置,默认true
                //connectionSpecSelector.connectionFailed(e)针对不同的报错有不同的策略,返回true则重试
                if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
                       throw routeException;
                }
             }
          }
          ...
          if (http2Connection != null) { //http2运行一个RealConnection建立多个流
             synchronized (connectionPool) {
                allocationLimit = http2Connection.maxConcurrentStreams();
             }
          }
    

    总结connect的方法如下:
    1、判断是否需要隧道(隧道代理),如果需要则建立与代理服务器的sockect连接;
    2、不需要隧道则直接建立与服务器的sockect连接;
    3、确定网络协议,如果是https请求则进行tls握手;
    4、如果是HTTP/2,则新建http2Connection来处理请求,完成HTTP/2的协议协商。

    (2)connectSocket方法
    解析了连接的整理流程,下面对其中调用的方法进行分析,首先看下connectSocket方法:

    private void connectSocket(int connectTimeout, int readTimeout, Call call,EventListener eventListener) throws IOException {
          Proxy proxy = route.proxy();
          Address address = route.address();
          //如果是直连或者HTTP代理,则通过socketFactory创建socket,如果是socket代理,则直接new Socket
          rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
                ? address.socketFactory().createSocket(): new Socket(proxy);
          eventListener.connectStart(call, route.socketAddress(), proxy);
          rawSocket.setSoTimeout(readTimeout);
          try {
             //根据不同的平台,完成socket连接,实际是socket.connect(address, connectTimeout);
             Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
          } catch (ConnectException e) {
             ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
             ce.initCause(e);
             throw ce;
          }
          try {
             //okio封装sockect读写流
             source = Okio.buffer(Okio.source(rawSocket));
             sink = Okio.buffer(Okio.sink(rawSocket));
          } catch (NullPointerException npe) {
             if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
                throw new IOException(npe);
             }
          }
       }
    

    流程比较简单,总结如下:
    1、对于直连及http代理请求,通过SocketFactory创建socket,对于SOCKET代理传入proxy创建socket;
    2、设置socket超时时间;
    3、完成特定平台的socket连接,实际是socket.connect(address, connectTimeout);
    4、创建用于I/O的读写流source 、sink
    这里可以发现,代理请求处理的不同:
    · SOCKET代理:传入代理对象proxy手动创建socket,其他没有什么特别的处理,都交由java标准库的socket去处理,route的socketAddress包含目标http服务器的域名,对外界而言,不需要做处理。
    · HTTP代理:对于明文的HTTP代理, 也不需要特别的处理,route的socketAddress包含着代理服务器的IP地址,会自动建立与代理服务器的连接,代理服务器解析后再转发请求内容。

    (3)connectTunnel方法
    通过HTTP代理发送https请求需要用到隧道代理,也是一种协定方式,总结建立隧道的流程:
    1、客户端发送CONNECT请求到代理服务器,请求建立通道;请求会包含目标服务器的主机名和端口
    2、代理服务器与目标服务器建立TCP连接;
    3、代理服务器回应客户端;
    4、客户端向代理服务器发送请求,代理服务器原封不动转发客户端请求(原生TCP packet);
    5、响应过程同请求过程。

    了解隧道代理的基本流程,就好理解OkHttp中有关隧道代理的代码逻辑了。下面介绍connectTunnel方法:

    private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,EventListener eventListener) throws IOException {
          //构建连接请求
          Request tunnelRequest = createTunnelRequest();
          HttpUrl url = tunnelRequest.url();
          for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
             //建立与代理服务器的socket连接
             connectSocket(connectTimeout, readTimeout, call, eventListener);
             //建立隧道,发送不加密的代理请求并获取返回结果
             tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
             if (tunnelRequest == null) break; // 返回null表示已经与代理服务器建立隧道,退出循环
             
             // The proxy decided to close the connection after an auth challenge. We need to create a new
             // connection, but this time with the auth credentials.
             //如果代理服务器因身份认证问题关闭了连接,需要创建新的连接,带上身份凭证
             closeQuietly(rawSocket);
             rawSocket = null;
             sink = null;
             source = null;
             eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
          }
    }
    

    先创建与代理服务器的socket连接,然后再发送代理请求建立隧道(按照隧道代理的协议方式)。

    (4)createTunnelRequest方法
    建立隧道的需要构建代理请求, 那代理的请求发了什么,下面介绍createTunnelRequest方法:

    private Request createTunnelRequest() {
         return new Request.Builder()
               .url(route.address().url())
               .header("Host", Util.hostHeader(route.address().url(), true))
               .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
               .header("User-Agent", Version.userAgent())
               .build();
    }
    

    从代码可以看到通道请求包括很少的头部信息,是因为与服务器代理之间的连接是不加密的,避免发生如cookies等敏感信息到代理服务器。

    (5)createTunnel方法
    下面看下再看下如何创建隧道:

    private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
             HttpUrl url) throws IOException {
          // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
          // 建立隧道的协议请求内容
          String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
          while (true) {//??swtich各种分支都会退出循环,没想懂循环的场景
             Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
             source.timeout().timeout(readTimeout, MILLISECONDS);
             sink.timeout().timeout(writeTimeout, MILLISECONDS);
             // 发送代理请求
             tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
             tunnelConnection.finishRequest();
            // 读取响应
             Response response = tunnelConnection.readResponseHeaders(false)
                   .request(tunnelRequest)
                   .build();
             // The response body from a CONNECT should be empty, but if it is not then we should consume
             // it before proceeding.
             long contentLength = HttpHeaders.contentLength(response);
             if (contentLength == -1L) {
                contentLength = 0L;
             }
             Source body = tunnelConnection.newFixedLengthSource(contentLength);
             Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
             body.close();
    
             switch (response.code()) {
                case HTTP_OK:
                   // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
                   // that happens, then we will have buffered bytes that are needed by the SSLSocket!
                   // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
                   // that it will almost certainly fail because the proxy has sent unexpected data.
                   if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
                      throw new IOException("TLS tunnel buffered too many bytes!");
                   }
                   return null;
    
                case HTTP_PROXY_AUTH:
                   tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
                   if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
    
                   if ("close".equalsIgnoreCase(response.header("Connection"))) {
                      return tunnelRequest;
                   }
                   break;
    
                default:
                   throw new IOException(
                         "Unexpected response code for CONNECT: " + response.code());
             }
          }
       }
    

    (6)establishProtocol方法
    上面的方法只是建立了socket连接,无论是与目标服务器的连接,还是与代理服务器的,下面的方法是确定网络协议,如果是http请求,协议为http/1.1,可以直接返回,如果是https请求, 则建立ssl连接,如果HTTP/2则需要进行协议协商。这里就解析了OkHttp如何在socket连接之上实现http、https、HTTP/2等协议。

    private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
          int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
        if (route.address().sslSocketFactory() == null) {
          protocol = Protocol.HTTP_1_1;
          socket = rawSocket;
          return;
        }
        eventListener.secureConnectStart(call);
        connectTls(connectionSpecSelector);
        eventListener.secureConnectEnd(call, handshake);
        if (protocol == Protocol.HTTP_2) {
          socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
          http2Connection = new Http2Connection.Builder(true)
              .socket(socket, route.address().url().host(), source, sink)
              .listener(this)
              .pingIntervalMillis(pingIntervalMillis)
              .build();
          http2Connection.start();
        }
    }
    

    从代码可以看出:
    1、如果sslSocketFactory为空,说明是http请求,协议为HTTP_1_1,返回;
    2、如果sslSocketFactory非空,需要进行TLS握手;
    3、如果是协议是HTTP_2,则构建Http2Connection,完成与服务器的协商。

    (7)connectTls方法
    再看下如何建立TLS连接:

    private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
        Address address = route.address();
        SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
        boolean success = false;
        SSLSocket sslSocket = null;
        try {
          // Create the wrapper over the connected socket.
          //在原来的已经 建立连接的socket上加一层ssl,java中传入原始socket构造SSLSocket
          sslSocket = (SSLSocket) sslSocketFactory.createSocket(
              rawSocket, address.url().host(), address.url().port(), true /* autoClose */);
    
          // Configure the socket's ciphers, TLS versions, and extensions.
          //配置socket的加解密器 ,TLS版本及扩展内容
          ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
          if (connectionSpec.supportsTlsExtensions()) {
            Platform.get().configureTlsExtensions(
                sslSocket, address.url().host(), address.protocols());
          }
    
          // Force handshake. This can throw!
          //ssl握手
          sslSocket.startHandshake();
          // block for session establishment
          SSLSession sslSocketSession = sslSocket.getSession();
          if (!isValid(sslSocketSession)) {
            throw new IOException("a valid ssl session was not established");
          }
          Handshake unverifiedHandshake = Handshake.get(sslSocketSession);
    
          // Verify that the socket's certificates are acceptable for the target host.
          //验证socket的证书是否被服务器接受
          if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) { 
             //获取X509Certificate证书对象
            X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
            throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
                + "\n    certificate: " + CertificatePinner.pin(cert)
                + "\n    DN: " + cert.getSubjectDN().getName()
                + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
          }
    
          // Check that the certificate pinner is satisfied by the certificates presented.
          address.certificatePinner().check(address.url().host(),
              unverifiedHandshake.peerCertificates());
    
          // Success! Save the handshake and the ALPN protocol.
          String maybeProtocol = connectionSpec.supportsTlsExtensions()
              ? Platform.get().getSelectedProtocol(sslSocket)
              : null;
          socket = sslSocket;
          source = Okio.buffer(Okio.source(socket));
          sink = Okio.buffer(Okio.sink(socket));
          handshake = unverifiedHandshake;
          protocol = maybeProtocol != null
              ? Protocol.get(maybeProtocol)
              : Protocol.HTTP_1_1;
          success = true;
        } catch (AssertionError e) {
          if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        } finally {
          if (sslSocket != null) {
            Platform.get().afterHandshake(sslSocket);
          }
          if (!success) {
            closeQuietly(sslSocket);
          }
        }
    }
    

    TLS连接是对原始的TCP连接的一个封装,以提供TLS握手,及数据收发过程中的加密解密等功能。在Java中,用SSLSocket来描述。建立TLS连接的大致流程可总结为:
    1、在原始已经建立连接的socket的基础上,用SSLSocketFactory构建SSLSocket;
    2、配置SSLSocket,包括加解密器,TLS协议版本,如果ConnectionSpec支持TLS扩展参数,配置TLS扩展参数;
    3、开始TLS握手sslSocket.startHandshake();
    4、握手完后,获取服务器返回的证书信息SSLSession;
    5、对握手过程返回证书新息SSLSession进行验证hostnameVerifier().verify();
    6、验证远程主机证书;
    7、如果ConnectionSpec支持TLS扩展参数,获取握手过程完成的协议协商所选择的协议,主要用于http2的ALPN扩展;
    8、获取I/O操作的读写流,okio的BufferedSource和BufferedSink,保存协议及握手信息。

    (8)isEligible方法
    对于一个请求,优先从连接池寻找可复用的连接,如何判断连接是否能否被复用,下面解释判断能否复用的方法:

     /**
       * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
       * {@code route} is the resolved route for a connection.
       */
      public boolean isEligible(Address address, @Nullable Route route) {
        // If this connection is not accepting new streams, we're done.
        // 当前分配的流已达到上限或者已经设为不允许再分配流
        if (allocations.size() >= allocationLimit || noNewStreams) return false;
    
        // If the non-host fields of the address don't overlap, we're done.
        // host之外的配置要匹配,包括协议版本、代理、ssl、端口等
        if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    
        // If the host exactly matches, we're done: this connection can carry the address.
        // 如果host也完全匹配,则可放心复用
        if (address.url().host().equals(this.route().address().url().host())) {
          return true; // This connection is a perfect match.
        }
    
        // At this point we don't have a hostname match. But we still be able to carry the request if
        // our connection coalescing requirements are met. See also:
        // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
        // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
    
        // 1. This connection must be HTTP/2.
        // 运行到这里说明多个host指向一个ip的特殊情况,只允许HTTP/2复用,条件比较严格,要满足后续4点
        if (http2Connection == null) return false;
    
        // 2. The routes must share an IP address. This requires us to have a DNS address for both
        // hosts, which only happens after route planning. We can't coalesce connections that use a
        // proxy, since proxies don't tell us the origin server's IP address.
        if (route == null) return false;
        if (route.proxy().type() != Proxy.Type.DIRECT) return false;
        if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
        if (!this.route.socketAddress().equals(route.socketAddress())) return false;
    
        // 3. This connection's server certificate's must cover the new host.
        if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
        if (!supportsUrl(address.url())) return false;
    
        // 4. Certificate pinning must match the host.
        try {
          address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
        } catch (SSLPeerUnverifiedException e) {
          return false;
        }
    
        return true; // The caller's address can be carried by this connection.
      }
    

    判断连接是否可复用条件:
    1、先要满足流分配上限数(HTTP/1.x 1个,HTTP/2 多个);
    2、Address的配置完全相同,如SSL、代理、端口、主机名都要匹配;
    3、如果Address不匹配也可能有复用,主要是同一个主机配置了多个域名,且新请求已经选择的路由,条件必须同时满足:HTTP/2请求,新请求不是代理请求、当前连接也不是代理连接,路由ip、端口匹配,证书匹配。

    (三) ConnectionPool

    解析:我们都知道,在复杂的网络环境下,频繁创建和断开Socket连接是非常浪费资源和耗时的(需要3次握手4次挥手),如果是https连接还要进行ssl握手,http协议的keepalive对于解决这一问题有重要的作用。
    连接空闲后存活一段时间及连接复用需就要对连接进行管理,这里引入了连接池的概念。okhttp支持单个地址最多5个空闲连接(keepalive状态),保活时间是5分钟,超出时间的连接会被回收。okhttp用ConnectionPool实现连接池的功能,对连接进行管理和回收。
    ConnectionPool内部维护一个队列存放连接,一个线程池清理连接。

    /*
     * 连接池,实现连接的复用。通过一个队列维护当前所有的连接(RealConnection)
     * 最多同时持有5个空闲连接,保活时间为5分钟
     */
    public final class ConnectionPool {
       /**
        * Background threads are used to cleanup expired connections. There will be at most a single
        * thread running per connection pool. The thread pool executor permits the pool itself to be
        * garbage collected.用于清理失效连接的后台线程池。线程池允许自身进行垃圾回收。
        */
       private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
             Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
             new SynchronousQueue(), Util.threadFactory("OkHttp ConnectionPool", true));
       /** The maximum number of idle connections for each address. */
       private final int maxIdleConnections;//每个地址最多空闲连接数
       private final long keepAliveDurationNs;
       //清理连接线程,在线程池executor中调用
       private final Runnable cleanupRunnable = new Runnable() {
          @Override public void run() {
             while (true) {
                //执行清理,并返回下次清理的时间
                long waitNanos = cleanup(System.nanoTime());
                if (waitNanos == -1) return;
                if (waitNanos>0) {
                   long waitMillis = waitNanos / 1000000L;
                   waitNanos -= (waitMillis * 1000000L);
                   synchronized (ConnectionPool.this) {
                      try {
                         //阻塞等待,等时间到后继续循环清理
                         ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                      } catch (InterruptedException ignored) {
                      }
                   }
                }
             }
          }
       };
    
       private final Deque connections = new ArrayDeque<>();//存放连接的队列
       final RouteDatabase routeDatabase = new RouteDatabase();//路由库
       boolean cleanupRunning;
    
       /**
        * Create a new connection pool with tuning parameters appropriate for a single-user application.
        * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
        * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
        */
       public ConnectionPool() {
          this(5, 5, TimeUnit.MINUTES);
       }
    
        public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
          this.maxIdleConnections = maxIdleConnections;
          this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
          // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
          if (keepAliveDuration<= 0) {
             throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
          }
        }
        /*
         * 获取可复用连接,如果没有则返回null
         */
        @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {}
         /*
          * 将新创建的连接加入连接池
          */
         void put(RealConnection connection){}
    }
    

    (1)get方法

    /**
      * Returns a recycled connection to {@code address}, or null if no such connection exists. The
      * route is null if the address has not yet been routed.
      * 根据address获取可重用的连接,如果没有返回null。
      * 如果没有选择路由,入参route是null,StreamAllocation的findConnection方法,第一次调用get方法
      * 的入参route就是null
      */
      @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
           }
         }
         return null;
       }
    

    方法比较简单,遍历队列中每个连接,调用isEligible方法判断是否适合复用,能则分配给streamAllocation,判断是否适合的方法isEligible前面有分析。

    (2)put方法

       void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    }
    

    新创建的连接需要通过put方法加入到连接池,先执行清理,再添加到队列。

    (3)cleanUp方法

    /**
       * Performs maintenance on this pool, evicting the connection that has been idle the longest if
       * either it has exceeded the keep alive limit or the idle connections limit.
       *
       * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
       * -1 if no further cleanups are required.
       */
      long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
    
            idleConnectionCount++;
    
            // If the connection is ready to be evicted, we're done.
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
          // 找出空闲时间最长的连接
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            // A connection will be ready to evict soon.
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            return keepAliveDurationNs;
          } else {
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
          }
        }
    
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
      }
    

    清理的逻辑不复杂,就是遍历队列中的连接,调用pruneAndGetAllocationCount方法返回引用数,判断当前连接是否空闲,跳过正在被用的连接,对于空闲的连接,更新空闲持续的时间,通过遍历得到空闲时间最长的连接,如果超过了设定的保活时间或者空闲连接超过最大数量,则移除并关闭该连接,继续执行清除,如果没有需要移除的,返回下次清理时间,即最快达到设定保活的时间。

    (4)pruneAndGetAllocationCount方法

       * Prunes any leaked allocations and then returns the number of remaining live allocations on
       * {@code connection}. Allocations are leaked if the connection is tracking them but the
       * application code has abandoned them. Leak detection is imprecise and relies on garbage
       * collection.
       */
      private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        List<Reference<StreamAllocation>> references = connection.allocations;//弱引用列表
        for (int i = 0; i < references.size(); ) {
          Reference<StreamAllocation> reference = references.get(i);
          if (reference.get() != null) {
             i++;
             continue;
          }
          // We've discovered a leaked allocation. This is an application bug.
          StreamAllocation.StreamAllocationReference streamAllocRef =
              (StreamAllocation.StreamAllocationReference) reference;
          String message = "A connection to " + connection.route().address().url()
              + " was leaked. Did you forget to close a response body?";
          Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
    
          references.remove(i);
          connection.noNewStreams = true;
    
          // If this was the last allocation, the connection is eligible for immediate eviction.
          if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
          }
        }
    
        return references.size();
      }
    

    RealConnection通过一个列表记录当前建立的流List<Reference<StreamAllocation>>,这是一个弱引用列表,主要是为了防止内存泄漏,pruneAndGetAllocationCount方法主要是遍历该列表,如果发现引用的StreamAllocatin已经为空(程序出现bug,正常是不会出现的),则将该引用移出列表,最后返回当前持有引用的计数。

    (4)小结
    由上面的分析可总结连接池复用的原理:
    · OkHttp通过ConnectionPool维护线程池;
    · ConnectionPool通过队列Deque<RealConnection>持有当前所有的连接;
    · 新创建的连接通过put方法加入到队列,加入队列前先执行一遍清理;
    · get方法会根据传入的Address和Route遍历连接队列,返回可以复用的连接,复用的条件既要满足分配流的上限原则,也需protocol、ssl、host等配置匹配;
    · ConnectionPool通过一个专门的线程清理失效的连接,该线程每执行完一次清理都会根据返回的等待时间阻塞等待;
    · 清理的逻辑即遍历每个连接,通过连接对StreamAlloction的弱引用计数器来判断是否空闲(计数为0则说明空闲),通过遍历队列,找出空闲时长最长的连接,再根据已到保活的时长(keepalive)或空闲连接数的上限进行清理回收。

    五、总结

    至此基本解析了OkHttp网络连接管理的流程,由于篇幅及时间有限的原因,中间有些细节没有展开细分析。在分析的过程也解答了文章开头提出的疑问。

    参考

    https://www.jianshu.com/p/6166d28983a2
    https://blog.csdn.net/yueaini10000/article/details/83305787
    https://blog.csdn.net/FrancisHe/article/details/84667562#_HTTP__2

    公众号

    相关文章

      网友评论

        本文标题:OkHttp3源码解析(二)——网络连接的管理(多路复用,连接池

        本文链接:https://www.haomeiwen.com/subject/yidiaqtx.html