OkHttp 4.2.2 源码分析
原文链接
注意:OkHttp 3.12.0 以上版本需要 Android 21+,否则会奔溃,这个问题之前遇见过,但是具体什么错误因为当时没有记录所以遗忘了,但是这里分析的代码还是采用当前最新的版本 4.2.2,并且这个版本的代码已经采用了 Kotlin 了。
基本使用
val okHttpClient = OkHttpClient.Builder().build()
val request = Request.Builder().get().url("https://www.baidu.com").build()
val call = okHttpClient.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.d("OkHttpClient", e.toString())
}
override fun onResponse(call: Call, response: Response) {
Log.d("OkHttpClient", response.toString())
}
})
以上就是 OkHttp 的基本使用,里面涉及到了 OkHttpClient、Request、Call、Callback 对象,为了最快的理解 OkHttp 是如何工作的我们可以从 call.enqueue() 入手,但是 Call 仅仅是一个接口,所以我们要看看它的实现类和这个实现类是怎么来的。
一、 OkHttpClient.newCall
// OkHttpClient.kt
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, forWebSocket = false)
}
// RealCall.kt
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
仅仅是调用了 RealCall 的方法,把 OkHttpClient 和 Request 对象当作参数,新建了一个 RealCall,然后初始化 Transmitter(发射器)对象。
二、RealCall.execute
我们发起请求的方式有同步和异步两种,execute 就是同步请求,而 enqueue 为异步请求。
override fun execute(): Response {
//1、
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
//2、
transmitter.timeoutEnter()
//3、
transmitter.callStart()
try {
//4、
client.dispatcher.executed(this)
//5、
return getResponseWithInterceptorChain()
} finally {
//6、
client.dispatcher.finished(this)
}
}
-
首先通过 executed 判断是否已经调用过,如果多次调用则抛出异常。
-
请求超时的判断处理。
-
在初始化 OkHttpClient 的时候我们可以创建 EventListener.Factory 对象,为每个 RealCall 对象创建监听对象,在 transmitter.callStart 中会调用 EventListener.callStart。
-
调用 Dispatcher 对象的 executed 方法,把 RealCall 传入,事实上仅仅是加入到一个列表当中。
@Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) }
-
getResponseWithInterceptorChain 是真正开始处理请求的地方,这里我们后面单独讲。
-
请求结束后从列表中移除,如果发现 Dispatcher 空闲还会调用空闲的回调,
三、RealCall.enqueue
enqueue 为异步请求。
// RealCall.kt
override fun enqueue(responseCallback: Callback) {
//1、
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
//2、
transmitter.callStart()
//3、
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
- 防止重复调用。
- 事件开始分发。
- 与同步调用不同的地方,这里把我们使用的时候传入的 Callback 对象,进一步用 AsyncCall 封装,然后调用 Dispatcher.enqueue 方法。
//Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//4、
readyAsyncCalls.add(call)
if (!call.get().forWebSocket) {
//5、
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//6、
promoteAndExecute()
}
- 把 AsycCall 对象添加到等待队列。
- 如果不是 WebScoket 的请求(一般都不是),会进一步从正在执行的和等待中的异步请求列表中查找 Host 相同的请求,把它的 callsPerHost 对象的引用拷贝过来,用来统计相同 Host 的请求数。reuseCallsPerHostFrom 方法实现见下。
// RealCall.kt 中内部类 AsyncCall
@Volatile private var callsPerHost = AtomicInteger(0)
fun callsPerHost(): AtomicInteger = callsPerHost
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
- promoteAndExecute 方法见下。
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 7、
while (i.hasNext()) {
val asyncCall = i.next()
//到达最大请求数,默认为 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//到达单个 host 最大请求数,默认为 5
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// 8、
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
- 遍历等待执行的异步请求列表 readyAsyncCalls,将符合要求的请求收集到 executableCalls 列表中,同时在 readyAsyncCalls 中移除。这里有两种情况为暂时不处理的请求:一个是到达单个 host 最大请求数,另一个是总体请求到达最大请求数。
- 遍历收集到的请求,调用 AsyncCall.executeOn,参数为线程池。
四、AsyncCall.executeOn
AsyncCall 实现了 Runnable 接口,这里又把 executorService 传入,不难想到后面的逻辑就是用线程池执行 Runnable 的 run 方法,下面我们来看下。
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
//1、
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
//2、
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
//3、
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
- 用线程池执行,后面马上就会看 run 的实现。
- 异常处理,并分发。
- 异常结束的结束处理。
override fun run() {
// 4、
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
// 5、
transmitter.timeoutEnter()
try {
// 6、
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
// 7、
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
// 8、
client.dispatcher.finished(this)
}
}
}
- 修改线程名,在执行完内部代码之后修改回来,赞美 kotlin 的便捷性。
- 请求超时的判断处理。
- 殊途同归的 getResponseWithInterceptorChain,后面分析,紧跟着调用回调。
- 异常处理,以及回调。
- 处理结束把 AsyncCall 从 Dispatcher 的列表中移除,相同 Host 的请求计数减去一。
五、getResponseWithInterceptorChain
OkHttp 以责任链模式通过拦截器把需要进行的网络请求进行了责任的隔离,各个拦截器承担着各自的职责。
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 1、
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 2、
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
// 3、
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
- 把各个拦截器添加到列表,包括:自定义拦截器,重试重定向拦截器,桥接拦截器,缓存拦截器,连接拦截器,自定义网络拦截器,网络请求拦截器。
- 把前面的拦截器列表封装成 RealInterceptorChain 对象。
- 调用 RealInterceptorChain 对象的 proceed 方法,获取 Response 对象,最后进行一些收尾的工作。
// RealInterceptorChain.kt
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
// 省略一些检查代码
calls++
// Call the next interceptor in the chain.
//4、
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
//5、
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
return response
}
-
新建 RealInterceptorChain 对象,把当前的对象的参数传入,index + 1。
-
获取当前 index 的拦截器,调用 intercept 方法,传入步骤 4 新建的 RealInterceptorChain 对象,返回结果 Response。
那么这样能清楚拦截器的调用过程了,每个拦截器的 intercept 会被 RealInterceptorChain.proceed 触发,然后接收到一个新的 RealInterceptorChain 对象,只要在自身的 intercept 方法中调用接收到的对象的 proceed 方法,就能触发下一个动作,拿到一个 Response 对象。这样层层调用,就能走完我们一开始收集的拦截器列表中所有的拦截方法。这个就是核心思想。
六、拦截器
1、重试重定向拦截器 RetryAndFollowUpInterceptor
重试重定向拦截器,主要是请求失败或者请求需要重定向的时候进行新的请求。
// RetryAndFollowUpInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
var request = chain.request()
val realChain = chain as RealInterceptorChain
val transmitter = realChain.transmitter()
var followUpCount = 0
var priorResponse: Response? = null
// 循环操作
while (true) {
//1、
transmitter.prepareToConnect(request)
if (transmitter.isCanceled) {
throw IOException("Canceled")
}
var response: Response
var success = false
try {
//2、
response = realChain.proceed(request, transmitter, null)
success = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
// 路由错误,如果不可以恢复则直接抛出异常
if (!recover(e.lastConnectException, transmitter, false, request)) {
throw e.firstConnectException
}
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
//
val requestSendStarted = e !is ConnectionShutdownException
// IO 错误,如果不可以恢复则直接抛出异常
if (!recover(e, transmitter, requestSendStarted, request)) throw e
continue
} finally {
// The network call threw an exception. Release any resources.
// 3、
if (!success) {
transmitter.exchangeDoneDueToException()
}
}
// Attach the prior response if it exists. Such responses never have a body.
// 4、第一次 priorResponse 肯定是空的,如果经过重试的话则会把之前的 Response 赋值给新的 Response
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = response.exchange
val route = exchange?.connection()?.route()
// 5、
val followUp = followUpRequest(response, route)
// 6、
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
transmitter.timeoutEarlyExit()
}
return response
}
val followUpBody = followUp.body
// 7、一次性请求的话直接返回
if (followUpBody != null && followUpBody.isOneShot()) {
return response
}
response.body?.closeQuietly()
// 8、
if (transmitter.hasExchange()) {
exchange?.detachWithViolence()
}
// 9、次数判断,最大为20
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 10、
request = followUp
priorResponse = response
}
}
-
transmitter 对象是在创建 RealCall 的时候初始化的,判断是否有可以重用的连接或者对没用的连接的回收,如果没有则创建新的 ExchangeFinder 对象。
-
realChain.proceed 调用到下一个拦截器,该行代码之后都是请求回来后的处理。
-
catch 和 finally,catch 中主要是出现异常后,判断是否可以重试,如果可以就 continue 否则抛出异常并进入 finally 做回收操作。稍微看下 recover 方法。
private fun recover( e: IOException, transmitter: Transmitter, requestSendStarted: Boolean, userRequest: Request ): Boolean { // 应用层面不允许重试,我们可以在 OkHttpClient 初始化时候设置 if (!client.retryOnConnectionFailure) return false // 已经发起过请求,且只允许发送一次 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false // 一些致命问题,比如证书认证错误等 if (!isRecoverable(e, requestSendStarted)) return false // 没有更多的路由去尝试 if (!transmitter.canRetry()) return false // For failure recovery, use the same route selector with a new connection. return true }
-
第一次 priorResponse 肯定是空的,如果经过重试的话则会把之前的 Response 赋值给新的 Response 的 priorResponse 成员变量,并且显示的将 priorResponse 的 body 置空。这个对象的赋值意义是什么?其实作用是在步骤 5 中,作为停止重试的判断使用:如果先后两次 Http 请求结果获取的状态码都是 408(请求超时)或者都是 503(服务不可用)则直接停止重试。
-
followUpRequest 根据请求回来的 Response 的响应码构建新的 Request,其中就包含了 301,302 等重定向的处理,步骤 4 提到的处理,以及我们一开始初始化 OkHttpClient 对象的是否允许重试的处理等。如果需要重新请求则 followUp 对象不为空,否则为 null 停止继续请求。
-
如果 followUp 为 null 则可以直接返回请求结果,这里还涉及到一个 Exchange 对象,这个我们后面再讲。
-
一次性请求的话直接返回。
-
走到这里表示,我们需要跟进一个新的请求,但是之前请求的一些操作还没有结束,则需要在这里停止。
-
重试次数判断,最大为20。
-
更新 Request 对象为新生成的对象,priorResponse 赋值(作用在步骤 4 中已经说明),进入下一次循环。
2、桥接拦截器 BridgeInterceptor
在发起请求之前,用应用层发起的请求将 Http 请求需要的请求头填充完整,在请求之后,将 Http 响应解析为应用层的响应。
// BridgeInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
// 1、
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
// 2、
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
// 3、
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
// 4、
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 5、
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
- 如果 Request.body 不为空,则添加 Content-Type、Content-Length 或者 Transfer-Encoding 请求头。
- 添加 Host、Connection 请求头,如果支持压缩还需要添加 Accept-Encoding:gzip,相应的需要在请求回来之后进行解压的操作。
- CookJar 是对 cookie 的支持,但是它默认是一个空实现(具体可以看 OkHttpClient.Builder 中的代码),如果我们需要则可以自己实现一个 CookJar 来保存和读取 cookie。
- 这一步,以及之后的逻辑都是响应后的处理了,这里首先是 cookie 的保持处理,默认是没有的。
- 如果之前请求的时候在请求头中添加了 Accept-Encoding:gzip,而且响应头中也的确说明了是 gzip 的,则进行解压。
3、缓存拦截器 CacheInterceptor
主要是用于从缓存读取和写入请求所对应的响应。这里缓存的策略事实上和 Http 的缓存机制有很大的关联。
// CacheInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// 1、
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 2、
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
// 3、
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// If we don't need the network, we're done.
// 4、
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
// 5、
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
// 6、
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {
cacheResponse.body?.closeQuietly()
}
}
// 7、
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
- Cache 对象可以在 OkHttpClient 初始化的时候构建,里面是通过 DiskLruCache 进行缓存的,以 url 生成 key 在缓存中查找到目标之后重新构造成 Response,然后进一步对比 Request 和从缓存恢复 Response,如果没命中则会返回 null,否则返回 Response。
- 获取缓存策略,根据策略的 networkRequest 和 cacheResponse 是否为 null 会有不同的处理。具体缓存策略的逻辑稍后再讲。
- 如果 networkRequest 和 cacheResponse 都为 null,则不再进行网络请求,直接构造失败的 Response, 结束。
- 如果只有 networkRequest 为 null,则表示缓存可用,结束。
- 如果 networkRequest 不为 null,则进行网络请求。
- 从这里开始是网络请求回来后的逻辑,当 cacheResponse 不为 null 且 networkResponse 的相应码为 304(客户端有缓存。http 请求时候发现自己缓存的文件有 Last Modified ,那么在请求中会包含 If Modified Since,服务器通过对比,如果没有改变,就会返回 304,响应体就不需要继续发送了以此减少带宽的消化。),则合并两个响应头,更新缓存,结束。
- 如果 cache 不为 null,验证 networkResponse 是否需要缓存,并缓存,结束;否则返回结果,结束。
CacheStrategy
前面讲到了缓存策略,事实证明返回的 CacheStrategy 的 networkRequest 和 cacheResponse 直接影响到了后续的调用逻辑。缓存策略是怎么产生的,主要还是要看 CacheStrategy.compute 方法:
//CacheStrategy.kt
fun compute(): CacheStrategy {
val candidate = computeCandidate()
// We're forbidden from using the network and the cache is insufficient.
// 如果获得的 networkRequest 为 null,且只支持从缓存读取,那么就可以直接构造失败的响应了
// 对应前面步骤 3
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
// 缓存没有命中,发起网络请求,对应前面步骤 5
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// 请求为 https,缓存没有握手信息,发起网络请求,对应前面步骤 5
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// isCacheable 中对 cacheResponse 的响应码以及 request 和 cacheResponse
// Cache-Control 头做了判断,如果不满足,则对应前面步骤 5
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
val requestCaching = request.cacheControl
// hasConditions 中对请求头中是否存在 If-Modified-Since 和 If-None-Match 做了判断,
// 事实上如果有这两个字段,也可以发起请求,如果服务端验证没有过期,则只发送响应头回来
// 对应步骤 5 以及后续验证的步骤 6
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
// 有缓存,不新鲜则进行标记,但是没必要发起网络请求,对应步骤 4
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
// 对应 cacheResponse 响应头中的 ETag 字段,
// 如果不为空则给新的请求头添加 If-None-Match
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
// 对应 cacheResponse 响应头中的 Last-Modified 字段,
// 如果不为空则给新的请求头添加 If-Modified-Since
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
// 对应 cacheResponse 响应头中的 Date 字段,
// 如果不为空则给新的请求头添加 If-Modified-Since
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
// 如果不满足以上情况,则直接发起请求,对应步骤 5
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
// 如果有满足上面 when 的条件,则添加进相应的请求头中,对应步骤 5以及后续验证的步骤 6
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
那么分析到这里,了解过 Http 缓存机制的同学应该也发现了,这里的代码和 Http 缓存机制完全一致。而且缓存策略中的处理,也和缓存拦截器里面的操作都能相互对应上。
4、连接拦截器 ConnectInterceptor
用于开启一个连接。
// ConnectInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// 1、
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
- 当请求方式不是 GET 的时候需要更全面的检查,获取 Exchange 对象(就叫他转换器),transmitter 之前提到过在创建 RealCall 的时候创建的。
//Transmitter.kt
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
// 2、
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
// 3、
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
// 4、
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
- noMoreExchanges 如果为 true 表示 Transmitter 已经要释放不再接受新的处理,exchange 不为 null 表示之前的响应还没有关闭。
- 通过重试拦截器里面初始化的 ExchangeFinder 对象,查找或者新建一个健康的连接并封装到一个 ExchangeCodec(编码器)返回,根据 http 的不同版本,编码器也不同。
- 将 ExchangeCodec 编码器等参数封装到 Exchange 对象返回,通过最终通过 RealInterceptorChain.proceed 方法传递到下一个拦截器(如果我们有自定义的网络拦截器那么就会被调用到,如果没有则到了最后一个拦截器:网络请求拦截器)。
如何获取一个健康的连接?
连接拦截器自身的代码看起来很简洁,但核心点是怎么查找连接。所以我们还需要深入看下 ExchangeFinder.find 方法。
// ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {
// 省略一些代码
try {
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
return resultConnection.newCodec(client, chain)
}
// 省略一些代码
}
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
// 循环查找可用的连接
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 如果成功的次数是0,表示找到的连接为新的,就不需要做全面的检查直接返回
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
// 检查连接池里面的连接是否可用,如果可用则直接返回,否则从连接池中移除,进入下一次循环查找
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
}
return candidate
}
}
// ExchangeFinder.kt
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
// 如果已经通过发射器标记了取消,则直接抛异常退出
if (transmitter.isCanceled) throw IOException("Canceled")
// 新尝试的标记
hasStreamFailure = false // This is a fresh attempt.
// 第一次进来应该为 null,因为在循环中所以可以先看下边
releasedConnection = transmitter.connection
// 如果不为空,且持有的连接被标记不能做更多的处理,则释放资源(怎么释放呢?不着急,我们先看怎么获取)
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
if (transmitter.connection != null) {
// 如果能走到这里表示,是复用的连接,而且没有被上面的操作回收掉,即 noNewExchanges 为 false
result = transmitter.connection
releasedConnection = null
}
if (result == null) {
// result 为 null 需要尝试从连接池里面获取一个
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
// 走到这里表示已经从连接池里面获取到
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
// 标记1、第一次应该不会到这里,目前看不明白,我们一会回来看就清楚了
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
// 第一次应该不会到这里
selectedRoute = transmitter.connection!!.route()
}
}
}
// 关闭上面需要释放的 socket
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// 复用的或者是刚刚从连接池里面查找到的
return result!!
}
// 创建一个路由选择器,第一次进来的话会从这里获取
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
// 如果我们没有自己给 OkHttpClient 设置额外的 Proxy 或者 ProxySelector 的话,
// routeSelector 创建的时候,会走到 sun.net.spi.DefaultProxySelector 获取 Routes
// next 里面会把 Routes 依次通过 dns 解析地址之后重新封装成 Route,
// 再经过之前是否存在失败的情况筛选,
// 最终保存在 Selection 对象里,返回。
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// 通过上一步创建 routeSelection 的操作,我们获取了一个 ip 地址的集合
// 再次尝试从连接池中匹配是否有合适的连接
// 与上一次不同的是 routes 对象不为空
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
// 连接池没有找到,那么我们从路由选择中选中一个路由
selectedRoute = routeSelection!!.next()
}
// 创建一个全新的连接,
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// 如果是复用的那么到这里就结束了,如果是新建的还需要进行其他的操作
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// 新建的连接,建立 TCP 连接,如果是 Https 那么还需要进行密钥的协商
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
// 从失败的路由集合中移除成功的路由
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// 最终再次从连接池中获取可以多路复用的连接,注意和之前两次不同的是最后一个参数为 true
// 如果有多个并发的连接到同一个 host 这时候后来的就能找到前面的
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 走到这里,就表示可以从连接池中获取的现成的,那么刚刚新建的就可以回收掉了
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
// 有可能刚获取到的之后马上这个连接就不可用了,那么我们把之前能成功连接的路由暂时保存一下
// 见前面注释中标记 1 的位置
nextRouteToTry = selectedRoute
} else {
// 没有在连接池中找到,那么我们就把新建的添加进去
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
上面的步骤比较多,但是已经详细注释了,有些地方可以第一次不能理解,但是继续看下去就能发现什么情况才能调用到了,毕竟整个过程在一个循环中。
下面单独提一下 connectionPool.transmitterAcquirePooledConnection、transmitter.acquireConnectionNoEvents 以及 transmitter.releaseConnectionNoEvents 三个方法,分别是 transmitter 从连接池获取连接,取得后的绑定,和释放的过程。
// RealConnectionPool.kt
fun transmitterAcquirePooledConnection(
address: Address,
transmitter: Transmitter,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
assert(Thread.holdsLock(this))
// 遍历所有连接
for (connection in connections) {
// 如果是多路复用,但是连接不支持则跳过
if (requireMultiplexed && !connection.isMultiplexed) continue
// 检查是否合适,不合适则跳过
if (!connection.isEligible(address, routes)) continue
// 找到了,绑定一下
transmitter.acquireConnectionNoEvents(connection)
return true
}
return false
}
// Transmitter.kt
fun acquireConnectionNoEvents(connection: RealConnection) {
assert(Thread.holdsLock(connectionPool))
check(this.connection == null)
this.connection = connection
// 处理上很简单,把 Transmitter 自身用弱引用添加进 RealConnection 的集合中
connection.transmitters.add(TransmitterReference(this, callStackTrace))
}
fun releaseConnectionNoEvents(): Socket? {
assert(Thread.holdsLock(connectionPool))
// 从 RealConnection 的请求集合中查找的自己的索引,然后移除并置空 this.connection 的引用
val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
check(index != -1)
val released = this.connection
released!!.transmitters.removeAt(index)
this.connection = null
// 进一步判断 RealConnection.transmitters 已经空了,则让连接池把这个连接移除
// 最终返回 socket 等待回收资源
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime()
if (connectionPool.connectionBecameIdle(released)) {
return released.socket()
}
}
return null
}
通过前面三个方法,发射器对象通过持有连接的引用,然后持有的请求就会在这个连接处理;
而连接很可能是处理多个请求的,所以用集合保存了发射器对象的弱引用;
而每个请求完成的时候那么就需要从这个弱引用集合中移除,当集合中所有的发射器对象都请求完毕之后,那么就可以考虑从连接池中移除这个连接释放资源了。
5、网络请求拦截器 CallServerInterceptor
开始真正的向服务器发送请求,读取响应,数据交换。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange()
val request = realChain.request()
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 1、
exchange.writeRequestHeaders(request)
var responseHeadersStarted = false
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 2、
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseHeadersStarted = true
exchange.responseHeadersStart()
responseBuilder = exchange.readResponseHeaders(true)
}
// 3、
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// 4、
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// 5、
exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart()
}
if (responseBuilder == null) {
// 6、
responseBuilder = exchange.readResponseHeaders(false)!!
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// 7、
response = exchange.readResponseHeaders(false)!!
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// 8、
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
// 9、
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// 10、
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
- 写入请求头。
- 如果不为 GET 或者 HEAD 请求,而且请求体不为空,检查到 Expect: 100-continue 包含在请求头中,则先发送请求头,暂时不发送请求体。等待读取响应头,如果这里得到服务器的响应码为 100,则获得的 responseBuilder 为 null,否则不为 null。
- responseBuilder 为 null 表示服务器响应 100,那么我就可以继续发送请求体(先发响应头的操作就是为了减少带宽消耗),ps:暂不讨论请求体支持双工的情况,因为没有看到支持双工的子类。
- 根据步骤 2、3 那么现在就可以开始发送请求体了。
- 走到这一步,表示在步骤 2 中 Expect: 100-continue 的请求没有被服务器同意,那么就不发送请求体,并标记请求完成,针对不可以多路复用的连接则直接标记使用完成。
- 没有响应头的,就再次读取响应头,经历过步骤 5 的不会走到这里。
- 如果步骤 6 中读取的响应码是 100,就直接尝试读取真正的响应。
- 如果是 WebSocket 且响应码为 101(升级协议),则给一个空的响应体,准备升级协议。
- 解析响应体的类型、长度以及准备字节流。
- 如果请求或者响应头里面包含 Connection:close,则标记连接使用完毕,防止被重用。
- 针对响应码为 204(没有新文档)205(没有新内容),但是内容长度又大于 0 的响应,直接抛出异常。
小结
没有小结,毕竟是源码分析,看完了自己能理清流程才是真的收获,不能建立整体的概念,太注重别人的总结的话最终会忽略很多细节。
结语
简单分析了 OkHttp 的调用流程,以及各个拦截器的实现,还有很多细节没有提到,如果有兴趣可以自己再钻研一下,复杂的东西拆解了就不会那么困难。很多时候阅读源码第一次阅读可能会的复杂,先粗略的了解建立整体的轮廓,再各个击破才是阅读源码的方法。
另外,喜欢的同学,觉得对自己有帮助的同学,务必请花一点点时间帮我点个赞!点赞之交淡如水,但这个很重要!
网友评论