美文网首页
StreamAllocation

StreamAllocation

作者: 嗯哼嗯哼嗯哼嗯哼 | 来源:发表于2019-11-25 11:19 被阅读0次

    OkHttp

    StreamAllocation

    此类协调了三个类之间的关系:

    Connection:连接到远程服务器的物理Socket连接,Connection建立起来可能会比较慢,并且能够取消已经建立的Connection

    Stream:建立在Connection上的逻辑上的HTTP的请求对的数据流,每个连接都有自己的allocation limit,allocation limit 定义了Connection同时可以并发请求的数据流的个数,Http 1.1 连接一次只能分配一个Stream,Http 2 通常可以分配多个流(多路复用)

    Calls:Stream的逻辑顺序,通常是初始请求以及重定向请求。并且希望单个请求的所有流都被保存在同一个Connection上,以实现更好的行为

    在Connection和StreamAllocation之间的关系是,多个Stream可以共用一个Socket连接,每个tcp连接都是通过一个socket来完成的,socket对应一个host和port,如果有多个Stream(也就是多个Request)都是连接在一个host和port上,那么它们就可以共用同一个socket,这样做的好处就是可以减少tcp的三次握手时间。在OkHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。

    在讲解这个类之前,先熟悉3个概念:请求,连接,流。我们要明白Http通信执行网络“请求”需要在“连接”上建立一个新的“流”,我们将StreamAllocation称之流的桥梁,它负责为一次“请求”寻找“连接”并建立“流”,从而完成远程通信。所以说StreamAllocation与“请求”,“连接”,“流”都有关。

    从注释我们看到。Connection是建立在Socket之上的物理通信信道,而Stream则是代表逻辑的流,至于Call是对一次请求过程的封装。之前也说过一个Call可能会涉及多个流(比如重定向或者auth认证等情况)。所以我们想一下,如果StreamAllocation要想解决上述问题,需要两个步骤,一是寻找连接,二是获取流。所以StreamAllocation里面应该包含一个Stream(上文已经说到了,OKHttp里面的流是HttpCodec);还应该包含连接Connection。如果想找到合适的连接,还需要一个连接池ConnectionPool属性。所以应该有一个获取流的方法在StreamAllocation里面是newStream();找到合适的流的方法findConnection();还应该有完成请求任务的之后finish()的方法来关闭流对象,还有终止和取消等方法,以及释放资源的方法。

      public final Address address;//地址
      private RouteSelector.Selection routeSelection;
      private Route route;
      private final ConnectionPool connectionPool;//连接池
      public final Call call;//请求
      public final EventListener eventListener;
      private final Object callStackTrace;
    
      // State guarded by connectionPool.
      private final RouteSelector routeSelector;
      private int refusedStreamCount;
      private RealConnection connection;//连接
      private boolean reportedAcquired;
      private boolean released;
      private boolean canceled;
      private HttpCodec codec;
    

    根据请求,基于连接Connection构建出一个流

      public HttpCodec newStream(
          OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
        try {
          //寻找到一个可用的连接,下面会在这个连接上构造流
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
          //构造出HttpCodec,就是跟服务端的输入输出流
          HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    
    1. 寻找到一个可用的连接Connection
    2. 下面流的构造会在基于连接Connection上
      /**
       * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
       * until a healthy connection is found.
       */
      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
          boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
            //调用Connection,获取一个符合要求的RealConnection
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              pingIntervalMillis, connectionRetryEnabled);
    
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0) {
              return candidate;
            }
          }
    
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          //确认连接Connection必须是健康的,如果是健康的则重用,否者重新查找RealConnection
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
          }
    
          return candidate;
        }
      }
    
    
    
    
      /**
       * Returns a connection to host a new stream. This prefers the existing connection if it exists,
       * then the pool, finally building a new connection.
       */
      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        Connection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
          if (released) throw new IllegalStateException("released");
          if (codec != null) throw new IllegalStateException("codec != null");
          if (canceled) throw new IOException("Canceled");
    
          // Attempt to use an already-allocated connection. We need to be careful here because our
          // already-allocated connection may have been restricted from creating new streams.
          //尝试去用已经分配的连接
          releasedConnection = this.connection;
          toClose = releaseIfNoNewStreams();
          if (this.connection != null) {
            // We had an already-allocated connection and it's good.
            result = this.connection;
            releasedConnection = null;
          }
          if (!reportedAcquired) {
            // If the connection was never reported acquired, don't report it as released!
            releasedConnection = null;
          }
    
          if (result == null) {
            // Attempt to get a connection from the pool.
            //尝试从连接池里面获取可用的连接
            Internal.instance.get(connectionPool, address, this, null);
            if (connection != null) {
              foundPooledConnection = true;
              result = connection;
            } else {
              selectedRoute = route;
            }
          }
        }
        closeQuietly(toClose);
    
        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
        }
        //如果已经从已经分配的或者从ConnectionPool里面找到了Connection,则返回 
        if (result != null) {
          // If we found an already-allocated or pooled connection, we're done.
          return result;
        }
    
        //如果上面没找到,那么切换路由,接着寻找连接
        // If we need a route selection, make one. This is a blocking operation.
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
    
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
    
          if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            List<Route> routes = routeSelection.getAll();
            for (int i = 0, size = routes.size(); i < size; i++) {
              Route route = routes.get(i);
              Internal.instance.get(connectionPool, address, this, route);
              if (connection != null) {
                foundPooledConnection = true;
                result = connection;
                this.route = route;
                break;
              }
            }
          }
    
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
    
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            route = selectedRoute;
            refusedStreamCount = 0;
            //构造出一个新的连接Connection
            result = new RealConnection(connectionPool, selectedRoute);
            //把当前StreamAllocation添加到连接Connection中的allocations列表中
            acquire(result, false);
          }
        }
    
        // If we found a pooled connection on the 2nd time around, we're done.
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
    
        // Do TCP + TLS handshakes. This is a blocking operation.
        //做TCP和TLS的连接,握手,阻塞操作
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());
    
        Socket socket = null;
        synchronized (connectionPool) {
          reportedAcquired = true;
    
          // Pool the connection.
          //把这个连接放入到ConnectionPool中
          Internal.instance.put(connectionPool, result);
    
          // If another multiplexed connection to the same address was created concurrently, then
          // release this connection and acquire that one.
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
    
        eventListener.connectionAcquired(call, result);
        return result;
      }
    

    HttpCodec

    HttpCodec是一个接口,用于与服务端做输入输出流的包装,内部是用Okio来做的,我们这里只看Http1Codec,表示只用于Http1协议。Http1Codec是一个Socket连接,用于发送 Http 1的消息,这个类严格遵守以下的生命周期:

    1. 写请求头
    2. 写请求体
    3. 读取响应头
    4. 读取响应体
    //连接处于空闲状态,准备好写请求头了
    private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
    private static final int STATE_OPEN_REQUEST_BODY = 1;//准备写请求体
    private static final int STATE_WRITING_REQUEST_BODY = 2;//写入请求体
    private static final int STATE_READ_RESPONSE_HEADERS = 3;//读取响应头
    private static final int STATE_OPEN_RESPONSE_BODY = 4;//准备读取响应体
    private static final int STATE_READING_RESPONSE_BODY = 5;//读取响应体
    private static final int STATE_CLOSED = 6;//关闭
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    final OkHttpClient client;
    /** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
    final StreamAllocation streamAllocation;//请求对应的流
    
    final BufferedSource source;//从服务器读取的响应流
    final BufferedSink sink;//向服务器写入的请求流
    int state = STATE_IDLE;//处于的生命周期
    private long headerLimit = HEADER_LIMIT;
    
    1. 看的出来一个HttpCodec有7种状态,对应着4个动作

    下面只分析一个写入请求头的方法,其他的跟这个类似,只是还是会将请求体和响应体细分为固定长度和非固定长度,分别用不同流来表示。

      /**
       * Prepares the HTTP headers and sends them to the server.
       *
       * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the
       * output stream has been written to. Otherwise the body would need to be buffered!
       *
       * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the
       * output stream has been written to and closed. This ensures that the {@code Content-Length}
       * header field receives the proper value.
       */
       //写入请求头
      @Override public void writeRequestHeaders(Request request) throws IOException {
        String requestLine = RequestLine.get(
            request, streamAllocation.connection().route().proxy().type());
        writeRequest(request.headers(), requestLine);
      }
      
        /** Returns bytes of a request header for sending on an HTTP transport. */
      public void writeRequest(Headers headers, String requestLine) throws IOException {
        if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
        //写入请求行和请求头
        sink.writeUtf8(requestLine).writeUtf8("\r\n");
        for (int i = 0, size = headers.size(); i < size; i++) {
          sink.writeUtf8(headers.name(i))
              .writeUtf8(": ")
              .writeUtf8(headers.value(i))
              .writeUtf8("\r\n");
        }
        sink.writeUtf8("\r\n");
        //状态改变为准备写入请求体
        state = STATE_OPEN_REQUEST_BODY;
      }
      
      
      
        /**
       * Returns the request status line, like "GET / HTTP/1.1". This is exposed to the application by
       * {@link HttpURLConnection#getHeaderFields}, so it needs to be set even if the transport is
       * HTTP/2.
       */
       //返回请求行,例如“GET / HTTP/1.1”
      public static String get(Request request, Proxy.Type proxyType) {
        StringBuilder result = new StringBuilder();
        result.append(request.method());
        result.append(' ');
    
        if (includeAuthorityInRequestLine(request, proxyType)) {
          result.append(request.url());
        } else {
          result.append(requestPath(request.url()));
        }
    
        result.append(" HTTP/1.1");
        return result.toString();
      }
    

    看到上面的向服务器写入的数据,也可以看到OkHttp的框架属于哪一层?OkHttp是与UrlConnection同一层的网络框架,直接对Socket进行封装。其实OkHttp就是实现了Http协议,并且对接口做了比较好的封装的框架。

    相关文章

      网友评论

          本文标题:StreamAllocation

          本文链接:https://www.haomeiwen.com/subject/ufpdwctx.html