源码版本:com.squareup.okhttp3:okhttp:4.3.0
1 okhttp简单应用
通过okhttp发起异步post请求示例:
// 创建OkHttpClient单例
val okHttpClient = OkHttpClient()
/* 媒体类型 用于描述http请求或响应主体的内容类型。
* 官方注释:一种[RFC 2045][rfc_2045]媒体类型,适合描述http请求或响应体的内容类型。
* 默认charset=utf-8。*/
val contentType: MediaType = "application/json; charset=utf-8".toMediaType()
// json格式请求参数
val body: RequestBody = "{\"userId\": \"001\", \"password\": \"123\"}".toRequestBody(contentType)
// 新建post请求
val post: Request = Request.Builder()
.url("")
.post(body)
.build()
// 异步发起,回调形式返回结果
okHttpClient.newCall(post).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
@Throws(IOException::class)
override fun onResponse(call: Call, response: Response) {
val request = call.request()
val requestBody = request.body
val responseBody = response.body
}
})
可以看出用okhttp发送请求需要3步
- 创建一个单例OkHttpClient对象
- 创建原始请求Request对象
- 用OkHttpClient对象把Request对象包装成直接可以用Call,并排队执行。
1.1 OkHttpClient
关于OkHttpClient的官方注释:OkHttpClient是http请求(Call)的工厂,可以用来发送http请求和接受响应。只创建一个OkHttpClient实例并将其用于所有HTTP调用时,OkHttp的性能最佳。因为每个OkHttpClient实例都拥有自己的连接池和线程池。重用连接和线程可以减少延迟并节省内存。相反,为每个请求创建一个OkHttpClient实例会浪费空闲池的资源。
OkHttpClient可以直接通过构造方法创建也可以通过OkHttpClient.Builder创建。
// 通过OkHttpClient.Builder创建OkHttpClient实例
val okHttpClient = OkHttpClient.Builder()
.eventListener(object : EventListener() {
override fun callStart(call: Call) {
Log.d("network", "发起请求${call.request().url}")
}
})
.build()
// 通过无参构造创建OkHttpClient实例
val okHttpClient = OkHttpClient()
// OkHttpClient.Builder的build()方法
fun build(): OkHttpClient = OkHttpClient(this)
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
// OkHttpClient无参构造
constructor() : this(Builder())
// 省略其他代码
...
}
查看源码可以看出两种方法其实是一种,但如果想自定义OkHttpClient属性则需通过OkHttpClient.Builder。我们来看一下OkHttpClient.Builder的属性和默认值,这些最后会赋值给OkHttpClient。
// 调度器
internal var dispatcher: Dispatcher = Dispatcher()
// 连接池 底层用ArrayDeque
internal var connectionPool: ConnectionPool = ConnectionPool()
// 用户责任链/拦截器链 ArrayList
internal val interceptors: MutableList<Interceptor> = mutableListOf()
// 网络责任链/拦截器链 ArrayList
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
// EventListener工厂
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
// 连接失败是否重试
internal var retryOnConnectionFailure = true
// 身份验证
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = OkHttpClient.DEFAULT_CONNECTION_SPECS
internal var protocols: List<Protocol> = OkHttpClient.DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
// 链接超时时间
internal var connectTimeout = 10_000
// 读超时时间
internal var readTimeout = 10_000
// 写超时时间
internal var writeTimeout = 10_000
internal var pingInterval = 0
1.2 Request
Request只能通过Request.Builder创建,默认get请求。可自定义请求方式、url、header、tag等。Request不能被OkHttpClient直接使用,需要包装成Call。
Call是interface,我们来看一下Call定义了哪些属性和方法。
interface Call : Cloneable {
// 返回原始Request对象
fun request(): Request
// 同步执行请求,直接返回响应
@Throws(IOException::class)
fun execute(): Response
// 异步执行请求,把请求加入队列等待执行,通过Callback返回响应
fun enqueue(responseCallback: Callback)
// 请求可取消,已经执行过的不可以取消
fun cancel()
// 是否执行了 execute或enqueue
fun isExecuted(): Boolean
fun isCanceled(): Boolean
// 返回跨越整个调用的Timeout,包括解析DNS、连接、写入请求体、服务器处理和读取响应体过程。如果调用需要重定向或重试,所有操作都必须在一个超时周期内完成。
fun timeout(): Timeout
// 复制当前Call,因为一个Call只可以发起一次,想重复发起相同相求需复制一个新的Call
public override fun clone(): Call
// Call工厂interface
interface Factory {
fun newCall(request: Request): Call
}
}
RealCall是Call的实现类,原始请求request先包装成RealCall,后续还会包装成RealCall的内部类AsnyCall,这里先不说。OkHttpClient实现了Call.Factory,所以前边说OkHttpClient是http请求(Call)的工厂。
RealCall对象是通过okHttpClient.newCall(post)创建的。
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request,forWebSocket = false)
}
RealCall构造方法私有,只能通过RealCall的newRealCall(client: OkHttpClient,originalRequest: Request,forWebSocket: Boolean)构建实例。
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client,this)
}
}
}
1.3 发送异步请求
平时应用比较多的是异步请求,接下来我们看一下okHttp异步请求的执行过程。RealCall的enqueue方法:
override fun enqueue(responseCallback: Callback) {
// 加锁用标记位判断当前对象是否以执行,执行过抛异常"Already Executed",没执行过则执行。
synchronized(this){
check(!executed){ "Already Executed" }
executed =true
}
// 记录堆栈跟踪,告诉eventListener请求开始
transmitter.callStart()
// 通过dispatcher进行排队
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
2 Dispatcher和AsyncCall
Dispatcher 调度器,用于决定何时执行异步请求。可在创建OkHttpClient时自定义,一般没必要自定义,okhttp有很好的默认实现。说到Dispatcher就必须提到AsyncCall,AsyncCall是RealCall的一个内部类,是为了配合dispatcher进行线程调度的Runnable。
我们先看一下Dispatcher的部分源码。
// 并发执行的请求的最大数目
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
// 同一host请求并发执行的请求的最大数目。WebSocket 不受此限制
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
// 线程池
private var executorServiceOrNull: ExecutorService? = null
// 线程池懒汉式初始化
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
// 准备执行的AsyncCall列表
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// 正在执行的AsyncCall列表
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
// 请求入队
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 把call加入准备执行的AsyncCall列表
readyAsyncCalls.add(call)
// 相同host的call共享同一AtomicInteger
// 这里我个人认为把 readyAsyncCalls.add(call)放到if语句后更好
if (!call.get().forWebSocket) {
// 在执行中call列表和准备执行call列表寻找相同host的call
val existingCall = findExistingCallWithHost(call.host())
// 存在相同host的call,则把找到的call的AtomicInteger赋值给当前call
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 推动执行
promoteAndExecute()
}
/**
* 在执行中call列表和准备执行call列表寻找相同host的call
*/
private fun findExistingCallWithHost(host: String): AsyncCall? {
for (existingCall in runningAsyncCalls) {
if (existingCall.host() == host) return existingCall
}
/* 如果执行到这里肯定能找到相同host的call的,
因为当前call已经被加入到readyAsyncCalls,
当遍历到最后一个call就是当前call本身,
肯定符合existingCall.host() == host,然后返回的call实际就是当前call。*/
for (existingCall in readyAsyncCalls) {
if (existingCall.host() == host) return existingCall
}
return null
}
/**
* 筛选合适的待执行列表的call到执行中列表。
* 并在线程池executorService中执行
*/
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
// 可拓展列表,用于存放待执行列表中合适移动到执行中列表的call
val executableCalls = mutableListOf<AsyncCall>()
// 当前是否有请求正在执行
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 遍历待执行call列表
while (i.hasNext()) {
val asyncCall = i.next()
// 如果当前正在执行的call数量超过了调度器允许的最大值,则终止操作
if (runningAsyncCalls.size >= this.maxRequests) break
// 如果与当前call相同host的call数量超过调度器允许的同一个host的最大值,则跳过当前call,继续判断下一个call是否合适。
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue
// 当前call合适
// 移出待执行列表
i.remove()
// 同host请求计数加1
asyncCall.callsPerHost().incrementAndGet()
// 把当前call添加到可执行列表,当循环完成此列表中包含此次筛选的所有合适的call
executableCalls.add(asyncCall)
// 把当前call添加到执行中列表,此时此列表中包含之前正在执行的call和此次筛选出的合适执行但未执行的call
runningAsyncCalls.add(asyncCall)
}
// 判断是否有正在执行的(包含合适执行但未执行的)请求
isRunning = runningCallsCount() > 0
}
// 把此次循环筛选出来的call放到线程池中执行,for结束runningAsyncCalls中的所有call都处于执行中状态
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
这部分代码比较长,但是只做了一件事:把请求放到队列中合理调度执行。程序的入口是enqueue(call: AsyncCall)方法,操作一共分3步
- 把call放到待执行列表。
- 相同host的call共用同一AtomicInteger。AtomicInteger用于统计当前host正在执行的请求数量。
- 筛选合适的待执行call,从待执行列表取出放到执行中列表,然后在线程池中执行。
我们在看一下AsyncCall的源码
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
// 多线程操作可见,且操作保证原子性
@Volatile private var callsPerHost = AtomicInteger(0)
fun callsPerHost(): AtomicInteger = callsPerHost
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
// 原始请求的host
fun host(): String = originalRequest.url.host
// 原始请求
fun request(): Request = originalRequest
// 当前RealCall对象,持有此内部类的外部类对象
fun get(): RealCall = this@RealCall
// 执行当前AsnyCall
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
// 把当前call(Runnable对象)加入dispatcher的线程池中执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
// 执行结束,尝试释放connection,并通知eventListener
transmitter.noMoreExchanges(ioException)
// 失败回调,把当前call和异常传回
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
// 当前Runnable的run方法
override fun run() {
// 当前是在子线程
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
// 开始计时
transmitter.timeoutEnter()
try {
// 真正的网络请求操作,response是网络请求结果
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 回调 请求成功
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", INFO, e)
} else {
// 回调 请求失败
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
// 停止计时 即调用:transmitter.cancel()
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 回调 请求失败
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
client.dispatcher.finished(this)
}
}
}
}
Asnycall的代码非常简单,只有2个重点方法:
fun executeOn(executorService: ExecutorService) 和
Runnable的抽象方法override fun run()。
executeOn(executorService: ExecutorService)是提供给dispatcher当请求排队到可执行队列中时操作请求执行的入口,整个方法除了异常处理和回调处理,只有一句executorService.execute(this),executorService是从dispatcher持有的所有call共享的线程池,okhttp通过此线程池实现线程的回收利用。
run()方法就是网络请求的真正入口,即:val response = getResponseWithInterceptorChain()获取网络请求结果,然后通过回调把结果传给调用者。至此,okhttp的线程调度完成,Dispatcher和Asnycall的任务完成。
3 责任链
接下来我们看一下Asnycall的run()里用来获取网络响应的getResponseWithInterceptorChain(),它拉开了okhttp的一大亮点的序幕:责任链模式。
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors. 需要留意任务链中拦截器的顺序
val interceptors = mutableListOf<Interceptor>()
// 用户自定义拦截器,最先执行的拦截器
interceptors += client.interceptors
// 失败重试和重定向拦截器,如果请求取消则可能抛出IOException
interceptors += RetryAndFollowUpInterceptor(client)
// 桥接拦截器
interceptors += BridgeInterceptor(client.cookieJar)
// 缓存拦截器
interceptors += CacheInterceptor(client.cache)
// 链接拦截器,用于打开目标服务器的链接
interceptors += ConnectInterceptor
// 用户自定义的网络拦截器
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
// 真正对服务器进行网络调用的拦截器,任务链最后一个拦截器
interceptors += CallServerInterceptor(forWebSocket)
// 创建任务链,注意第四个参数index的值是0
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
// 执行任务链下一个
val response = chain.proceed(originalRequest)
// 以下都是异常处理
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
任务链的执行入口是val response = chain.proceed(originalRequest),我们点进去看一下,这里确实不太好理解,所以源码中有很多注释。
override fun proceed(request: Request): Response {
return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
calls++
// If we already have a stream, confirm that the incoming request will use it.
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
// Call the next interceptor in the chain. 注意这里第四个参数index+1了
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
// 取出当前index对应的interceptor
val interceptor = interceptors[index]
// 当前拦截器执行拦截操作
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")
// Confirm that the next interceptor made its required call to chain.proceed().
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
这里的重点同样只有一行val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null"),我们点开看一下intercept(next) ,这是接口Interceptor的方法,我们随便找一个实现类看,这里用的是简单好理解的LoggingInterceptor。
private static class LoggingInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
// 操作request
long t1 = System.nanoTime();
Request request = chain.request();
logger.info(String.format("Sending request %s on %s%n%s",
request.url(), chain.connection(), request.headers()));
// 执行任务链下一个
Response response = chain.proceed(request);
// 操作response
long t2 = System.nanoTime();
logger.info(String.format("Received response for %s in %.1fms%n%s",
request.url(), (t2 - t1) / 1e6d, response.headers()));
return response;
}
}
拦截器的 intercept(Chain chain)方法基本分为3部分
- 处理request;
- 执行任务链下一个,获取response;
- 处理response,并返回给上级调用者。
责任链的工作流程
未命名文件-6.png
网友评论