美文网首页
OkHttp 源码剖析系列(四)——连接的建立概述

OkHttp 源码剖析系列(四)——连接的建立概述

作者: N0tExpectErr0r | 来源:发表于2019-08-09 12:30 被阅读0次

    系列索引

    本系列文章基于 OkHttp3.14

    OkHttp 源码剖析系列(一)——请求的发起及拦截器机制概述

    OkHttp 源码剖析系列(二)——拦截器大体流程分析

    OkHttp 源码剖析系列(三)——缓存机制分析

    OkHttp 源码剖析系列(四)——连接的建立概述

    OkHttp 源码剖析系列(五)——路由选择机制

    OkHttp 源码剖析系列(六)——连接复用机制及连接的建立

    OkHttp 源码剖析系列(七)——请求的发起及响应的读取

    前言

    前面的文章分析完了 OkHttp 中的缓存机制,现在让我们继续来研究其在 ConnectInterceptor 中所进行的连接建立的相关原理。由于连接建立的过程涉及到很多在 OkHttp 中非常重要的机制,因此将分为多篇文章进行介绍,这篇文章主要是对连接建立的大体流程进行介绍。

    连接建立流程概述

    ConnectInterceptor.intercept 方法中真正实现了连接的建立的代码如下:

    // 如果请求是GET格式,需要一些额外的检查
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    

    根据上面的代码我们可以推测,这个 Exchange 类与我们的连接是有一些关系的,真正连接的建立过程在 transmitter.newExchange 中实现。

    我们看到 transmitter.newExchange 方法:

    /**
     * Returns a new exchange to carry a new request and response.
     */
    Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        synchronized (connectionPool) {
            if (noMoreExchanges) {
                throw new IllegalStateException("released");
            }
            if (exchange != null) {
                throw new IllegalStateException("cannot make a new request because the previous response "
                        + "is still open: please call response.close()");
            }
        }
        // 寻找ExchangeCodec对象
        ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
        // 通过找到的codec对象构建Exchange对象
        Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
        // 进行一些变量的赋值
        synchronized (connectionPool) {
            this.exchange = result;
            this.exchangeRequestDone = false;
            this.exchangeResponseDone = false;
            return result;
        }
    }
    

    获取连接

    上面首先通过 exchangeFinder.find 方法进行了对 ExchangeCodec 的查找,找到对应的 ExchangeCodec 对象,之后通过这个 codec 对象构建了一个 Exchange 对象并返回

    那么什么是 ExchangeCodec 对象呢?我们先看到 exchangeFinder.find 方法:

    public ExchangeCodec find(
            OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
        try {
            RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                    writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
            return resultConnection.newCodec(client, chain);
        } catch (RouteException e) {
            trackFailure();
            throw e;
        } catch (IOException e) {
            trackFailure();
            throw new RouteException(e);
        }
    }
    

    可以看到这里调用到了 findHealthyConnection 方法从而获取 RealConnection 对象,看来这个就是我们的连接了,之后调用了 RealConnection.newCodec 方法获取 ExchangeCodec 对象。

    寻找可用连接

    我们先看到 findHealthyConnection 方法:

    /**
     * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
     * until a healthy connection is found.
     */
    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
                                                 int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
                                                 boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
            RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                    pingIntervalMillis, connectionRetryEnabled);
            // If this is a brand new connection, we can skip the extensive health checks.
            synchronized (connectionPool) {
                if (candidate.successCount == 0) {
                    return candidate;
                }
            }
            // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
            // isn't, take it out of the pool and start again.
            if (!candidate.isHealthy(doExtensiveHealthChecks)) {
                candidate.noNewExchanges();
                continue;
            }
            return candidate;
        }
    }
    

    可以看到这里是一个循环,不断地在调用 findConnection 方法寻找连接,若找不到 Healthy(可用)的连接,则继续循环直到找到为止。

    寻找连接

    我们先看到 findConnection 方法:

    /**
     * Returns a connection to host a new stream. This prefers the existing connection if it exists,
     * then the pool, finally building a new connection.
     */
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
                                          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        RealConnection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
            if (transmitter.isCanceled()) throw new IOException("Canceled");
            hasStreamFailure = false; // This is a fresh attempt.
            
            // 尝试使用之前已分配的连接,但可能该连接不能用来创建新的Exchange
            releasedConnection = transmitter.connection;
            // 如果当前的连接不能被用来创建新的Exchange,则将连接释放并返回对应Socket准备close
            toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
                    ? transmitter.releaseConnectionNoEvents()
                    : null;
            if (transmitter.connection != null) {
                // 存在已分配的连接,将其置为result,并置releasedConnection为null
                result = transmitter.connection;
                releasedConnection = null;
            }
            if (result == null) {
                // 如果不存在已经分配的连接,则尝试从连接池中获取连接
                if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
                    foundPooledConnection = true;
                    result = transmitter.connection;
                } else if (nextRouteToTry != null) {
                    // 修改当前选择路由为下一个路由
                    selectedRoute = nextRouteToTry;
                    nextRouteToTry = null;
                } else if (retryCurrentRoute()) {
                    // 如果当前Connection的路由应当重试,则将选择的路由设置为当前路由
                    selectedRoute = transmitter.connection.route();
                }
            }
        }
        closeQuietly(toClose);
        if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
            // 如果已经找到了已分配的或从连接池中取出的Connection,则直接返回
            return result;
        }
        // 如果需要进行路由选择,则进行一次路由选择
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
        }
        List<Route> routes = null;
        synchronized (connectionPool) {
            if (transmitter.isCanceled()) throw new IOException("Canceled");
            if (newRouteSelection) {
                // 路由选择过后如今有了一组IP地址,我们再次尝试从连接池中获取连接
                routes = routeSelection.getAll();
                if (connectionPool.transmitterAcquirePooledConnection(
                        address, transmitter, routes, false)) {
                    foundPooledConnection = true;
                    result = transmitter.connection;
                }
            }
            if (!foundPooledConnection) {
                if (selectedRoute == null) {
                    selectedRoute = routeSelection.next();
                }
                // 如果第二次尝试从连接池获取连接仍然失败,则创建新的连接。
                result = new RealConnection(connectionPool, selectedRoute);
                connectingConnection = result;
            }
        }
        if (foundPooledConnection) {
            // 如果第二次尝试从连接池获取连接成功,则将其返回
            eventListener.connectionAcquired(call, result);
            return result;
        }
        // 执行TCP+TLS握手,这是个阻塞的过程
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
        connectionPool.routeDatabase.connected(result.route());
        Socket socket = null;
        synchronized (connectionPool) {
            connectingConnection = null;
            // 最后一次尝试从连接池中获取连接,这种情况只可能在一个host下多个并发连接这种情况下
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
                // 如果成功拿到则关闭我们前面创建的连接的Socket,并返回连接池中的连接
                result.noNewExchanges = true;
                socket = result.socket();
                result = transmitter.connection;
            } else {
                // 如果失败则在连接池中放入我们刚刚创建的连接,并将其设置为transmitter中的连接
                connectionPool.put(result);
                transmitter.acquireConnectionNoEvents(result);
            }
        }
        closeQuietly(socket);
        eventListener.connectionAcquired(call, result);
        return result;
    }
    

    这个寻找连接的过程是非常复杂的,主要是下列几个步骤:

    1. 尝试获取 transmitter 中已经存在的连接,也就是当前 Call 之前创建的连接。
    2. 若获取不到,则尝试从连接池中调用 transmitterAcquirePooledConnection 方法获取连接,传入的 routes 参数为 null
    3. 若仍获取不到连接,判断是否需要路由选择,如果需要,调用 routeSelector.next 进行路由选择
    4. 如果进行了路由选择,则再次尝试从连接池中调用 transmitterAcquirePooledConnection 方法获取连接,传入的 routes 为刚刚路由选择后所获取的路由列表
    5. 若仍然获取不到连接,则调用 RealConnection 的构造函数创建新的连接,并对其执行 TCP + TLS握手。
    6. TCP + TSL握手之后,会再次尝试从连接池中通过 transmitterAcquirePooledConnection 方法获取连接,这种情况只会出现在一个 Host 对应多个并发连接的情况下(因为 HTTP/2 支持了多路复用,使得多个请求可以并发执行,此时可能有其他使用该 TCP 连接的请求也创建了连接,就不需要重新创建了)。
    7. 若最后一次从连接池中获取连接获取成功,会释放之前创建的连接的相关资源。
    8. 若仍获取不到,则将该连接放入连接池,并将其设置为 transmitter 的连接。

    可以看到,寻找连接的过程主要被分成了三种行为,分别是

    • 尝试获取 transmitter 中已经分配的连接
    • 尝试从线程池中调用 transmitterAcquirePooledConnection 获取连接
    • 创建新连接。

    有点类似图片加载的三级缓存,显然自上而下是越来越消耗资源的,因此 OkHttp 更偏向于前面直接能够获取到连接,尤其是尝试从连接池进行获取连接这一操作进行了三次

    不过我们现在只是知道了大体流程,还有许多疑问没有解开。比如路由选择是怎样的?OkHttp 中的连接池是如何实现的?连接的建立过程是如何实现的?等等疑问都还没有解开,我们将在后续文章中介绍到。

    判断连接是否可用

    我们接着看看 RealConnection.isHealthy 的实现,看看它是如何判断一个连接是否可用的:

    /**
     * Returns true if this connection is ready to host new streams.
     */
    public boolean isHealthy(boolean doExtensiveChecks) {
        // 判断Socket是否可用
        if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
            return false;
        }
        // 如果包含Http2的连接,检测是否shutdown
        if (http2Connection != null) {
            return !http2Connection.isShutdown();
        }
        if (doExtensiveChecks) {
            try {
                int readTimeout = socket.getSoTimeout();
                try {
                    // 设置一秒延时,检测Stream是否枯竭,若枯竭则该连接不可用
                    socket.setSoTimeout(1);
                        if (source.exhausted()) {
                        return false; // Stream is exhausted; socket is closed.
                    }
                    return true;
                } finally {
                    socket.setSoTimeout(readTimeout);
                }
            } catch (SocketTimeoutException ignored) {
                // Read timed out; socket is good.
            } catch (IOException e) {
                return false; // Couldn't read; socket is closed.
            }
        }
        return true;
    }
    

    可以看到,上面主要是对 SocketHTTP2连接Stream 进行了检测,从而判断该连接是否可用。

    什么是 Exchange

    现在我们已经知道了连接究竟是如何寻找到的,现在让我们回到 Exchange 类,让我们研究一下究竟什么是 Exchange,它是用来做什么的。

    让我们先从它的 JavaDoc 看到:

    Transmits a single HTTP request and a response pair. This layers connection management and events
    on {@link ExchangeCodec}, which handles the actual I/O.
    

    可以看到,这里讲到,Exchange 是一个用于发送 HTTP 请求和读取响应的类,而真正进行 I/O 的类是它的一个成员变量——ExchangeCodec 。在 Exchange 中暴露了许多对 Stream 进行读写的方法,如 writeRequestHeaderscreateRequestBody 等等,在 CallServerInterceptor 中就会通过 Exchange ,向服务器发起请求,并读取其所返回的响应。

    什么是 ExchangeCodec

    让我们看看 ExchangeCodec 又是什么:

    /**
     * Encodes HTTP requests and decodes HTTP responses.
     */
    public interface ExchangeCodec {
       
        int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
        
        RealConnection connection();
       
        Sink createRequestBody(Request request, long contentLength) throws IOException;
        
        void writeRequestHeaders(Request request) throws IOException;
      
        void flushRequest() throws IOException;
       
        void finishRequest() throws IOException;
       
        @Nullable
        Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
       
        long reportedContentLength(Response response) throws IOException;
        
        Source openResponseBodySource(Response response) throws IOException;
        
        Headers trailers() throws IOException;
       
        void cancel();
    }
    

    可以看到,它仅仅是个接口,根据上面的 JavaDoc 可以看出,它的作用是用于对请求进行编码,以及对响应进行解码

    我们看看它有哪些实现类,通过 Android Studio 我们可以很容易找到它有如下两个实现类:

    • Http1ExchangeCodec
    • Http2ExchangeCodec

    看得出来,OkHttp 采用了一种非常典型的面向接口编程,将对 Http 请求的编码及解码等功能抽象成了接口,再通过不同的实现类来实现将相同的 Request 对象编码为 HTTP1 及 HTTP2 的格式的数据,将 HTTP1 及 HTTP2 格式的数据解码为相同格式的 Response 对象。通过这样的一种面向接口的设计,大大地提高了 OkHttp 的可扩展性,可以通过实现接口的形式对更多的应用层进行支持。

    什么是 Transmitter

    接下来我们看看贯穿了我们整个请求流程的 Transimitter,究竟是一个用来做什么的类。我们先从 JavaDoc 入手:

    Bridge between OkHttp's application and network layers. This class exposes high-level application
    layer primitives: connections, requests, responses, and streams.

    根据上面的注释可以看出,Transmitter 是一座 OkHttp 中应用层与网络层沟通的桥梁。就像我们之前的连接创建,就是在应用层通过了 transmitter.newExchange 方法来通知网络层进行 Exchange 的获取,并返回给应用层。那么 Transmitter 是什么时候创建的呢?

    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.transmitter = new Transmitter(client, call);
        return call;
    }
    

    可以看到,它是在 RealCall 被创建的时候进行创建的,也就是说一个 Trasmitter 对应了一个 Call。这个 Call 在应用层通过 trasnmitter 与它的网络层进行通信。

    相关文章

      网友评论

          本文标题:OkHttp 源码剖析系列(四)——连接的建立概述

          本文链接:https://www.haomeiwen.com/subject/azgcjctx.html