美文网首页
okhttp之旅(九)--创建链接,建立协议

okhttp之旅(九)--创建链接,建立协议

作者: brock | 来源:发表于2019-02-01 19:57 被阅读45次

    连接的创建是在StreamAllocation对象统筹下完成的,我们前面也说过它早在RetryAndFollowUpInterceptor就被创建了,StreamAllocation对象
    主要用来管理两个关键角色:

    • RealConnection:真正建立连接的对象,利用Socket建立连接
    • ConnectionPool:连接池,用来管理和复用连接。

    1 创建连接

    • 我们在前面的ConnectInterceptor分析中已经说过,connectInterceptor用来完成连接。
    • 真正的连接在RealConnection中实现,连接由连接池ConnectPool来管理,连接池最多保持5个地址的连接keep-alive,每个keep-alive时长为5分钟,并有异步线程清理无效的连接。
    ///主要由以下两个方法完成:
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    
    

    1.0 newStream()

    • StreamAllocation.newStream() 主要做的事情正是创建HttpCodec。
    • StreamAllocation.newStream() 根据 OkHttpClient中的设置,连接超时、读超时、写超时及连接失败是否重试,调用 findHealthyConnection() 完成 连接,即RealConnection 的创建。
    • 然后根据HTTP协议的版本创建Http1Codec或Http2Codec。
    • HttpCodec编码HTTP请求并解码HTTP响应。
    • HttpCodec提供了这样的一些操作:
    • 为发送请求而提供的,写入请求头部。
    • 为发送请求而提供的,创建请求体,以用于发送请求体数据。
    • 为发送请求而提供的,结束请求发送。
    • 为获得响应而提供的,读取响应头部。
    • 为获得响应而提供的,打开请求体,以用于后续获取请求体数据。
    • 取消请求执行。
        public HttpCodec newStream(
                OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
            int connectTimeout = chain.connectTimeoutMillis();
            int readTimeout = chain.readTimeoutMillis();
            int writeTimeout = chain.writeTimeoutMillis();
            int pingIntervalMillis = client.pingIntervalMillis();
            boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
            try {
                //完成 连接,即RealConnection 的创建。
                RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
                //根据HTTP协议的版本创建Http1Codec或Http2Codec
                HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
                synchronized (connectionPool) {
                    codec = resultCodec;
                    return resultCodec;
                }
            } catch (IOException e) {
                throw new RouteException(e);
            }
        }
    
    

    1.1StreamAllocation.newStream()调用findConnection()方法来建立连接。

    最终调用Java里的套接字Socket里的connect()方法
    调用顺序
    StreamAllocation.newStream()-->findHealthyConnection()-->findConnection()
    findConnection()中的流程如下:

    • 1.查找是否有完整的连接可用:
      Socket没有关闭

    输入流没有关闭
    输出流没有关闭
    Http2连接没有关闭

    • 2.连接池中是否有可用的连接,如果有则可用。
    • 3.如果没有可用连接,则自己创建一个。
    • 4.开始TCP连接以及TLS握手操作。(握手操作调用RealConnection.connect()-->最终调用ava里的套接字Socket里的connect()方法)
    • 5.将新创建的连接加入连接池。
    public final class StreamAllocation {
        /**
         * Returns a connection to host a new stream. This prefers the existing connection if it exists,
         * then the pool, finally building a new connection.
         * 返回一个连接来托管一个新的流。 这更喜欢现有的连接(如果存在的话),然后是池,最后建立一个新的连接。
         */
        private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
                                              int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
            boolean foundPooledConnection = false;
            RealConnection result = null;
            Route selectedRoute = null;
            Connection releasedConnection;
            Socket toClose;
            synchronized (connectionPool) {
                if (released) throw new IllegalStateException("released");
                if (codec != null) throw new IllegalStateException("codec != null");
                if (canceled) throw new IOException("Canceled");
    
                // Attempt to use an already-allocated connection. We need to be careful here because our
                // already-allocated connection may have been restricted from creating new streams.
                //尝试使用已分配的连接。 我们在这里需要小心,因为我们已经分配的连接可能已经被限制在创建新的流中。
                releasedConnection = this.connection;
                toClose = releaseIfNoNewStreams();
                //1 查看是否有完好的连接
                if (this.connection != null) {
                    // We had an already-allocated connection and it's good.
                    result = this.connection;
                    releasedConnection = null;
                }
                if (!reportedAcquired) {
                    // If the connection was never reported acquired, don't report it as released!
                    releasedConnection = null;
                }
                //2 连接池中是否用可用的连接,有则使用
                if (result == null) {
                    // Attempt to get a connection from the pool.
                    Internal.instance.get(connectionPool, address, this, null);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                    } else {
                        selectedRoute = route;
                    }
                }
            }
            closeQuietly(toClose);
    
            if (releasedConnection != null) {
                eventListener.connectionReleased(call, releasedConnection);
            }
            if (foundPooledConnection) {
                eventListener.connectionAcquired(call, result);
            }
            if (result != null) {
                // If we found an already-allocated or pooled connection, we're done.
                //如果我们找到了已经分配或者连接的连接,我们就完成了。
                return result;
            }
    
            // If we need a route selection, make one. This is a blocking operation.
            //如果我们需要路线选择,请选择一个。 这是一项阻止操作。
            //线程的选择,多IP操作
            boolean newRouteSelection = false;
            if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
                newRouteSelection = true;
                routeSelection = routeSelector.next();
            }
            //3 如果没有可用连接,则自己创建一个
            synchronized (connectionPool) {
                if (canceled) throw new IOException("Canceled");
    
                if (newRouteSelection) {
                    // Now that we have a set of IP addresses, make another attempt at getting a connection from
                    // the pool. This could match due to connection coalescing.
                   // 现在我们有一组IP地址,再次尝试从池中获取连接。 这可能由于连接合并而匹配。
                    List<Route> routes = routeSelection.getAll();
                    for (int i = 0, size = routes.size(); i < size; i++) {
                        Route route = routes.get(i);
                        Internal.instance.get(connectionPool, address, this, route);
                        if (connection != null) {
                            foundPooledConnection = true;
                            result = connection;
                            this.route = route;
                            break;
                        }
                    }
                }
    
                if (!foundPooledConnection) {
                    if (selectedRoute == null) {
                        selectedRoute = routeSelection.next();
                    }
    
                    // Create a connection and assign it to this allocation immediately. This makes it possible
                    // for an asynchronous cancel() to interrupt the handshake we're about to do.
                    //创建一个连接并立即将其分配给该分配。 这使得异步cancel()可以中断我们即将进行的握手。
                    route = selectedRoute;
                    refusedStreamCount = 0;
                    result = new RealConnection(connectionPool, selectedRoute);
                    acquire(result, false);
                }
            }
    
            // If we found a pooled connection on the 2nd time around, we're done.
            //如果我们第二次发现一个连接池,我们就完成了。
            if (foundPooledConnection) {
                eventListener.connectionAcquired(call, result);
                return result;
            }
    
            // Do TCP + TLS handshakes. This is a blocking operation.
            //4 开始TCP以及TLS握手操作,这是阻塞操作
            result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                    connectionRetryEnabled, call, eventListener);
            routeDatabase().connected(result.route());
    
            //5 将新创建的连接,放在连接池中
            Socket socket = null;
            synchronized (connectionPool) {
                reportedAcquired = true;
    
                // Pool the connection.
                Internal.instance.put(connectionPool, result);
    
                // If another multiplexed connection to the same address was created concurrently, then
                // release this connection and acquire that one.
                //如果同时创建了到同一地址的另一个多路复用连接,则释放此连接并获取该连接。
                if (result.isMultiplexed()) {
                    socket = Internal.instance.deduplicate(connectionPool, address, this);
                    result = connection;
                }
            }
            closeQuietly(socket);
    
            eventListener.connectionAcquired(call, result);
            return result;
        }
    }
    
    

    1.2 调用该方法的RealConnection.connect()方法建立连接

    • connect-->connectSocket()进行socket连接-->Platform.get().connectSocket()-->socket.connect(address, connectTimeout);(此时进行了三次握手)
    • 握手完成后调用establishProtocol()

    注:
    当客户端调用connect时,触发了连接请求,向服务器发送了SYN J包,这时connect进入阻塞状态;服务器监听到连接请求,即收到SYN J包,调用accept函 数接收请求向客户端发送SYN K ,ACK J+1,这时accept进入阻塞状态;客户端收到服务器的SYN K ,ACK J+1之后,这时connect返回,并对SYN K进行确认;服务器收到ACK K+1时,accept返回,至此三次握手完毕,连接建立。

    connect()

        public void connect(int connectTimeout, int readTimeout, int writeTimeout,
                            int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
                            EventListener eventListener) {
            if (protocol != null) throw new IllegalStateException("already connected");
            //线路选择
            RouteException routeException = null;
            List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
            ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    
            if (route.address().sslSocketFactory() == null) {
                if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
                    throw new RouteException(new UnknownServiceException(
                            "CLEARTEXT communication not enabled for client"));
                }
                String host = route.address().url().host();
                if (!Platform.get().isCleartextTrafficPermitted(host)) {
                    throw new RouteException(new UnknownServiceException(
                            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
                }
            }
            //开始连接
            while (true) {
                try {
                    //建立隧道连接
                    if (route.requiresTunnel()) {
                        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
                        if (rawSocket == null) {
                            // We were unable to connect the tunnel but properly closed down our resources.
                            //我们无法连接隧道,但正确关闭了我们的资源。
                            break;
                        }
                    } else {
                        //建立普通连接
                        connectSocket(connectTimeout, readTimeout, call, eventListener);
                    }
                    // 建立协议
                    //不管是建立隧道连接,还是建立普通连接,都少不了 建立协议 这一步。
                    // 这一步是在建立好了TCP连接之后,而在该TCP能被拿来收发数据之前执行的。
                    // 它主要为数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协议协商等
                    establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
                    eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
                    break;
                    //完成连接
                } catch (IOException e) {
                    closeQuietly(socket);
                    closeQuietly(rawSocket);
                    socket = null;
                    rawSocket = null;
                    source = null;
                    sink = null;
                    handshake = null;
                    protocol = null;
                    http2Connection = null;
    
                    eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
    
                    if (routeException == null) {
                        routeException = new RouteException(e);
                    } else {
                        routeException.addConnectException(e);
                    }
    
                    if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
                        throw routeException;
                    }
                }
            }
    
            if (route.requiresTunnel() && rawSocket == null) {
                ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
                        + MAX_TUNNEL_ATTEMPTS);
                throw new RouteException(exception);
            }
    
            if (http2Connection != null) {
                synchronized (connectionPool) {
                    allocationLimit = http2Connection.maxConcurrentStreams();
                }
            }
        }
    
    

    connectSocket()

        /**
         * Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
         * 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作。
         */
        private void connectSocket(int connectTimeout, int readTimeout, Call call,
                                   EventListener eventListener) throws IOException {
            Proxy proxy = route.proxy();
            Address address = route.address();
            //根据代理类型的不同处理Socket
            rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
                    ? address.socketFactory().createSocket()
                    : new Socket(proxy);
    
            eventListener.connectStart(call, route.socketAddress(), proxy);
            rawSocket.setSoTimeout(readTimeout);
            try {
                //建立Socket连接
                Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
            } catch (ConnectException e) {
                ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
                ce.initCause(e);
                throw ce;
            }
    
            // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
            // More details:
            // https://github.com/square/okhttp/issues/3245
            // https://android-review.googlesource.com/#/c/271775/
            try {
                //获取输入/输出流
                source = Okio.buffer(Okio.source(rawSocket));
                sink = Okio.buffer(Okio.sink(rawSocket));
            } catch (NullPointerException npe) {
                if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
                    throw new IOException(npe);
                }
            }
        }
    
    public class Platform {
      public void connectSocket(Socket socket, InetSocketAddress address,
          int connectTimeout) throws IOException {
        //最终调用java的connect
        socket.connect(address, connectTimeout);
      }
    }
    
    

    connectTunnel()隧道链接

        /**
         * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
         * proxy server can issue an auth challenge and then close the connection.
         * 是否通过代理隧道建立HTTPS连接的所有工作。 这里的问题是代理服务器可以发出一个验证质询,然后关闭连接。
         */
        private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
                                   EventListener eventListener) throws IOException {
            //1-构造一个 建立隧道连接 请求。
            Request tunnelRequest = createTunnelRequest();
            HttpUrl url = tunnelRequest.url();
            for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
                //2 与HTTP代理服务器建立TCP连接。
                connectSocket(connectTimeout, readTimeout, call, eventListener);
                //3 创建隧道。这主要是将 建立隧道连接 请求发送给HTTP代理服务器,并处理它的响应
                tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
    
                if (tunnelRequest == null) break; // Tunnel successfully created.
    
                // The proxy decided to close the connection after an auth challenge. We need to create a new
                // connection, but this time with the auth credentials.
                closeQuietly(rawSocket);
                rawSocket = null;
                sink = null;
                source = null;
                eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
                //重复上面的第2和第3步,直到建立好了隧道连接。
            }
        }
    
    

    createTunnel()

    public final class RealConnection extends Http2Connection.Listener implements Connection {
        /**
         * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
         * the proxy connection. This may need to be retried if the proxy requires authorization.
         * 要通过HTTP代理建立HTTPS连接,请发送未加密的CONNECT请求以创建代理连接。 如果代理需要授权,则可能需要重试。
         */
        private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
                                     HttpUrl url) throws IOException {
            // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
            // 在每个SSL +代理连接的第一个消息对上创建一个SSL隧道。
            String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
            while (true) {
                Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
                source.timeout().timeout(readTimeout, MILLISECONDS);
                sink.timeout().timeout(writeTimeout, MILLISECONDS);
                tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
                //sink.flush();
                tunnelConnection.finishRequest();
                Response response = tunnelConnection.readResponseHeaders(false)
                        .request(tunnelRequest)
                        .build();
                // The response body from a CONNECT should be empty, but if it is not then we should consume
                // it before proceeding.
                long contentLength = HttpHeaders.contentLength(response);
                if (contentLength == -1L) {
                    contentLength = 0L;
                }
                Source body = tunnelConnection.newFixedLengthSource(contentLength);
                Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                body.close();
    
                switch (response.code()) {
                    case HTTP_OK:
                        // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
                        // that happens, then we will have buffered bytes that are needed by the SSLSocket!
                        // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
                        // that it will almost certainly fail because the proxy has sent unexpected data.
                        if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
                            throw new IOException("TLS tunnel buffered too many bytes!");
                        }
                        return null;
    
                    case HTTP_PROXY_AUTH:
                        tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
                        if (tunnelRequest == null)
                            throw new IOException("Failed to authenticate with proxy");
    
                        if ("close".equalsIgnoreCase(response.header("Connection"))) {
                            return tunnelRequest;
                        }
                        break;
    
                    default:
                        throw new IOException(
                                "Unexpected response code for CONNECT: " + response.code());
                }
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:okhttp之旅(九)--创建链接,建立协议

          本文链接:https://www.haomeiwen.com/subject/uvodsqtx.html