OkHttp 源码阅读笔记(一)
OkHttp
的大名不用多说了,本篇文章是对 OkHttp
的源码分析文章的第一篇。我后续分析源码的路径也都是针对 Http 1.1
来分析,略过 Http 2
和 WebSocket
逻辑,基于的 OkHttp
版本是 4.11.0
。
OkHttp 的简单使用
val client: OkHttpClient = OkHttpClient
.Builder()
.build()
val request = Request.Builder()
.get()
.url("https://www.google.com")
.build()
val call = client.newCall(request)
/**
* 异步请求方式
*/
call.enqueue(responseCallback = object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})
/**
* 同步请求方式
*/
//val response = call.execute()
首先通过创建一个 OkHttpClient
实例(通常在一般项目中都是只创建一个 OkHttp
实例),创建一个 Request
实例,Request
实例中包含我们 Http
请求的所有参数,通过 OkHttpClient#newCall()
方法传入创建的 Request
实例,会返回一个 Call
实例,通过 Call
实例就可以发送一次 Http
请求,请求的方式有两种,调用 execute()
方法是同步请求,调用 enqueue()
方法是同步请求。
同步请求
在介绍同步请求前,先看看 OkHttpClient#newCall()
创建 Call
的方法:
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
Call
的实现类是 RealCall
,我们看看 RealCall
执行同步调用的方法是 execute()
我们来看看它的实现:
override fun execute(): Response {
// 判断是否已经执行过了,如果已经执行过了就报错
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 超时任务
timeout.enter()
callStart()
try {
// 将本次同步请求添加到 Dispatcher 中
client.dispatcher.executed(this)
// 真正执行请求
return getResponseWithInterceptorChain()
} finally {
// 从 Dispatcher 中移除本次同步请求
client.dispatcher.finished(this)
}
}
- 一个
RealCall
最多只能执行一次 -
timeout.enter()
用来计算超时任务,到达最大时间就会cancel
掉本次请求,然后报错,这个时间用的是OkHttpClient#callTimeoutMillis
参数,默认值是 0,也就是不计算超时。 - 请求中的同步任务会被添加到
Dispatcher
中,这个参数也是可以在创建OkHttpClient
的时候自定义。它主要处理异步调用的任务调度,异步调用时会详细介绍。 -
getResponseWithInterceptorChain()
就是执行真正的网络请求(也就是在当前线程直接执行),后面详细介绍。
我们看到上面还调用了 callStart()
方法,这里简单看一下:
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(this)
}
这个 eventListener
可以在创建 OkHttp
实例的时候自定义它,通过它可以监听到请求的每个关键节点,比如这个 callStart()
方法就表示请求开始的回调,后面还有很多的其他关键节点的回调,比如创建 Socket
、DNS
查询、TLS
握手等等,我们可以在 OkHttp
很多处的源码看见它,后面的源码分析我也都会再说起它。
这里简单看看 Dispatcher#execute()
方法 和 Dispatcher#finished()
方法,来看看 Dispatcher
如何记录同步请求任务。
/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
// 添加到同步队列中
runningSyncCalls.add(call)
}
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// 请队列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
// 后面分析 `promoteAndExecute()` 方法
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
// 当没有新的任务时,执行一次 `idleCallback`,表示当前 `OkHttp` 空闲了,和 Android 中的 `IdleHandler` 类似。
idleCallback.run()
}
}
异步请求
趁热打铁,我们接着看看 RealCall#enqueue()
方法执行的异步调用:
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
// 向 `Dispatcher` 中添加异步任务
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
异步任务的实现类是 AsyncCall
,它是 RealCall
中的一个内部类,我们看看 Dispatcher#enqueue
方法:
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 添加到等待队列
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
// 如果不是 WebSocket,会更新当前的域名正在请求的数量
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 检查等待中的任务是否可以执行
promoteAndExecute()
}
首先把任务添加到等待队列中;然后获取当前域名正在请求的任务的数量(这个数量我后面会分析),把这个数量更新在 RealCall
中;调用 promoteAndExecute()
执行等待队列中的可以执行的任务。
看看 promoteAndExecute()
方法的源码:
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 遍历等待的任务
while (i.hasNext()) {
val asyncCall = i.next()
// 如果当前执行中的任务大于了,最大的限制任务,退出遍历
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// 如果当前任务的单个域名执行执行的个数大于了最大的限制,跳过当前 asyncCall
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
// 移动当前任务到正在执行的任务中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// Avoid resubmitting if we can't logically progress
// particularly because RealCall handles a RejectedExecutionException
// by executing on the same thread.
if (executorService.isShutdown) {
// ...
} else {
// 在线程池上执行这些任务
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
}
return isRunning
}
Dispatcher
默认限制同时最多执行 64 个任务(可自定义),同一个域名最多有 5 个任务同时执行(可自定义)。如果没有达到限制,就会把等待中的任务移动到正在执行的任务中,最后把这些任务在 executorService
上执行(上面的限制只是对异步任务有效,同步任务不受影响),等待中的任务需要等待其他任务执行完成后调用 promoteAndExecute()
方法来检测等待中的任务是否达到执行的要求(还有其他的逻辑也会调用 promoteAndExecute()
方法去判断),可以执行的话移动到正在执行中的任务。
我们看看 executorService
的创建:
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
线程池是一个无限大小的缓存线程池,线程的回收时间是 60s。
我们再来看看 AsyncCall#executeOn()
方法:
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 线程池上执行任务,入口函数为 run()
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
failRejected(e)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// 真正执行
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 回调成功
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
// 回调失败
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 回调失败
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 将任务从 Dispatcher 中移除
client.dispatcher.finished(this)
}
}
}
}
上面的代码也都是比较简单,就不多说了,然后他的真正的 Http
请求也是通过 getResponseWithInterceptorChain()
方法来完成。
拦截器链的执行
我们上面讲同步调用和异步调用中都讲到了最终的请求都是通过 getResponseWithInterceptorChain()
方法来完成。
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 自定义普通拦截器
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
// 系统定义的拦截器
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
// 不是 WebSocket 的情况下
if (!forWebSocket) {
// 自定义网络拦截器
interceptors += client.networkInterceptors
}
// 真正的通过 Socket 和 Server 来交换信息。
interceptors += CallServerInterceptor(forWebSocket)
// 构建拦截器链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 开始执行拦截器链,最后一个拦截器执完成后就获取到了结果
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
拦截器链排在最前面的是自定义的普通拦截器,然后是 4 个普通的系统拦截器,再然后是自定义的网络拦截器(非 WebSocket
,拦截器执行时,可以保证对应的 Socket
已经创建完成,而且在它之中必须调用一次 RealInterceptorChain#procced()
方法,也只能调用一次),最后是系统的 CallServerInterceptor
拦截器。我这里简单介绍一下系统拦截器的功能,具体的实现代码,后面的文章介绍。
-
RetryAndFollowUpInterceptor
: 重试和重定向拦截器 -
BridgeInterceptor
: 主要处理RequestHeader
和ResponseHeader
-
CacheInterceptor
: 主要处理Http
中的缓存 -
ConnectionInterceptor
: 它是一个单例,它是获取(创建)一个可用的Socket
网络连接,也就是在它之后的拦截器执行时,网络链接就已经创建。 -
CallServerInterceptor
: 通过Socket
来和服务器交换数据
所有的 Interceptor
都会被放入到 RealInterceptorChain
,然后通过其 proceed()
方法开始执行,其中 index
就是下一个要执行的 Interceptor
的 index
。 这里还有一个比较重要的参数就是 Exchange
,如果它为空就表示还没有创建网络连接,反之就是已经创建了网络连接,我们上面也说到 ConnectionInterceptor
,在 ConnnectionInterceport
执行后,Exchange
对象就不为空了,我们来看看 RealInterceptorChain#proceed()
方法的源码:
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
// 调用次数记录
calls++
// exchange 不为空表示为网络拦截器
if (exchange != null) {
// 网络拦截器不能修改请求的域名
check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// 网络拦截器必须请求一次 proceed() 方法,也只能请求一次。
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
// 复制下一次要请求需要的 RealInterceptorChain 对象,这里的 index + 1 了,也就表示下一次的 Chain 执行的 Interceptor 为当前 `Interceptor` 的下一个。
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 执行拦截器的拦截方法,得到最后的结果然后返回。
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
return response
}
这里要简单说明一下在网络拦截器中不能修改域名,这很好理解,因为这个时候网络连接已经建立,也就是域名已经解析完成,如果你修改了网络域名,也就是说上面的网络连接对应的域名和你修改后的不一样,当然上面的网络连接是不可用的,所以 OkHttp
禁止在网络拦截器中修改域名;这里还限制了网络拦截器必须调用一次 proceed()
方法。
这里会构建一个新的 RealInterceptorChain
对象,和原来的对象相比,index
值加 1,reqeust
使用外部传入的 reqeust
。然后调用当前需要执行的 Interceptor
,然后调用它的 intercept()
方法,把上面新创建的 RealInterceptorChain
对象传入进去。
最后
本篇文章介绍了 OkHttp
同步网络请求和异步网络请求,还介绍了异步网络请求时 Dispatcher
如何调度请求任务,还介绍了 RealInterceptorChain
拦截器链如何工作的。后面的文章还会介绍网络链接如何创建,默认的拦截器具体的工作原理。
网友评论