1、创建Client
Builder 是OkHttpClient的一个内部类,使用的是构建者模式
// Build the client through OkHttpClient.Builder with a 5000 ms connect timeout.
val client = OkHttpClient.Builder()
    .connectTimeout(5000, TimeUnit.MILLISECONDS)
    .build()
// Builder
// Excerpt of the Builder class; the members other than the dispatcher are elided by "..." below.
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() // creates the dispatcher object
...
}
2、构建Request
也就是构建请求报文信息,请求的url,header等信息,也是用的Builder模式
// Describe the request (here only its URL) through Request.Builder.
val request = Request.Builder()
    .url(url)
    .build()
3、创建Call 对象,可以把Call 对象当成是Request 和 Response 的中间桥梁,
Call 是一个接口,真正的实现类是RealCall。
// newCall returns the Call interface type; the instance is produced by RealCall.newRealCall.
val call: Call = client.newCall(request)
// OkHttpClient.newCall
/** Creates a [Call] for [request] by delegating to [RealCall.newRealCall] with `forWebSocket = false`. */
override fun newCall(request: Request): Call =
    RealCall.newRealCall(this, request, forWebSocket = false)
// RealCall.newRealCall
/**
 * Creates a [RealCall] for [originalRequest] and attaches a fresh [Transmitter] to it
 * before the instance is handed back to the caller.
 */
fun newRealCall(
    client: OkHttpClient,
    originalRequest: Request,
    forWebSocket: Boolean
): RealCall {
    // Safely publish the Call instance to the EventListener.
    val realCall = RealCall(client, originalRequest, forWebSocket)
    realCall.transmitter = Transmitter(client, realCall)
    return realCall
}
}
同步请求:
// Synchronous request: execute() runs on the calling thread and returns the Response.
val response: Response = call.execute()
// RealCall.execute
/**
 * Runs this call on the calling thread and returns its [Response].
 *
 * @throws IllegalStateException with "Already Executed" if this call was already started.
 */
override fun execute(): Response {
    // A RealCall may only be executed once.
    synchronized(this) {
        check(!executed) { "Already Executed" }
        executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    return try {
        client.dispatcher.executed(this)
        // The Response is produced by the interceptor chain.
        getResponseWithInterceptorChain()
    } finally {
        // Report completion to the dispatcher whether the call succeeded or threw.
        client.dispatcher.finished(this)
    }
}
// Dispatcher.executed
// 把call 添加到runningSyncCalls 队列中
/** Records [call] in `runningSyncCalls` while holding this dispatcher's monitor. */
internal fun executed(call: RealCall) {
    synchronized(this) {
        runningSyncCalls.add(call)
    }
}
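// Dispatcher.finished(下面展示的是接收 AsyncCall 的异步重载;同步请求 execute() 传入的是 RealCall,走的是对应的同步重载,两者最终都会调用泛型的 finished(calls, call))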
// Completion hook for an asynchronous call (this overload takes an AsyncCall).
// NOTE(review): execute() passes a RealCall to dispatcher.finished, so a sibling overload for
// synchronous calls presumably exists outside this excerpt — confirm in Dispatcher.
internal fun finished(call: AsyncCall) {
// Decrement this call's per-host counter before the shared bookkeeping below runs.
call.callsPerHost().decrementAndGet()
// Shared path: remove the call from runningAsyncCalls, promote waiting calls, run the idle callback.
finished(runningAsyncCalls, call)
}
/**
 * Removes [call] from [calls], then tries to promote waiting calls; when nothing is running
 * afterwards and an idle callback is set, that callback is run.
 *
 * @throws AssertionError if [call] was not present in [calls].
 */
private fun <T> finished(calls: Deque<T>, call: T) {
    // Remove the call and snapshot the idle callback under the dispatcher lock.
    val idleCallback: Runnable? = synchronized(this) {
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        this.idleCallback
    }
    // Check whether more requests are waiting and start them if so.
    val stillRunning = promoteAndExecute()
    if (stillRunning) return
    idleCallback?.run()
}
异步请求:
// Asynchronous request: the outcome is delivered through the Callback, not through a return value.
// NOTE(review): the enqueue override shown in this file declares no return type, so response2
// holds Unit rather than a Response.
val response2 = call.enqueue(object : Callback {
// Invoked with the IOException when the call fails.
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
// Invoked with the Response when the call produces one.
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
// RealCall.enqueue
/**
 * Schedules this call for asynchronous execution; the outcome is reported to [responseCallback].
 *
 * @throws IllegalStateException with "Already Executed" if this call was already started.
 */
override fun enqueue(responseCallback: Callback) {
    // Same one-shot guard as execute().
    synchronized(this) {
        check(!executed) { "Already Executed" }
        executed = true
    }
    transmitter.callStart()
    // Wrap the callback in an AsyncCall (a Runnable inner class of RealCall) and queue it.
    val dispatcher = client.dispatcher
    dispatcher.enqueue(AsyncCall(responseCallback))
}
// Dispatcher.enqueue
/**
 * Adds [call] to the ready queue and then tries to promote ready calls to running ones.
 */
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        // Park the AsyncCall in the waiting queue first.
        readyAsyncCalls.add(call)
        val isWebSocket = call.get().forWebSocket
        if (!isWebSocket) {
            // If another call to the same host exists, let this call reuse its per-host state
            // (presumably the shared counter — confirm in AsyncCall.reuseCallsPerHostFrom).
            findExistingCallWithHost(call.host())?.let { existing ->
                call.reuseCallsPerHostFrom(existing)
            }
        }
    }
    promoteAndExecute()
}
// Dispatcher.promoteAndExecute
/**
 * Moves eligible calls from `readyAsyncCalls` to `runningAsyncCalls` and starts them on the
 * executor service. Must be called without holding the dispatcher lock.
 *
 * @return true if the dispatcher has any running calls after promotion.
 */
private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val promoted = mutableListOf<AsyncCall>()
    val isRunning = synchronized(this) {
        val pending = readyAsyncCalls.iterator()
        while (pending.hasNext()) {
            val candidate = pending.next()
            // Stop once the running queue reached maxRequests; skip calls whose host already
            // has maxRequestsPerHost calls in flight.
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (candidate.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
            pending.remove()
            candidate.callsPerHost().incrementAndGet()
            // Eligible: record the call both for execution below and as running.
            promoted.add(candidate)
            runningAsyncCalls.add(candidate)
        }
        runningCallsCount() > 0
    }
    // Start the calls just added to runningAsyncCalls, outside the lock.
    for (asyncCall in promoted) {
        asyncCall.executeOn(executorService)
    }
    return isRunning
}
// Dispatcher.executorService
// 异步请求线程池
// 核心线程数为 0,因此没有任务时空闲线程可以全部被回收;
// 最大线程数虽然是 Int.MAX_VALUE,但实际不会无限增长,因为 promoteAndExecute 中 runningAsyncCalls 的数量受 maxRequests(默认 64)限制
// 空闲线程的存活时间(keepAliveTime)是 60 秒
/**
 * Executor that runs asynchronous calls; created on first access.
 *
 * The pool is built with 0 core threads, an [Int.MAX_VALUE] maximum, a 60-second keep-alive
 * and a [SynchronousQueue]. The getter is synchronized because it is read outside the
 * dispatcher lock (promoteAndExecute calls it after leaving its synchronized block), so
 * without it two threads could each observe null and create separate pools.
 */
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
    get() {
        // Fast path: reuse the pool if one was already created or supplied.
        executorServiceOrNull?.let { return it }
        val created = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
        executorServiceOrNull = created
        return created
    }
// AsyncCall.executeOn
/**
 * Submits this AsyncCall to [executorService]. If the executor rejects it, the failure is
 * reported to the callback and the dispatcher is told the call is finished.
 */
fun executeOn(executorService: ExecutorService) {
    assert(!Thread.holdsLock(client.dispatcher))
    var submitted = false
    try {
        // The executor will invoke this AsyncCall's run() method.
        executorService.execute(this)
        submitted = true
    } catch (rejected: RejectedExecutionException) {
        val failure = InterruptedIOException("executor rejected").apply { initCause(rejected) }
        transmitter.noMoreExchanges(failure)
        responseCallback.onFailure(this@RealCall, failure)
    } finally {
        if (!submitted) {
            client.dispatcher.finished(this) // This call is no longer running!
        }
    }
}
// AsyncCall.run
/**
 * Executes the call on the executor thread and reports the outcome to the callback.
 *
 * An IOException raised before the callback was signalled is delivered through onFailure;
 * one raised afterwards (i.e. from onResponse itself) is only logged, so the callback is
 * never signalled twice. Any other throwable cancels the call, is reported to the callback
 * as an IOException if it has not been signalled yet, and is then rethrown. The dispatcher
 * is always told the call has finished.
 */
override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
            // The result comes from RealCall.getResponseWithInterceptorChain.
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
            } else {
                responseCallback.onFailure(this@RealCall, e)
            }
        } catch (t: Throwable) {
            // An unchecked failure (e.g. thrown by an interceptor) would otherwise leave the
            // caller waiting forever: cancel, notify once, then let it propagate.
            this@RealCall.cancel()
            if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
            }
            throw t
        } finally {
            client.dispatcher.finished(this)
        }
    }
}
同步请求和异步请求都是从拦截器链中获取结果,采用的是责任链模式:
/**
 * Assembles the interceptor list and runs the original request through it.
 *
 * Order: application interceptors, retry/follow-up, bridge, cache, connect, network
 * interceptors (skipped for web sockets), and finally the call-server interceptor.
 *
 * @return the response produced by the chain.
 * @throws IOException if the chain fails or the call was canceled.
 */
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>().apply {
        addAll(client.interceptors)
        add(RetryAndFollowUpInterceptor(client))
        add(BridgeInterceptor(client.cookieJar))
        add(CacheInterceptor(client.cache))
        add(ConnectInterceptor)
        if (!forWebSocket) {
            addAll(client.networkInterceptors)
        }
        add(CallServerInterceptor(forWebSocket))
    }

    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    // Tracks whether the catch branch already reported to the transmitter, so the finally
    // branch does not report a second time.
    var calledNoMoreExchanges = false
    try {
        val response = chain.proceed(originalRequest)
        if (!transmitter.isCanceled) return response
        response.closeQuietly()
        throw IOException("Canceled")
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
        if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null)
        }
    }
}
网友评论