OkHttp已经很出名,这里就不啰嗦了,直接进入主题,以下代码演示均使用Kotlin。
OkHttp版本:4.9.0
我们先从发起一个简单的请求来说起。
val url = "https://api.github.com/users/huangcv/repos"
val client = OkHttpClient()
val request = Request
.Builder()
.url(url)
.build()
client.newCall(request)
.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
println("Response status code:${response.code}")
}
})
结果如下:
一个简单的请求就结束了。
接下来我们简单的来分析下源码,先从最近的地方开始着手看源码,就是
.enqueue()
这里,我们点击去发现是一个接口,然后退出来找上一个节点的代码,就是.newCall()
这里,看创建了一个怎样的Call
对象。
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
我们在跟到RealCall
对象里面。
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
......
}
构造函数中第一个参数,就是我们最开始创建的那个OkHttpClient
,这个类就相当于一个大总管,一些通用的配置都在这里,比如读写超时时间,设置代理,缓存等都是在这个类中设置的。
第二个参数:Request,初始的请求,就是我们newCall
传进来的Request,而且看它的名字是originalRequest,因为从我们发起请求到最终将一个请求发送出去这个过程中Request会被包装好几层。
第三个从参数:是否是WebSocket,默认false,不是WebSocket请求。
所以到这一步我们就清楚了,newCall
的作用就是创建一个RealCall
对象,将OkHttpClient和Request传递进去。
接下来我们来看下RealCall具体干了什么?
还记得我们第一次从 enqueue
函数点进去的吗,因为里面是一个接口,所以我们才来到了newCall
,因为RealCall实现了Call这个接口,接下来我们就要看下RealCall实现了Call之后的enqueue
函数了。
点进去可以看到:
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
第一行检查这个call是否被执行过了
第二行:callStart(),我们点进去看下;
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(this)
}
第一行用于跟踪错误,用于错误分析。
第二行:eventListener.callStart(this),事件监听器的回调函数。
这里没有重要的信息,我们再回过去看第三行代码:
client.dispatcher.enqueue(AsyncCall(responseCallback))
这句话是重点,我们需要看client的dispatcher的enqueue函数干了什么,并且这里还传入了一个AsyncCall对象。
我们先看这个dispatcher是个什么东西?
点进去:
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
我们在点进去,到dispatcher类中看下:
/**
* Policy on when async requests are executed.
*
* Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own
* executor, it should be able to run [the configured maximum][maxRequests] number of calls
* concurrently.
*/
class Dispatcher constructor() {
......
}
我们看到类描述,dispatcher是用来做线程调度用的,因为同一时间我们可能发起了好几个请求,因为每一个请求都是一个线程,也就是说线程的执行,就代表着请求的开始,所以实质上dispatcher是对线程的调度管理,从他执行过程中用的是ExecutorService就可以进一步说明,dispatcher是一个多线程调度管理器。
也就是说我们在分析dispatcher里面的代码,基本上都是关于调度的。
对于Dispatcher我们可以简单来看下:
@get:Synchronized var maxRequests = 64
可以同时进行请求的最大数量,超过这个数量,就要进行等待。
@get:Synchronized var maxRequestsPerHost = 5
每个主机下同时进行请求的最大数量,如果通过主机超过这个数量,就需要等待。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
Diapatcher 底层对多线程的管理工具,使用的是Java自带的多线程管理工具。
好了我们现在知道Dispatcher是干什么用的了,之后我们再看下dispatcher.enqueue()干了什么?
点进去看下:
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
可以看到最关键的就是两句代码,中间那一坨注释已经说的很明白了:就是针对同一个主机,检查当前正在执行的请求数量,是否超过最大请求数,这个当前正在请求的请求数对于每个请求都是共享的。
- readyAsyncCalls.add(AsyncCall)
- promoteAndExecute()
第一句是将我们传进来的AsyncCall对象添加到一个双向队列中,这里队列中存放的都是准备要执行但是还没有执行的任务。
再看第二句:promoteAndExecute(),从字面意思来看执行异步任务。
来看下具体的代码:
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
代码稍微多了一点,其主要功能就是:
将上一步已经准备好但还没有执行的队列中的请求都拿出来遍历一遍,筛选出可以执行的请求,然后用executorService来执行这些所有可执行的请求。
我们具体来看下:
首先从AsyncCallDeque队列中筛选出可执行的请求,条件是没有超过最大请求数(maxRequests
),并且所请求的主机没有超过最大请求数(maxRequestsPerHost
),满足这两个条件,该请求即满足可执行请求的条件,然后加入到可执行请求的集合中(executableCalls
),并且添加到正在执行请求的队列中(runningAsyncCalls
),用于后面做记录用的。接下来就是拿着可执行的请求的集合(executableCalls
)遍历执行这些请求并执行,怎么执行呢,把这些请求都丢给ExecutorService来进行执行,也就是调用AsyncCall
的executeOn函数。
点进去看下:
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
代码并不多,我们来简单分析下executeOn函数。
关键代码:executorService.execute(this),也就是说到这里线程发生了切换,不管之前的线程是什么,到这里统一都是后台线程。
至于后面的都是做一些扫尾工作,比如请求执行的过程中是否报错,报错了进行状态的回调以及将该请求标记为请求完成。这就不细聊了。
我们接下来看:executorService.execute(this)这句代码,既然传入的是this,那么它一定实现了Runnable接口,所以我们就找AsyncCall中的run函数。如下:
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
关键代码:val response = getResponseWithInterceptorChain(),也就是说在这里我们可以直接拿到请求的响应。接下来就是对响应的一些处理,比如说:responseCallback.onResponse(this@RealCall, response),回调请求响应,即成功。
responseCallback.onFailure(this@RealCall, e),出现异常了回调失败。
这里的responseCallback就是我们最开始从client.newCall(request).enqueue(Callback)
传递进来的Callback。
接下来最终执行
client.dispatcher.finished(this)函数,将该请求标记为完成。
经过上述步骤之后也就意味着整个请求结束了。
结束了吗?显然没有,因为我们还没有看是如果拿到Response的,具体干活的地方还没看呢。
接下来我们就要进入到另一个全新的阶段,根据请求获取响应,这也是每一个请求最最最重要的地方,同样这样是OkHttp最最最重要和硬核的部分。
在说getResponseWithInterceptorChain()
函数之前,我们有必要先了解下OkHttp的各种配置,这对后面理解getResponseWithInterceptorChain()
有很好的帮助。
OkHttp配置简单介绍:
- dispatcher 多线程调度管理器,前面我们已经说过了,这里就不细说了。
- connectionPool 连接池,可以类比线程池。连接池内部存储了一批的连接,当需要用到连接的时候而不是先创建,而是先看下池中是否有可重用的,如果没有再行创建,达到快速使用和创建,如果一个连接不再被使用的时候可以配置时间来进行回收。池的作用就是批量管理对象,通过重用和自动回收达到一种性能和资源占用的动态平衡。
- interceptors 是一个拦截器的集合。
- networkInterceptors 也是一个拦截器的集合。
- eventListenerFactory 用来生产EventListener的一个工厂,这个EventListener我们前面已经说过了,它是OkHttp各个过程和节点的事件监听器,通过设置EventListener我们可以监听到,请求过程中的各个节点的事件。
- retryOnConnectionFailure 当连接失败或者请求失败时是否重连,默认是true,失败重连。
- authenticator 自动进行认证修正的工具。
- followRedirects 是否跟踪重定向,默认为true,当响应码为重定向301/302时,如果为true时,将会重新进行重定向之后的请求,然后将结果返回,如果为false时,将直接回调,请求结束。
- followSslRedirects 当上面followRedirects为true的时候,当出现重定向时发生协议切换时,是否继续新的协议的请求,默认为true。
- cookieJar 用来存储Cookie,默认都是空实现,如果需要保存Cookie,需要自己实现。
- cache 缓存。
- dns 域名系统(Domain Name System),用于解析域名。最终使用的还
是Java的InetAddress.getAllByName(),这里不再细说了。 - proxy 代理。分为:直连,HTTP以及Socket。
- proxySelector 代理选择器。从一个列表中选择可使用的代理,如果没有设置proxy,默认为直连。
- proxyAuthenticator 代理独立的验证机制,当代理出现需要授权时,通过这个来进行给代理授权。
- socketFactory HTTP本质的连接就是建立Socket,所建立的Socket就是从这里的工厂创建的。
- sslSocketFactoryOrNull 同上个同理,当我们需要建立TLS/SSL加密连接时的Socket对象就是从这个sslSocketFactory来创建的。
- x509TrustManager HTTPS 建立连接时的证书验证器,X509是证书的一种格式,所有的HTTPS连接使用的都是X509格式的证书。
- connectionSpecs 连接规范/标准。就是HTTPS在建立连接时,客户端需要给服务器发送支持的TLS版本
以及支持的加密套件(对称加密,非对称加密,哈希等)。 - protocols 支持的HTTP协议的版本号,HTTP1.0,HTTP1.1,SPYD3.1,HTTP2,H2_PRIOR_KNOWLEDGE。
- hostnameVerifier 主机名验证器,属于证书验证机制中的,我们客户端验证证书主要是验证:1.证书的合法性,使用X509TrustMangaer来验证 2.是否是所访问的网站的证书,这里的第二步的验证就是使用HostnameVerifier来验证的。
- certificatePinner 证书强制验证的内容,就是自己写的代码,增强了验证的逻辑性。如果certificatePinner的验证不通过,即是证书验证是合法的,整个证书的验证流程也不会通过。
- certificateChainCleaner 这个也是证书验证机制中的,它的功能主要是操作X509TrustManager 验证证书的合法性。
- callTimeoutMillis 呼叫超时,但是完整的呼叫是没有超时的,但是呼叫中的连接,读写是有可能超时的。
- connectTimeoutMillis 连接超时。默认10s。
- readTimeoutMillis 连接的读操作超时时间,默认10s。
- writeTimeoutMillis 连接的写操作超时时间,默认10s。
- pingIntervalMillis 心跳间隔,这个是针对WebSocket和HTTP2来说的,默认不发送心跳。
经过以上部分的铺垫,我们接下来就要进入最最最硬核的部分了,有了以上部分的铺垫,我们能更好的理解getResponseWithInterceptorChain
这个函数里面的东西。
不废话,开始。
我们点进去看下 getResponseWithInterceptorChain
这个函数的具体代码如下:
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
猛一看,我k,代码有点多啊,不过没关系,老规矩,把代码罗列一些,分分类,就会很清楚了。
第一部分,合并Interceptor,把默认和自定的Interceptor都合并到一起。
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
第二部分,构建真正的InterceptorChain,对第一步的Interceptors进行封装和管理,为接下来的链式工作做准备。
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
第三部分,执行第一个Interceptor,让整个InterceptorChain开始转起来,当执行完第一步合并的所有Interceptor时,整个流程就结束了,此次的请求也结束了,就拿到了响应,这么神奇?可是没看到干什么啊?那是因为封装的太好了。我们接下里仔细来看下这些代码。
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
在看之前,我们下来了解下说明是链式工作,它的模型如下:
Chain
一个接一个的从左往右执行到最后一个,然后再从右往左一个一个的执行回来。就是每一个Interceptor都有前置工作,中置工作和后置工作,而且最后一个节点有点特殊,因为它没有中置工作,所以导致它的前置和后置工作合在一起了,就只剩下一个工作。这个模型也很容易加入一些节点,比如想在最前面加入一个初始化工作的节点,将准备好的Interceptor直接放到第一个位置,然后开始执行即可。
也就是说我们将第一部分的那一坨Interceptor看完,整个请求->响应的过程也就都明白了。
好了接下来我们开始详细看下第一部分的各个Interceptor。
因为client.interceptors
和 client.networkInterceptors
都是用户自定义的Interceptor,这里就没必要看了,我们只看内置的Interceptor,而且我们只需要弄明白每一个Interceptor的前置和后置工作即可。
-
RetryAndFollowUpInterceptor
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { // An attempt to communicate with a server failed. The request may have been sent. if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } }
点进
RetryAndFollowUpInterceptor
看下它的intercept
函数,我k,代码有点多,不要慌,稳住,按前,中,后置分开来看就好了。-
前置工作。
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
这里我们先放一放它的前置工作,我们先分析中置和后置工作。因为从它的名字或者代码中的while(true)可以看出来,它是做连接失败重连和重定向跟踪的功能,我们仔细想一下它的工作流程就能明白它的工作具体是干什么的。
-
中置工作
realChain.proceed(request)
交棒给下一个Interceptor。
-
后置工作
try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { // An attempt to communicate with a server failed. The request may have been sent. if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) }
代码中除了
response = realChain.proceed(request);
这句代码之外,剩下的都是它的后置工作。
它的后置工作分为两类,第一个各种异常和ResponseCode->3XX,都需要continue
,再转一次;第二类请求成功,直接return response
,结束循环。
接下来我们具体看下整个后置工作:
!recover(e.lastConnectException, call, request, requestSendStarted = false)
这句代码的意思就是出现异常的情况下,判断当前状态是否可以继续重试发起请求,我们点进去看下具体逻辑:private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { // The application layer has forbidden retries. // 用户是否设置了连接失败重连,默认为true,我们在之前的OkHttpClient中已经分析了这个配置了。 if (!client.retryOnConnectionFailure) return false // We can't send the request body again. //以下情况是不可以重连的,已经开始发送请求并且该请求只能发送一次 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false // This exception is fatal. // 根据异常类型判断是否可以恢复重连 if (!isRecoverable(e, requestSendStarted)) return false // No more routes to attempt. // 当连接失败时判断是否还有其他线路可以重连 if (!call.retryAfterFailure()) return false // For failure recovery, use the same route selector with a new connection. // 走到这里说明该请求确实可以进行重连。 return true }
注释其实已经说的很明白,这段代码没什么难度,以上代码是出错情况下来判断是否可以重连。
下面代码则是正常响应的情况下来判断是否需要重连,主要是处理重定向。// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response
接下里重点看下
val followUp = followUpRequest(response, exchange)
;
这个方法的主要作用就是根据响应码来判断是否需要重连,我们点进去看下:@Throws(IOException::class) private fun followUpRequest(userResponse: Response, exchange: Exchange?):Request? { val route = exchange?.connection?.route() val responseCode = userResponse.code val method = userResponse.request.method when (responseCode) { HTTP_PROXY_AUTH -> { val selectedProxy = route!!.proxy if (selectedProxy.type() != Proxy.Type.HTTP) { throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy") } return client.proxyAuthenticator.authenticate(route, userResponse) } HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse) HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> { return buildRedirectRequest(userResponse, method) } HTTP_CLIENT_TIMEOUT -> { // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (!client.retryOnConnectionFailure) { // The application layer has directed us not to retry the request. return null } val requestBody = userResponse.request.body if (requestBody != null && requestBody.isOneShot()) { return null } val priorResponse = userResponse.priorResponse if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) { // We attempted to retry and got another timeout. Give up. return null } if (retryAfter(userResponse, 0) > 0) { return null } return userResponse.request } HTTP_UNAVAILABLE -> { val priorResponse = userResponse.priorResponse if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) { // We attempted to retry and got another timeout. Give up. return null } if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) { // specifically received an instruction to retry without delay return userResponse.request } return null } HTTP_MISDIRECTED_REQUEST -> { // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then // we can retry on a different connection. val requestBody = userResponse.request.body if (requestBody != null && requestBody.isOneShot()) { return null } if (exchange == null || !exchange.isCoalescedConnection) { return null } exchange.connection.noCoalescedConnections() return userResponse.request } else -> return null } }
代码虽然不少,但是却很好理解。主要是根据
ResponseCode
处理重定向的问题,就是3XX系列。HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> { return buildRedirectRequest(userResponse, method) }
这句代码如果是3XX系列,根据重定向的地址来重新构建Request,然后复制给
request
,在下一轮重新发起请求。
除了以上两种情况是需要重连的,之外的就是直接返回Response
。
RetryAndFollowUpInterceptor
的中置和后置我们看完了,之后我们来仔细看下它的前置工作是干什么的,我们点进去:
这是RealCall
类中的一个函数,也就是说所有的链式工作其实都是在为RealCall
来服务的,RealCall
才是爸爸,记住这点,后面的代码就很好理解了。fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) { check(interceptorScopedExchange == null) synchronized(this) { check(!responseBodyOpen) { "cannot make a new request because the previous response is still open: " + "please call response.close()" } check(!requestBodyOpen) } if (newExchangeFinder) { this.exchangeFinder = ExchangeFinder( connectionPool, createAddress(request.url), this, eventListener ) } }
前面
synchronized
中的代码都是对Request/Response
body做检测的,目的就是保证发起新的请求前,上一个body要关闭掉,避免数据混乱。
后面的代码才是关键。if (newExchangeFinder) { this.exchangeFinder = ExchangeFinder( connectionPool, createAddress(request.url), this, eventListener ) }
可以这样理解,每一次请求,都意味着需要一次数据交换,这个方法就是寻找一个数据交换???啥意思???换个名词来说,可能就会明白了,寻找一个交换,其实就是在寻找一个连接
Connection
,这样是不是就明白了,就是为了寻找一个可用的TCP或者TLS的连接。但是呢, 其实这里并不是寻找连接,而是为寻找连接来做准备的,就是准备各种参数,至于寻找连接,那是在其他Interceptor中来进行的工作。
这样我们就大致明白RetryAndFollowUpInterceptor
的工作内容了:- 连接准备
- 请求
- 处理重连
-
到这里RetryAndFollowUpInterceptor
代码就看完了。
接下来看下一个Interceptor,BridgeInterceptor
。
BridgeInterceptor
桥接Interceptor???干啥用的?我们点进去看下就明白了。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
代码很多,但是我们大致一看就知道是干什么的了。大致的工作就是为上一步准备的请求和下一步即将发送的请求做一个连接。
其实说白了就是为我们添加各种必要但对于开发者来说比较麻烦的各种Header
信息,所以一些必要且麻烦的Header
OkHttp帮我们做了。比如:Content-Length,Content-Encoding等各种Header。还是老规矩,来分割下前、中、后置工作。
- 前置工作
添加各种Header。 - 中置工作
发起请求。 - 后置工作
因为前置工作帮我们添加了一些gzip Header,所以后置工作需要帮我们进行解压缩。
这个BridgeInterceptor
相对来收是比较简单的,接下来我们看下一个Interceptor。
CacheInterceptor
,顾名思义就是做缓存工作用的Interceptor。请求之前先看下是否有可用缓存,如果有,就直接用缓存,如果没有则发起请求,请求结束后判断是否可以缓存,如果可以则进行缓存,这就是它大概的一个工作内容,我们仔细想一下就可以想明白的。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
额。。。挺多代码的。还是老规矩,分割前、中、后置工作。
- 前置工作
检查是否有可用缓存,如果有则直接返回缓存内容,代码这里就不分析了,比较简单。val call = chain.call() val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() val networkRequest = strategy.networkRequest val cacheResponse = strategy.cacheResponse cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE if (cacheCandidate != null && cacheResponse == null) { // The cache candidate wasn't applicable. Close it. cacheCandidate.body?.closeQuietly() } // If we're forbidden from using the network and the cache is insufficient, fail. if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } // If we don't need the network, we're done. if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) }
- 中置工作
发起请求。networkResponse = chain.proceed(networkRequest)
- 后置工作
根据Response来判断是否需要缓存,同样代码也是很简单,这里就不细看了。if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { cacheResponse.body?.closeQuietly() } } val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() if (cache != null) { if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { // This will log a conditional cache miss only. listener.cacheMiss(call) } } } if (HttpMethod.invalidatesCache(networkRequest.method)) { try { cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } }
但是这里我们可能需要简单了解下CacheStrategy,缓存策略,其实就是根据不同的缓存Header来进行不同的缓存策略。比如:Date,Expires,Last-Modified,ETag,Age。然后再结合ReponseHeader来执行具体的缓存策略。比如:If-None-Match,If-Modified-Since 。这里就不细看了,都比较简单,接下来进入下一个Interceptor。
ConnectInterceptor
,连接Interceptor,这个是重点,这个是重点,这个是重点。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
哇~代码很少啊,只有四行代码。啪啪啪 ~~~ 别看代码少,但是却很难懂,硬核的东西比较多。
老规矩,分割前、中、后置工作。
- 前置工作
val realChain = chain as RealInterceptorChain val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange)
- 中置工作
connectedChain.proceed(realChain.request)
- 后置工作,无,卒~。
啊,为什么没有后置工作,看名字,ConnectInterceptor,它是建立连接的Interceptor,连接建立,请求,返回结果,还要什么后置工作。
接下来我们具体分析代码。
我们先来看下前置工作;
val exchange = realChain.call.initExchange(chain)
然后我们点进去看下代码:
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
呦呵~又是RealCall
类中的一个函数,RealCall
果然是爸爸。
我们看下重点的代码:
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
第一句代码:通过find找到一个codec,这个codec是干嘛用的?其实是coder&decoder缩写,字面意思,编码解码器。哦原来是查找一个可用的编码解码器,就是找到一个针对不同的格式报文的编码解码器,比如:HTTP1版本的编码解码器,HTTP2的编码解码器。我们点击去看下:
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
其实只有两行代码,第一行找到一个健康的连接,根据这个连接来创建对应的codec对象。什么是一个健康的连接?我们接着在点进去看下:
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
// If it isn't, take it out of the pool.
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}
两部分代码:第一部分找到一个连接,第二部分判断连接是否是健康的。
我们从第一部分代码点进去看下是怎么样找到一个连接的。
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// Attempt to reuse the connection from the call.
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
我k,这代码着实有点长啊。不慌,根据功能来划分下,慢慢分析。
第一部分:
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
先看下自己有没有连接,如果有且是可重用的,就直接返回。
第二部分:
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
从连接池中获取一个可用的连接,此时传入的参数:(address, call, null, false)
说明下最后两个参数分别是:路由和是否多路复用(HTTP2的特性)
这里说个题外话,说明是路由?
一个路由包含3部分:proxy(代理类型:代理或者直连),ip以及port,也就是说一个路由如果确定之后,有可能发生变化的是proxy和ip。用一张图来说明下,如下:
路由
我们点进入看下这个allAcquirePooledConnection
这个函数:
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
ok,代码不多,主要是遍历连接池中的连接,然后筛选连接,条件如下:
- 是否是需要多路复用的连接并且该链接是支持多路复用;
- 再看该该链接是否满足要求:
这段代码主要是阐述步骤二满足的标准是什么。internal fun isEligible(address: Address, routes: List<Route>?): Boolean { assertThreadHoldsLock() // If this connection is not accepting new exchanges, we're done. if (calls.size >= allocationLimit || noNewExchanges) return false // If the non-host fields of the address don't overlap, we're done. if (!this.route.address.equalsNonHost(address)) return false // If the host exactly matches, we're done: this connection can carry the address. if (address.url.host == this.route().address.url.host) { return true // This connection is a perfect match. } // At this point we don't have a hostname match. But we still be able to carry the request if // our connection coalescing requirements are met. See also: // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/ // 1. This connection must be HTTP/2. if (http2Connection == null) return false // 2. The routes must share an IP address. if (routes == null || !routeMatchesAny(routes)) return false // 3. This connection's server certificate's must cover the new host. if (address.hostnameVerifier !== OkHostnameVerifier) return false if (!supportsUrl(address.url)) return false // 4. Certificate pinning must match the host. try { address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates) } catch (_: SSLPeerUnverifiedException) { return false } return true // The caller's address can be carried by this connection. }
-
当前连接的数量小于最大限制且可以继续创建新的
Exechange
-
当前连接的地址和新连接的地址不相同,需满足以下条件:
-
internal fun equalsNonHost(that: Address): Boolean { return this.dns == that.dns && this.proxyAuthenticator == that.proxyAuthenticator && this.protocols == that.protocols && this.connectionSpecs == that.connectionSpecs && this.proxySelector == that.proxySelector && this.proxy == that.proxy && this.sslSocketFactory == that.sslSocketFactory && this.hostnameVerifier == that.hostnameVerifier && this.certificatePinner == that.certificatePinner && this.url.port == that.url.port }
这里两个地址是否相同,代码已经写的很清楚了,这里就不在赘述。
-
-
主机名是否相同
-
是否为http2的连接
-
判断路由是否匹配
private fun routeMatchesAny(candidates: List<Route>): Boolean { return candidates.any { it.proxy.type() == Proxy.Type.DIRECT && route.proxy.type() == Proxy.Type.DIRECT && route.socketAddress == it.socketAddress } }
-
验证码hostnameVerifier是否一致,这里验证的过程我们就不一一具体分析了。
-
检测certificatePinner。
如果以上条件都没问题,也就是弄明白了该链接满足了当前的需求,继续下一步:
取得连接且不发送事件。call.acquireConnectionNoEvents(connection)
-
第三步:指定路由,然后在拿一次不支持http2的了解;
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
这个时候传入的参数为:(address, call, routes, false)
,可以看到这个时候已经传入了路由参数。这时候得到的连接就是从这一组路由中得到不支持http2的连接。
如果没有得到连接,进入第四步;
第四步:创建一个新的连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
第五步:创建新的连接之后,再次冲连接池中再取一次,而这次的入参又变了:(address, call, routes, true)
,意思是只取路由中的支持多路服用的连接,如果取到了说明这组路由且支持多路复用的连接是存在连接池中的,然后从连接池中取出,之后将刚刚新创建的连接关闭掉。
这里说明下为什么会出现这种情况呢?因为同一时间有可能同时发起多个请求,但是每个请求的host和schema是一样的,只有path是不一样的,这样的请求是可以公用一个连接的,所以当其中一个请求创建成功后,其他请求一定会拿到连接池中的连接,因为第一个创建成功后,会放入到连接池中,这个是同步的操作,当放入到连接池之后,其他请求再去拿一定是有连接可用的。
这里额外说一下:
nextRouteToTry
,这个是干嘛用的呢?就是第四步和第五步的结合,第四步创建一个新的连接,既然能创建一个新的连接,说明这个路由是可用的,当出现错误需要重连时,此时就不需要在重新查找路由了,因为这个创建的连接对应的路由是一定可用的,目的是为了加快连接速度。
经过上述五步之后,一定会拿到连接,所以一共经历了五种获取连接的情况。
然后findConnection
函数就结束了,返回了一个连接。然后判断该链接是否健康,健康与否主要是有是否关闭来判断的。然后接着看返回之后的代码。
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
在得到一个健康的连接之后,然后使用这个连接创建出一个编码解码器。
我们点进去看下是怎样创建编码解码器的:
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
val http2Connection = this.http2Connection
return if (http2Connection != null) {
Http2ExchangeCodec(client, this, chain, http2Connection)
} else {
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
Http1ExchangeCodec(client, this, source, sink)
}
}
哦原来是根据连接的不同来创建不同协议的编码解码器,如果是http1,就创建http1的编码解码器,如果是http2,就创建http2的编码解码器。ok。接着往下走。
接着回溯代码:
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
可以看到在得到编码解码器之后,基于编码解码器来创建一个Exchange
。
来解释下Codec和Exchange:
Codec是真正的发起请求和接受请求的,而Exchange是对Codec的包装和代理,exchange中的读写数据,其实就是调用了codec中的读写数据。
好了,接着网上回溯代码,就会到了最开始initExchange
函数入口的地方了:
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
然后ConnectInterceptor
进行中置工作(交棒):
connectedChain.proceed(realChain.request)
ok,到这里整个ConnectInterceptor
工作就分析完了,就是创建连接,然后返回一个包装好的Exchange,有了这个Exchange就可以做到发送和接受请求了。接下来我们看最后一个Interceptor
, CallServerInterceptor,点击去看下:
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
}
额,又是一坨代码,不慌,点根烟安静下,可以发现,这就是在利用Exchange来进行写请求和读响应啊,然后对响应进行处理,将最终的响应直接返回,这样这个链式调用就开始往回走了。看下重点代码:
exchange.writeRequestHeaders(request)
写请求头信息
exchange.flushRequest()
结束请求写操作。
exchange.readResponseHeaders(expectContinue = false)
读取响应头。
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
读取响应体,构建成一个响应体:Response,然后返回。
接下来的过程就很熟悉了:
- networkInterceptors,如果设置了自定义网络拦截器,会从这里进行Response的回调;
- ConnectInterceptor,没有后置工作,直接返回Response;
- CacheInterceptor,根据响应头来处理缓存,之后返回Response;
- BridgeInterceptor,将响应体进行gzip解压缩,然后返回Response;
- RetryAndFollowUpInterceptor,根据响应码来判断是否重连,如不需要,返回Response
- interceptors,如果用户设置了自定的Interceptors,这里会进行Response的回调。
之后这个链式工作结束,回到getResponseWithInterceptorChain
这个函数,还记得这个函数在哪调用的吗?哈哈哈。。。对就是我们最开始分析链式工作的地方:
AsyncCall中的run
函数。
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
这之后就是responseCallback
的各种回调了。
至此这个异步请求就结束了,整体下来也不是太难理解,还是很开心的。
至于RealCall中的execute
函数(同步请求数据)我们来简单看下;
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
哎呦我去,简单粗暴,直接调用getResponseWithInterceptorChain
函数,然后将结果直接返回。
好了,到这里整个OkHttp的源码简单解读就结束,整体看来不是太难,但也是需要多次进行揣摩和分析的,相信对你来说小case啦。加油~~~
个人能力有限,如有错误之处,还望指出,我会第一时间验证并修改。
网友评论