文章中源码的OkHttp版本为4.10.0
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
1.简单使用
- okHttp的简单使用代码如下:
//创建OkHttpClient对象
val client = OkHttpClient().newBuilder().build()
//创建Request对象
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json") //添加请求的url地址
.build() //返回一个Request对象
//发起请求
fun request() {
val response = client
.newCall(request) //创建一个Call对象
.enqueue(object : Callback { //调用enqueue方法执行异步请求
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
}
- 工作流程有四步:
- 创建OkHttpClient对象
- 创建Request对象
- 创建Call对象
- 开始发起请求,
enqueue
为异步请求,execute
为同步请求
2.OkHttpClient对象是如何创建的
val client = OkHttpClient().newBuilder().build()
OkHttpClient
对象的创建是一个典型的建造者模式,先看一下newBuilder
方法做了什么,源码如下:
//创建了一个Builder对象
open fun newBuilder(): Builder = Builder(this)
class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//拦截器集合
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络拦截器集合
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
...
}
newBuilder
中创建了一个Builder
对象,Builder
对象的构造函数中定义了很多的变量,这里只保留了3个重要的。
下面看一下build
方法做了什么
//这个this就是上面创建的Builder对象
fun build(): OkHttpClient = OkHttpClient(this)
okHttpClient
源码如下
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("interceptors") val interceptors: List<Interceptor> =
builder.interceptors.toImmutableList()
@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
builder.networkInterceptors.toImmutableList()
...
}
通过OkHttpClient
对象的源码可以得知,Builder
创建的调度器、拦截器最终都会交给OkHttpClient
,这是建造者模式的特定。
3.Request对象是如何创建的
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json") //添加请求的url地址
.build() //返回一个Request对象
open class Builder {
//请求地址
internal var url: HttpUrl? = null
//请求方法
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
...
}
4.创建Call对象
val call = client.newCall(request)
override fun newCall(request: Request): Call {
//newRealCall中传递了三个参数,第一个参数是OkHttpClient本身,第二个参数request,
//第三个不用关注
return RealCall.newRealCall(this, request, forWebSocket = false)
}
接着我们先来看一下RealCall
是什么
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
}
从源码可知RealCall
是Call
的子类,那么Call
又是什么呢,往下看
//调用是准备好要执行的请求。也可以取消调用。
//由于该对象表示单个请求/响应对(流),因此不能执行两次。
interface Call : Cloneable {
//返回发起此调用的原始请求。
fun request(): Request
@Throws(IOException::class)
//同步请求,立即调用请求,并阻塞,直到响应可以处理或出现错误。
fun execute(): Response
//异步请求,接受回调参数
fun enqueue(responseCallback: Callback)
//取消请求
fun cancel()
//如果此调用已被执行或进入队列,则返回true。多次执行调用是错误的。
fun isExecuted(): Boolean
//是否是取消状态
fun isCanceled(): Boolean
//超时时间,
fun timeout(): Timeout
//创建一个与此调用相同的新调用,即使该调用已经进入队列或执行,该调用也可以被加入队列或执行。
public override fun clone(): Call
fun interface Factory {
fun newCall(request: Request): Call
}
}
5.发起请求
- 以异步请求为例进行分析
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
println("onFailure:$e")
}
override fun onResponse(call: Call, response: Response) {
println("onResponse:${response.body.toString()}")
}
})
RealCall
是Call
的子类所以enqueue
的具体实现是在RealCall
中
override fun enqueue(responseCallback: Callback) {
//检查是否进行了二次请求
check(executed.compareAndSet(false, true)) { "Already Executed" }
//请求后立即调用,相当于监听请求的开始事件
callStart()
//将请求交给调度器来决定什么时候开始请求
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(this)
}
-
疑问:
client.dispatcher.enqueue
是如何决定什么时候开始请求的 - 已知
client
就是OkHttpClient
- 已知
-
dispatcher
是调度器,先来看一下它的源码
-
class Dispatcher constructor() {
//并发执行的最大请求数。上面的请求队列在内存中,等待正在运行的调用完成。
//如果在调用这个函数时有超过maxRequests的请求在运行,那么这些请求将保持在运行状态。
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
//每台主机可并发执行的最大请求数。这限制了URL主机名的请求。
//注意,对单个IP地址的并发请求仍然可能超过此限制:
//多个主机名可能共享一个IP地址或通过同一个HTTP代理路由。
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
//线程安全的单例模式,线程池的获取用于线程调度。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
//定义准备发送的队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//定义异步发送队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//定义同步发送队列
private val runningSyncCalls = ArrayDeque<RealCall>()
...
}
- 再来看一下
dispatcher.enqueue
的源码
- 再来看一下
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//将call对象添加到准备发送的队列,这个call对象来自AsyncCall,稍后再讲
readyAsyncCalls.add(call)
//修改AsyncCall,使其共享对同一主机的现有运行调用的AtomicInteger。
if (!call.get().forWebSocket) {
//共享一个已经存在的正在运行调用的AtomicInteger
val existingCall = findExistingCallWithHost(call.host())
//统计发送数量
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//准备发送请求
promoteAndExecute()
}
//将符合条件的call从readyAsyncCalls(准备发送的队列)添加到runningAsyncCalls(异步发送队列)中
//并在服务器上执行它们
//不能在同步时调用,因为执行调用可以调用用户代码
//如果调度程序当前正在运行,则为true。
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
//收集所有需要执行的请求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
//遍历准备发送的队列
while (i.hasNext()) {
val asyncCall = i.next()
//判断已经发送的请求是大于等于最大请求个数64,是则跳出循环
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//判断并发请求个数是否大于等于最大并发个数5,如果是则跳出循环
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从准备队列中删除
i.remove()
//计数+1
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
//将对象添加到异步发送队列中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//提交任务到线程池
asyncCall.executeOn(executorService)
}
return isRunning
}
- 最终请求是通过
AsyncCall
的executeOn
发送出去的,AsyncCall
是什么
- 最终请求是通过
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
}
- 接收了一个回调,继承了Runnable
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
//暂时定义为未执行成功
var success = false
try {
//使用线程执行自身
executorService.execute(this)
//执行成功
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//发送失败
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//发送结束
client.dispatcher.finished(this) // 停止运行
}
}
}
-
AsyncCall
将自己加入到线程池,然后线程池开启线程执行自己的run方法,那么AsyncCall
加入了一个怎样的线程池呢?
-
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
- 这里定义了一个缓存线程池,具有缓存功能且如果一个请求执行完毕后在60s内再次发起就会复用刚才那个线程,提高了性能。
- 交给线程池后,线程池会开启线程执行AsyncCall的run方法
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
//定义响应标志位,用于表示请求是否成功
var signalledCallback = false
timeout.enter()
try {
//发送请求并得到结果
val response = getResponseWithInterceptorChain()
//过程未出错
signalledCallback = true
//回调结果
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// 不要两次发出回调信号!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
//失败回调
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
在请求得到结果后最后会调用finish
表示完成,这里的finish
又做了什么呢?
/**
* 由[AsyncCall.run]用于表示完成。
*/
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/**
* 由[Call.execute]用于表示完成。
*/
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将完成的任务从队列中删除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//用于将等待队列中的请求移入异步队列,并交由线程池执行
val isRunning = promoteAndExecute()
//如果没有请求需要执行,回调闲置callback
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
- 流程图如下

作者:无糖可乐爱好者
链接:https://juejin.cn/post/7182552726641836087
网友评论