OkHttp 源码阅读笔记(二)
在前面的一篇文章中介绍了 OkHttp
的同步调用和异步调用,Dispatcher
的任务调度器工作方式和 RealInterceptorChain
拦截器链的工作方式,没有看过的同学可以看看OkHttp 源码阅读笔记(一)。
本篇文章主要介绍网络链接获取。
获取网络链接缓存
我在前面的文章中也讲到 ConnectInterceptor
拦截器中会获取网络链接,我们来看看他的 intercept()
方法:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 获取 Exchange 对象
val exchange = realChain.call.initExchange(realChain)
// 把 exchange 对象写入到一个新的 RealInterceptorChain 对象中
val connectedChain = realChain.copy(exchange = exchange)
// 用新的 chain 对象执行后面的任务链调用
return connectedChain.proceed(realChain.request)
}
前面我也讲过如果有 Exchange
对象就表示网络链接已经创建,在 ConnectInterceptor
拦截器之后执行的拦截器执行时就已经有网络链接了,除了系统拦截器外,这些拦截器就是我们自定义的网络拦截器,我们自定义的普通拦截器执行时就还没有创建网络链接。
我们看到它时调用了 RealCall#initExchange()
方法来获取 Exchange
对象的,我们来看看它的实现:
internal fun initExchange(chain: RealInterceptorChain): Exchange {
// 各种状态检查
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
// 这个 ExchangeFinder 对象是在第一个系统拦截 RetryAndFollowUpInterceptor 中触发创建的
val exchangeFinder = this.exchangeFinder!!
// 找到一个可用的链接
val connection = exchangeFinder.find()
val codec = connection.newCodec(client, chain)
// 构建出 Exchange 对象.
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
通过以上的代码我们可以知道是通过 ExchangeFinder#find()
方法去查找的一个可用的链接,然后通过它最终构建一个 Exchange
对象。
其中触发 ExchangeFinder
实例创建的代码是第一个系统拦截器 RetryAndFlowUpInterceptor
,我们简单看看相关的代码:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// ...
while (true) {
// 触发 ExchangeFinder 对象创建
call.enterNetworkInterceptorExchange(request, newRoutePlanner, chain)
// ...
}
}
我们看到 RetryAndFlowUpInterceptor
调用了 RealCall#enterNetworkInterceptorExchange()
方法,我们来看看它的实现:
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
synchronized(this) {
check(!responseBodyOpen) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
check(!requestBodyOpen)
}
if (newExchangeFinder) {
// 直接创建一个 ExchangeFinder 对象
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
我们继续看看 ExchangeFinder#find()
方法的实现:
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
// 找到一个可用的链接
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
// 创建一个对应的 ExchangeCodec 对象返回
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
我们继续追踪 findHealthyConnection()
方法:
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
// 这里是一个死循环去查找可用的链接,直到找到可用的链接为止
while (true) {
// 查找
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
// 判断链接可用
if (candidate.isHealthy(doExtensiveHealthChecks)) {
返回可用链接
return candidate
}
// If it isn't, take it out of the pool.
// 标记链接不可用
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
// 如果还有可重试的链接,继续查找
if (nextRouteToTry != null) continue
// 如果还有可选择的 Route,继续查找
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
// 如果还有可选择的 Proxy,继续查找
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
// 没有找到可用的 Route,直接报错
throw IOException("exhausted all routes")
}
}
这里是一个循环去查找一个可用的链接,你可能有点懵,怎么还有多个可用的链接呢?我这里先大概介绍一下,后面还会详细介绍。在 OkHttp
怎么确定一个链接呢?Proxy
+ IP Address
就可以设定一个确定的链接,OkHttp
可以设置 Proxy
,一个域名也可以有多个对应的 IP Address
。所以 一次请求可能有多个可以使用链接,就像一个请求有多个可达的路径,OkHttp
在它的内部用 Route
类来描述这个可达的路径。当一个请求有多个可达的路径时,就会找到一个可用的然后返回,所以上面是一个循环去查询的原因。首先遍历 Proxy
,然后再遍历 IP Address
直到找到一个可用的链接,然后再返回。
OK,我们再看看 findConnnection()
方法的代码:
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// 先从 `RealCall` 中去获取上次的 Connection,判断是否可用
// Attempt to reuse the connection from the call.
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
// 这里表示上次的链接不可用
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
// 这里表示上次的链接可用,直接返回
check(toClose == null)
return callConnection
}
// The call's connection was released.
// 关闭不可用的链接
toClose?.closeQuietly()
// 关闭链接时会通知 `eventListener`。
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// 尝试从 ConnectionPool 中获取一个可以使用的链接
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
// 表示从 ConnectionPool 中找到可以使用的链接直接返回
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
// 后续就是新创建一个链接的代码,后面再分析。
// ...
return newConnection
}
这里有三种方式来获取链接:
-
直接从
RealCall
中获取
你可能有点懵RealCall
中怎么会有链接呢?我们一路看代码都没有看到。先不要急,这个可能是上次网络请求的时候保存下来的,但是也不对啊,一个RealCall
只能执行一次,确实是这样RealCall
是只能执行一次,但是一次的RealCall
调用中可能有多次的Http
请求,比如说重定向请求,我第一次请求后服务器返回了一个重定向的地址,我就还需要再请求一次重定向地址,所以我第二次请求时就有第一次请求是创建的链接,所以我就可以尝试使用第一次的链接来完成第二次请求,但是两次的域名有可能是不一样的,所以需要检查一次上次的链接是否可以在本次使用。 -
从
ConnectionPool
中获取
就是缓存后的链接,如果RequestHeander
和ResponseHeader
中都有Connection: Keep-Alive
就表示后续可以继续使用这个链接,我们就会在把他保存在ConnectionPool
中供下次使用,文章后面会单独看ConnectionPool
。 -
新创建链接
我把新创建链接的代码省略了,文章后面单独看新链接创建的过程。
新的网络链接创建
我们接着看 ExchangeFinder#findConnection()
方法中创建网络链接的相关代码:
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
// ...
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// 重试的 Route
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// 从 RouteSelection 中获取下一个 Route
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
// 创建 RouteSeletor
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
// 获取可用的 RouteSelection
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
// 再尝试从 ConnectionPool 中获取一次,如果获取到了,使用 Pool 中的链接
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// 获取到一个可用的 Route
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
// 创建新的链接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 执行链接
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
// 再再次尝试从 ConnectionPool 中获取链接
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
// 把新的链接放入到 ConnectionPool 中
connectionPool.put(newConnection)
// 把这个链接保存到 RealCall 中
call.acquireConnectionNoEvents(newConnection)
}
// 链接成功后通知 EventListener
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
这里整理一下主要逻辑:
-
链接
Route
的获取
Route
的获取有三种方式:- 获取上次需要重试的
Route
。 - 从上次保存的
RouteSelection
中获取Route
。 - 创建新的
RouteSelector
,从RouteSelector
中获取Selection
, 然后从Selection
中获取Route
(Selection
和RouteSelector
后面会单独讲)。
- 获取上次需要重试的
-
RealConnection
创建链接
调用RealConnection#connect()
方法来建立连接(后面单独讲)。 -
创建成功的链接保存到
RealCall
和ConnectionPool
中
RouteSelector
和 Selection
RouteSelector
和 Selection
你可能还有点懵,RouteSelector#next()
方法就是从对应的一个 Proxy
中获取一组对应的 Route
,来描述这个 Proxy
的 Route
组的就是一个 Selection
,从 Selection#next()
方法中获取的就是一个特定的 Route
。简单来说就是从 RouteSelector
中获取 Selection
,从 Selection
中获取一个 Route
,通过 Route
就能够创建网络链接了。
我们先来看看 RouteSelector
的构造函数:
init {
resetNextProxy(address.url, address.proxy)
}
/** Prepares the proxy servers to try. */
private fun resetNextProxy(url: HttpUrl, proxy: Proxy?) {
fun selectProxies(): List<Proxy> {
// If the user specifies a proxy, try that and only that.
if (proxy != null) return listOf(proxy)
// If the URI lacks a host (as in "http://</"), don't call the ProxySelector.
val uri = url.toUri()
if (uri.host == null) return immutableListOf(Proxy.NO_PROXY)
// Try each of the ProxySelector choices until one connection succeeds.
val proxiesOrNull = address.proxySelector.select(uri)
if (proxiesOrNull.isNullOrEmpty()) return immutableListOf(Proxy.NO_PROXY)
return proxiesOrNull.toImmutableList()
}
// 通知 EventListener 开始获取代理
eventListener.proxySelectStart(call, url)
// 获取代理
proxies = selectProxies()
nextProxyIndex = 0
// 通知 Event Listener 代理获取结束
eventListener.proxySelectEnd(call, url, proxies)
}
在构造函数中会去获取代理,在开始获取代理前会通过 proxySelectStart()
方法通知 EventListener
,结束后通过 proxySelectEnd
通知 EventListener
,这里的代理列表不会为空,默认情况下没有代理的话列表中只有一个 Proxy.NO_PROXY
。代理的类型分为 Dirct
、Http
和 Socks
,而我们默认的 Proxy.NO_PROXY
对象就是 Dirct
类型,也就是不用代理。
我们继续看看 RouteSelector#next()
方法(next()
方法在最下面哈,注意看):
@Throws(IOException::class)
private fun resetNextInetSocketAddress(proxy: Proxy) {
// Clear the addresses. Necessary if getAllByName() below throws!
val mutableInetSocketAddresses = mutableListOf<InetSocketAddress>()
inetSocketAddresses = mutableInetSocketAddresses
val socketHost: String
val socketPort: Int
if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
// 如果是 DIRECT 和 SOCKS 就直接使用 url 的域名和端口.
socketHost = address.url.host
socketPort = address.url.port
} else {
// 如果是 HTTP 使用代理的 url 的域名和端口.
val proxyAddress = proxy.address()
require(proxyAddress is InetSocketAddress) {
"Proxy.address() is not an InetSocketAddress: ${proxyAddress.javaClass}"
}
socketHost = proxyAddress.socketHost
socketPort = proxyAddress.port
}
// 判断端口是否合法
if (socketPort !in 1..65535) {
throw SocketException("No route to $socketHost:$socketPort; port is out of range")
}
if (proxy.type() == Proxy.Type.SOCKS) {
mutableInetSocketAddresses += InetSocketAddress.createUnresolved(socketHost, socketPort)
} else {
val addresses = if (socketHost.canParseAsIpAddress()) {
// 这里是域名本来就是 ip 的样式,比如这样的:192.168.1.1:80,就不需要使用 dns 域名查询
listOf(InetAddress.getByName(socketHost))
} else {
// 后续执行域名查询操作
// 通知 eventListener 域名查询开始
eventListener.dnsStart(call, socketHost)
// Try each address for best behavior in mixed IPv4/IPv6 environments.
// 执行域名查询,这个 dns 是可以自定义的.
val result = address.dns.lookup(socketHost)
if (result.isEmpty()) {
// 域名查询失败
throw UnknownHostException("${address.dns} returned no addresses for $socketHost")
}
// 通知 eventListener 域名查询结束
eventListener.dnsEnd(call, socketHost, result)
result
}
// 将查询的结果新建一个 InetSocketAddress 写入到列表中
for (inetAddress in addresses) {
mutableInetSocketAddresses += InetSocketAddress(inetAddress, socketPort)
}
}
}
@Throws(IOException::class)
private fun nextProxy(): Proxy {
if (!hasNextProxy()) {
throw SocketException(
"No route to ${address.url.host}; exhausted proxy configurations: $proxies")
}
// 获取下一个代理
val result = proxies[nextProxyIndex++]
// 获取代理对应的 IP 地址
resetNextInetSocketAddress(result)
return result
}
@Throws(IOException::class)
operator fun next(): Selection {
if (!hasNext()) throw NoSuchElementException()
// Compute the next set of routes to attempt.
val routes = mutableListOf<Route>()
// 查找一个 IP 地址列表不为空的代理
while (hasNextProxy()) {
// Postponed routes are always tried last. For example, if we have 2 proxies and all the
// routes for proxy1 should be postponed, we'll move to proxy2. Only after we've exhausted
// all the good routes will we attempt the postponed routes.
// 获取代理
val proxy = nextProxy()
for (inetSocketAddress in inetSocketAddresses) {
// 通过 IP 地址构建一个新的 Route
val route = Route(address, proxy, inetSocketAddress)
if (routeDatabase.shouldPostpone(route)) {
postponedRoutes += route
} else {
routes += route
}
}
// 如果当前的代理 Route 不为空,就结束
if (routes.isNotEmpty()) {
break
}
}
if (routes.isEmpty()) {
// We've exhausted all Proxies so fallback to the postponed routes.
routes += postponedRoutes
postponedRoutes.clear()
}
// 返回结果
return Selection(routes)
}
在 RouteSelector#next()
方法中主要获取一个可用的 Proxy
然后去查询它所对应可使用的 IP
,对应的 IP
获取成功后会封装成 Route
对象,最后 Route
列表作为 Selection
对象的参数,最后返回 Selection
对象。
IP
地址的查询是通过 resetNextInetSocketAddress()
方法来处理的,这里获取域名和端口的方式取决于不同的 Proxy
类型,如果是 Dirct
和 Socks
直接用请求的 url
的域名和端口,如果是 Http
类型就用代理的域名和端口。如果域名就是 IP
地址的形式就跳过 dns
域名查询,如果不是 IP
形式,就通过 dns
(可以自定义) 执行域名查询。查询开始前后会通过 dnsStart()
和 dnsEnd()
通知 eventListener
。
到这里为止域名对应的 IP Address
就获取到了。
然后继续看看 Selection#next()
方法的实现:
operator fun next(): Route {
if (!hasNext()) throw NoSuchElementException()
return routes[nextRouteIndex++]
}
这个就很简单了,直接获取当前 nextRouteIndex
的 Route
对象。
创建链接
在 Route
对象获取到后就会通过它为参数创建一个 RealConnection
对象,然后调用对应的 connect()
方法,忘记了的同学再去看看前面的源码。我们再来看看 RealConnection#connect()
方法实现:
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(protocol == null) { "already connected" }
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
// Http 协议
if (route.address.sslSocketFactory == null) {
// CLEARTEXT 表示支持不加密明文传输,如果没有就报错
if (ConnectionSpec.CLEARTEXT !in connectionSpecs) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication not enabled for client"))
}
val host = route.address.url.host
// 判断平台是否支持 Http 明文传输,不支持就报错
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication to $host not permitted by network security policy"))
}
} else {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
throw RouteException(UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"))
}
}
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
// 创建 TCP 连接
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
// 构建 TLS/SSL 安全连接通道
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
// 通知 eventListener 连接结束
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
socket?.closeQuietly()
rawSocket?.closeQuietly()
socket = null
rawSocket = null
source = null
sink = null
handshake = null
protocol = null
http2Connection = null
allocationLimit = 1
// 通知 eventListener 连接失败
eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
if (routeException == null) {
routeException = RouteException(e)
} else {
routeException.addConnectException(e)
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
idleAtNs = System.nanoTime()
}
这里你可能对 ConnectionSpecs
有点懵,默认的 OkHttp
设置的是 MODERN_TLS
和 CLEARTEXT
,分别表示支持 TLS/SSL
加密通道和支持明文传输。如果 sslSocketFactory
为空就表示当前的请求是 http
请求,反之就是 https
请求,如果是 http
请求,但是不支持 CLEARTEXT
就会报错,平台不支持也会报错。
然后后续通过 connectSocket()
方法创建 TCP
连接,通过 establishProtocol()
方法来创建 TLS/SSL
安全通道。我们再分别看看上面的两个方法实现。
@Throws(IOException::class)
private fun connectSocket(
connectTimeout: Int,
readTimeout: Int,
call: Call,
eventListener: EventListener
) {
val proxy = route.proxy
val address = route.address
// 创建 Socket 对象,可以通过设置 SocketFactory,来自定义创建 `Socket` 过程
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}
this.rawSocket = rawSocket
// 通知 eventListener 链接开始
eventListener.connectStart(call, route.socketAddress, proxy)
rawSocket.soTimeout = readTimeout
try {
// 执行 TCP 链接
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
} catch (e: ConnectException) {
throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
initCause(e)
}
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
// 把连接好的 Socket,构建成 OkIo 中的 source (读) 和 sink (写).
source = rawSocket.source().buffer()
sink = rawSocket.sink().buffer()
} catch (npe: NullPointerException) {
if (npe.message == NPE_THROW_WITH_NULL) {
throw IOException(npe)
}
}
}
首先通过 SocketFactory
(可以自定义) 创建一个 Socket
实例,然后 TCP
连接到前面 dns
解析的地址上,连接成功后,把 Socket
的读写,分别绑定到 OkIo
的 Source
和 Sink
对象上,到这里我们的 TCP
连接就创建完成了。
然后再看看 establishProtocol()
方法构建 TLS/SSL
安全通道的代码:
@Throws(IOException::class)
private fun establishProtocol(
connectionSpecSelector: ConnectionSpecSelector,
pingIntervalMillis: Int,
call: Call,
eventListener: EventListener
) {
if (route.address.sslSocketFactory == null) {
// 如果是 http 协议,直接返回,跳过 TLS 链接创建
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
socket = rawSocket
protocol = Protocol.H2_PRIOR_KNOWLEDGE
startHttp2(pingIntervalMillis)
return
}
socket = rawSocket
protocol = Protocol.HTTP_1_1
return
}
// 通知 eventListener 安全通道开始
eventListener.secureConnectStart(call)
// 执行 TLS 安全连接创建
connectTls(connectionSpecSelector)
// 通知 eventListener 安全通道建立完成
eventListener.secureConnectEnd(call, handshake)
if (protocol === Protocol.HTTP_2) {
startHttp2(pingIntervalMillis)
}
}
由于我自己对 TLS
安全通道没有特别了解,connectTls()
方法的代码暂时不分析,说不定哪天想了解这块儿了,再单独写文章来看,暂时存档。
创建成功的链接的保存
让我们再回到 ExchangeFinder#findConnection()
方法中,看看成功创建的链接保存在了哪些地方,首先通过 ConnectionPool#put()
方法保存在了 ConnectionPool
中 (这一块儿,我们单独讲 ConnectionPool
的时候再单独讲),然后还通过 RealCall#acquireConnectionNoEvents()
方法保存在了 RealCall
中。
我们来看看 RealCall#acquiteConnectionNoEvents()
方法的实现:
fun acquireConnectionNoEvents(connection: RealConnection) {
connection.assertThreadHoldsLock()
check(this.connection == null)
// 将 Connection 保存在 RealCall 中
this.connection = connection
// 将 RealCall 保存在 Connection 中
connection.calls.add(CallReference(this, callStackTrace))
}
这个方法里面很简单,只是单纯的把 RealConnection
保存在 RealCall
中,同时 RealConnection
中也会持有 RealCall
的引用。
链接池
我们在前面分析源码的时候,在很多地方都看到了 ConnectionPool
,它的真正实现类是 RealConnectionPool
,在 RealConnectionPool
还有两个非常重要的参数,maxIdleConnections
和 keepAliveDuration
,分别表示池中最大的限制链接数量和最大的闲置线程保存时间,当超过了上面两个限制之一的链接,就会被回收。后面我们会单独分析这个机制。
我们这里来整理一下 RealConnectionPool
的关键方法。
callAcquirePooledConnection()
callAcquirePooledConnection()
方法是尝试从 RealConnectionPool
获取可用的链接然后存放到 RealCall
中,在这里要先区别一下 Http 1.1
和 Http 2
。在 Http 1.1
中,一个链接同时只能处理一个 Http
请求,而 Http 2
就可以同时处理多个 Http
请求。对应到 OkHttp
的代码中就是,在 Http 1.1
中的链接,同一个时间只能引用一个 RealCall
,而 Http 2
中的链接,同一时间就可以引用多个 RealCall
。
我们来看看源码:
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
// 遍历池中的所有链接
for (connection in connections) {
synchronized(connection) {
// 如果请求是 http2,而当前链接不是 http2 跳过。
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
// 链接如果和请求的能够对应,直接返回
if (!connection.isEligible(address, routes)) return@synchronized
// 在 RealCall 中添加这个链接
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
在上面我们看到了通过 RealConnection#isEligible()
方法来判断是否合法,我们再来看看它的源码:
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
assertThreadHoldsLock()
// If this connection is not accepting new exchanges, we're done.
// 判断当前应用的 call 的数量是否达到最大的限制,http 1.1 是 1,http2 是大于 1;还判断当前链接是否可用。
if (calls.size >= allocationLimit || noNewExchanges) return false
// If the non-host fields of the address don't overlap, we're done.
// 判断除了 url 外的 Address 参数
if (!this.route.address.equalsNonHost(address)) return false
// If the host exactly matches, we're done: this connection can carry the address.
// 最后判断域名
if (address.url.host == this.route().address.url.host) {
return true // This connection is a perfect match.
}
// 后面是 Http2 判断的代码,我省略了
// ...
return true // The caller's address can be carried by this connection.
}
这里有三个判断:
- 是否达到最大的
RealCall
引用限制,Http 1.1
为 1,Http 2
大于 1;noNewExchanges
为true
表示当前链接已经不可用。 - 判断
host
以外的Address
参数。 - 判断
host
。
只有当上面的条件都满足了就表示当前链接可以使用(省略后面的 Http 2
的判断)。
put()
在链接创建成功后,会通过该方法来将链接保存在 RealConnectionPool
中。
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
// 直接添加到列表中
connections.add(connection)
// 添加一个异步任务,清除不满足条件闲置链接
cleanupQueue.schedule(cleanupTask)
}
上面的代码很简单,我们去看看清除任务的相关代码:
// ...
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
// ...
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
// Find either a connection to evict, or the time that the next eviction is due.
// 遍历所有链接
for (connection in connections) {
synchronized(connection) {
// If the connection is in use, keep searching.
// 获取链接引用的 RealCall 数量和清除泄漏的链接
if (pruneAndGetAllocationCount(connection, now) > 0) {
// 正在使用的链接数量
inUseConnectionCount++
} else {
// 闲置的链接数量
idleConnectionCount++
// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
// 记录闲置时间最长的链接
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
} else {
Unit
}
}
}
}
when {
// 当限制的任务数量达到了最大值,或者最长的一个链接时间达到了最大值,就会执行清除
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
// We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
// 标识不可用
connection.noNewExchanges = true
// 从池中移除
connections.remove(longestIdleConnection)
}
// 关闭 Socket
connection.socket().closeQuietly()
if (connections.isEmpty()) cleanupQueue.cancelAll()
// Clean up again immediately.
return 0L
}
idleConnectionCount > 0 -> {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs
}
inUseConnectionCount > 0 -> {
// All connections are in use. It'll be at least the keep alive duration 'til we run
// again.
return keepAliveDurationNs
}
else -> {
// No connections, idle or in use.
return -1
}
}
}
判断 RealConnection
闲置的方式是检查 RealCall
的引用,如果大于 0 表示忙碌,小于 0 表示闲置。限制的数量或者最大的时间超过了限制就会触发链接的回收(默认的最大限制数量是 5,最大的超时时间是 5 min,这两个值都是可以自定义的),最后 cleanup()
方法的返回值表示下次触发回收的时间。
connectionBecameIdle()
fun connectionBecameIdle(connection: RealConnection): Boolean {
connection.assertThreadHoldsLock()
// 判断是否已经限制,或者已经被回收
return if (connection.noNewExchanges || maxIdleConnections == 0) {
// 清除目标链接
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true
} else {
// 触发回收任务
cleanupQueue.schedule(cleanupTask)
false
}
}
connectionBecamIdle()
是外部主动告诉 ConnectionPool
某个链接已经限制了,如果已经闲置就会清除,如果没有限制就会触发一次回收任务。例如 RealCall
中的请求失败了,他就可以明确这次的链接后续不会用了,然后就会调用该方法尝试回收它,然后释放资源。
evictAll()
fun evictAll() {
val i = connections.iterator()
while (i.hasNext()) {
val connection = i.next()
val socketToClose = synchronized(connection) {
// 闲置的链接
if (connection.calls.isEmpty()) {
// 直接回收,关闭。
i.remove()
connection.noNewExchanges = true
return@synchronized connection.socket()
} else {
return@synchronized null
}
}
socketToClose?.closeQuietly()
}
if (connections.isEmpty()) cleanupQueue.cancelAll()
}
该方法是直接回收关闭所有的闲置的链接,这个方法在 OkHttp
中我没有看到调用的地方,这个方法是提供给库的开发者调用的,用来提前释放池中的闲置的链接。
最后
我个人认为链接的相关部分代码是 OkHttp
中最复杂的代码,从本篇文章的篇幅也可以看出来,很多人分析 OkHttp
源码也是绕过去,当你认真了解了它的工作原理后,我相信你一定会有所收获,如果看不懂就多看几遍,下一篇文章继续分析系统拦截器的源码。
网友评论