最近打算做网络相关的优化工作,不免需要重新熟悉一下网络框架,在Android领域网络框架的龙头老大非OkHttp莫属,借此机会对OkHttp的一些内部实现进行深入的剖析,同时这些问题也是面试时的常客,相信一定对你有帮助。
先来一发灵魂拷问四连击:
- addInterceptor与addNetworkInterceptor有什么区别?
- 网络缓存如何实现的?
- 网络连接怎么实现复用?
- OkHttp如何做网络监控?
是不是既熟悉又陌生,实际上就是因为网络框架已经为我们实现了这些基本功能,所以很容易被我们忽略下面就来为大家分享一下:
OkHttp基本实现原理
OkHttp的内部实现通过一个责任链模式完成,将网络请求的各个阶段封装到各个链条中,实现了各层的解耦。
文内源码基于OkHttp最新版本4.2.2,从4.0.0版本开始,OkHttp使用全Kotlin语言开发,没上车的小伙伴要抓紧了,要不源码都快看不懂了 [捂脸],学习Kotlin可参考旧文 Kotlin学习系列文章Overview 。
我们从发起一次请求的调用开始,熟悉一下OkHttp执行的流程。
//创建OkHttpClient
val client = OkHttpClient.Builder().build();
//创建请求
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json")
.build()
//同步任务开启新线程执行
Thread {
//发起网络请求
val response = client.newCall(request).execute()
if (!response.isSuccessful) throw IOException("Unexpected code $response")
Log.d("okhttp_test", "response: ${response.body?.string()}")
}.start()
所以核心的代码逻辑是通过OkHttpClient的newCall方法创建了一个Call对象,并调用其execute方法;Call代表一个网络请求的接口,实现类只有一个RealCall。execute表示同步发起网络请求,与之对应还有一个enqueue方法,表示发起一个异步请求,因此同时需要传入callback。
我们来看RealCall的execute方法:
# RealCall
override fun execute(): Response {
...
//开始计时超时、发请求开始回调
transmitter.timeoutEnter()
transmitter.callStart()
try {
client.dispatcher.executed(this)//第1步
return getResponseWithInterceptorChain()//第2步
} finally {
client.dispatcher.finished(this)//第3步
}
}
把大象装冰箱,统共也只需要三步。
第一步
调用Dispatcher的execute方法,那Dispatcher是什么呢?从名字来看它是一个调度器,调度什么呢?就是所有网络请求,也就是RealCall对象。网络请求支持同步执行和异步执行,异步执行就需要线程池、并发阈值这些东西,如果超过阈值需要将超过的部分存储起来,这样一分析Dispatcher的功能就可以总结如下:
- 记录同步任务、异步任务及等待执行的异步任务。
- 线程池管理异步任务。
- 发起/取消网络请求API:execute、enqueue、cancel。
OkHttp设置了默认的最大并发请求量 maxRequests = 64 和单个host支持的最大并发量 maxRequestsPerHost = 5。
同时用三个双端队列存储这些请求:
# Dispatcher
//异步任务等待队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//异步任务队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//同步任务队列
private val runningSyncCalls = ArrayDeque<RealCall>()
为什么要使用双端队列?很简单因为网络请求执行顺序跟排队一样,讲究先来后到,新来的请求放队尾,执行请求从对头部取。
说到这LinkedList表示不服,我们知道LinkedList同样也实现了Deque接口,内部是用链表实现的双端队列,那为什么不用LinkedList呢?
实际上这与readyAsyncCalls向runningAsyncCalls转换有关,当执行完一个请求或调用enqueue方法入队新的请求时,会对readyAsyncCalls进行一次遍历,将那些符合条件的等待请求转移到runningAsyncCalls队列中并交给线程池执行。尽管二者都能完成这项任务,但是由于链表的数据结构致使元素离散的分布在内存的各个位置,CPU缓存无法带来太多的便利,另外在垃圾回收时,使用数组结构的效率要优于链表。
回到主题,上述的核心逻辑在promoteAndExecute方法中:
#Dispatcher
private fun promoteAndExecute(): Boolean {
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
//遍历readyAsyncCalls
while (i.hasNext()) {
val asyncCall = i.next()
//阈值校验
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
//符合条件 从readyAsyncCalls列表中删除
i.remove()
//per host 计数加1
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
//移入runningAsyncCalls列表
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//提交任务到线程池
asyncCall.executeOn(executorService)
}
return isRunning
}
这个方法在enqueue和finish方法中都会调用,即当有新的请求入队和当前请求完成后,需要重新提交一遍任务到线程池。
讲了半天线程池,那OkHttp内部到底用的什么线程池呢?
#Dispatcher
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
这不是一个newCachedThreadPool吗?没错,除了最后一个threadFactory参数之外与newCachedThreadPool一毛一样,只不过是设置了线程名字而已,用于排查问题。
阻塞队列用的SynchronousQueue,它的特点是不存储数据,当添加一个元素时,必须等待一个消费线程取出它,否则一直阻塞,如果当前有空闲线程则直接在这个空闲线程执行,如果没有则新启动一个线程执行任务。通常用于需要快速响应任务的场景,在网络请求要求低延迟的大背景下比较合适,详见旧文 Java线程池工作原理浅析。
继续回到主线,第二步比较复杂我们先跳过,来看第三步。
第三步
调用Dispatcher的finished方法
//异步任务执行结束
internal fun finished(call: AsyncCall) {
call.callsPerHost().decrementAndGet()
finished(runningAsyncCalls, call)
}
//同步任务执行结束
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
//同步异步任务 统一汇总到这里
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将完成的任务从队列中删除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//这个方法在第一步中已经分析,用于将等待队列中的请求移入异步队列,并交由线程池执行。
val isRunning = promoteAndExecute()
//如果没有请求需要执行,回调闲置callback
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
第二步
现在我们回过头来看最复杂的第二步,调用getResponseWithInterceptorChain方法,这也是整个OkHttp实现责任链模式的核心。
#RealCall
fun getResponseWithInterceptorChain(): Response {
//创建拦截器数组
val interceptors = mutableListOf<Interceptor>()
//添加应用拦截器
interceptors += client.interceptors
//添加重试和重定向拦截器
interceptors += RetryAndFollowUpInterceptor(client)
//添加桥接拦截器
interceptors += BridgeInterceptor(client.cookieJar)
//添加缓存拦截器
interceptors += CacheInterceptor(client.cache)
//添加连接拦截器
interceptors += ConnectInterceptor
if (!forWebSocket) {
//添加网络拦截器
interceptors += client.networkInterceptors
}
//添加请求拦截器
interceptors += CallServerInterceptor(forWebSocket)
//创建责任链
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
...
try {
//启动责任链
val response = chain.proceed(originalRequest)
...
return response
} catch (e: IOException) {
...
}
}
我们先不关心每个拦截器具体做了什么,主流程最终走到chain.proceed(originalRequest)
。我们看一下这个procceed方法:
# RealInterceptorChain
override fun proceed(request: Request): Response {
return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
// 统计当前拦截器调用proceed方法的次数
calls++
// exchage是对请求流的封装,在执行ConnectInterceptor前为空,连接和流已经建立但此时此连接不再支持当前url
// 说明之前的网络拦截器对url或端口进行了修改,这是不允许的!!
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// 这里是对拦截器调用proceed方法的限制,在ConnectInterceptor及其之后的拦截器最多只能调用一次proceed!!
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
// 创建下一层责任链 注意index + 1
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
//取出下标为index的拦截器,并调用其intercept方法,将新建的链传入。
val interceptor = interceptors[index]
val response = interceptor.intercept(next)
// 保证在ConnectInterceptor及其之后的拦截器至少调用一次proceed!!
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
return response
}
代码中的注释已经写得比较清楚了,总结起来就是创建下一级责任链,然后取出当前拦截器,调用其intercept方法并传入创建的责任链。++为保证责任链能依次进行下去,必须保证除最后一个拦截器(CallServerInterceptor)外,其他所有拦截器intercept方法内部必须调用一次chain.proceed()方法++,如此一来整个责任链就运行起来了。
比如ConnectInterceptor源码中:
# ConnectInterceptor 这里使用单例
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
val doExtensiveHealthChecks = request.method != "GET"
//创建连接和流
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
//执行下一级责任链
return realChain.proceed(request, transmitter, exchange)
}
}
除此之外在责任链不同节点对于proceed的调用次数有不同的限制,ConnectInterceptor拦截器及其之后的拦截器能且只能调用一次,因为网络握手、连接、发送请求的工作发生在这些拦截器内,表示正式发出了一次网络请求;而在这之前的拦截器可以执行多次proceed,比如错误重试。
经过责任链一级一级的递推下去,最终会执行到CallServerInterceptor的intercept方法,此方法会将网络响应的结果封装成一个Response对象并return。之后沿着责任链一级一级的回溯,最终就回到getResponseWithInterceptorChain方法的返回。
拦截器分类
现在我们需要先大致总结一下责任链的各个节点拦截器的作用:
拦截器 | 作用 |
---|---|
应用拦截器 | 拿到的是原始请求,可以添加一些自定义header、通用参数、参数加密、网关接入等等。 |
RetryAndFollowUpInterceptor | 处理错误重试和重定向 |
BridgeInterceptor | 应用层和网络层的桥接拦截器,主要工作是为请求添加cookie、添加固定的header,比如Host、Content-Length、Content-Type、User-Agent等等,然后保存响应结果的cookie,如果响应使用gzip压缩过,则还需要进行解压。 |
CacheInterceptor | 缓存拦截器,如果命中缓存则不会发起网络请求。 |
ConnectInterceptor | 连接拦截器,内部会维护一个连接池,负责连接复用、创建连接(三次握手等等)、释放连接以及创建连接上的socket流。 |
networkInterceptors(网络拦截器) | 用户自定义拦截器,通常用于监控网络层的数据传输。 |
CallServerInterceptor | 请求拦截器,在前置准备工作完成后,真正发起了网络请求。 |
至此,OkHttp的核心执行流程就结束了,是不是有种豁然开朗的感觉?现在我们终于可以回答开篇的问题:
addInterceptor与addNetworkInterceptor的区别
二者通常的叫法为应用拦截器和网络拦截器,从整个责任链路来看,应用拦截器是最先执行的拦截器,也就是用户自己设置request属性后的原始请求,而网络拦截器位于ConnectInterceptor和CallServerInterceptor之间,此时网络链路已经准备好,只等待发送请求数据。
-
首先,应用拦截器在RetryAndFollowUpInterceptor和CacheInterceptor之前,所以一旦发生错误重试或者网络重定向,网络拦截器可能执行多次,因为相当于进行了二次请求,但是应用拦截器永远只会触发一次。另外如果在CacheInterceptor中命中了缓存就不需要走网络请求了,因此会存在短路网络拦截器的情况。
-
其次,如上文提到除了CallServerInterceptor,每个拦截器都应该至少调用一次realChain.proceed方法。实际上在应用拦截器这层可以多次调用proceed方法(本地异常重试)或者不调用proceed方法(中断),但是网络拦截器这层连接已经准备好,可且仅可调用一次proceed方法。
-
最后,从使用场景看,应用拦截器因为只会调用一次,通常用于统计客户端的网络请求发起情况;而网络拦截器一次调用代表了一定会发起一次网络通信,因此通常可用于统计网络链路上传输的数据。
网络缓存机制CacheInterceptor
这里的缓存是指基于Http网络协议的数据缓存策略,侧重点在客户端缓存,所以我们要先来复习一下Http协议如何根据请求和响应头来标识缓存的可用性。
提到缓存,就必须要聊聊缓存的有效性、有效期。
HTTP缓存原理
在HTTP 1.0时代,响应使用Expires头标识缓存的有效期,其值是一个绝对时间,比如Expires:Thu,31 Dec 2020 23:59:59 GMT。当客户端再次发出网络请求时可比较当前时间
和上次响应的expires时间进行比较,来决定是使用缓存还是发起新的请求。
使用Expires头最大的问题是它依赖客户端的本地时间,如果用户自己修改了本地时间,就会导致无法准确的判断缓存是否过期。
因此,从HTTP 1.1 开始使用Cache-Control头表示缓存状态,它的优先级高于Expires,常见的取值为下面的一个或多个。
private,默认值,标识那些私有的业务逻辑数据,比如根据用户行为下发的推荐数据。该模式下网络链路中的代理服务器等节点不应该缓存这部分数据,因为没有实际意义。
- public 与private相反,public用于标识那些通用的业务数据,比如获取新闻列表,所有人看到的都是同一份数据,因此客户端、代理服务器都可以缓存。
- no-cache 可进行缓存,但在客户端使用缓存前必须要去服务端进行缓存资源有效性的验证,即下文的对比缓存部分,我们稍后介绍。
- max-age 表示缓存时长单位为秒,指一个时间段,比如一年,通常用于不经常变化的静态资源。
- no-store 任何节点禁止使用缓存。
强制缓存
在上述缓存头规约基础之上,强制缓存是指网络请求响应header标识了Expires或Cache-Control带了max-age信息,而此时客户端计算缓存并未过期,则可以直接使用本地缓存内容,而不用真正的发起一次网络请求。
协商缓存
强制缓存最大的问题是,一旦服务端资源有更新,直到缓存时间截止前,客户端无法获取到最新的资源(除非请求时手动添加no-store头),另外大部分情况下服务器的资源无法直接确定缓存失效时间,所以使用对比缓存更灵活一些。
使用Last-Modify / If-Modify-Since头实现协商缓存,具体方法是服务端响应头添加Last-Modify头标识资源的最后修改时间,单位为秒,当客户端再次发起请求时添加If-Modify-Since头并赋值为上次请求拿到的Last-Modify头的值。
服务端收到请求后自行判断缓存资源是否仍然有效,如果有效则返回状态码304同时body体为空,否则下发最新的资源数据。客户端如果发现状态码是304,则取出本地的缓存数据作为响应。
使用这套方案有一个问题,那就是资源文件使用最后修改时间有一定的局限性:
- Last-Modify单位为秒,如果某些文件在一秒内被修改则并不能准确的标识修改时间。
- 资源修改时间并不能作为资源是否修改的唯一依据,比如资源文件是Daily Build的,每天都会生成新的,但是其实际内容可能并未改变。
因此,HTTP 还提供了另外一组头信息来处理缓存,ETag/If-None-Match。流程与Last-Modify一样,只是把服务端响应的头变成Last-Modify,客户端发出的头变成If-None-Match。ETag是资源的唯一标识符
,服务端资源变化一定会导致ETag变化。具体的生成方式有服务端控制,场景的影响因素包括,文件最终修改时间、文件大小、文件编号等等。
OKHttp的缓存实现
上面讲了这么多,实际上OKHttp就是将上述流程用代码实现了一下,即:
- 第一次拿到响应后根据头信息决定是否缓存。
2.下次请求时判断是否存在本地缓存,是否需要使用对比缓存、封装请求头信息等等。
3.如果缓存失效或者需要对比缓存则发出网络请求,否则使用本地缓存。
OKHttp内部使用Okio来实现缓存文件的读写。
缓存文件分为CleanFiles和DirtyFiles,CleanFiles用于读,DirtyFiles用于写,他们都是数组,长度为2,表示两个文件,即缓存的请求头和请求体;同时记录了缓存的操作日志,记录在journalFile中。
开启缓存需要在OkHttpClient创建时设置一个Cache对象,并指定缓存目录和缓存大小,缓存系统内部使用LRU作为缓存的淘汰算法。
## Cache.kt
class Cache internal constructor(
directory: File,
maxSize: Long,
fileSystem: FileSystem
): Closeable, Flushable
OkHttp早期的版本有个一个InternalCache接口,支持自定义实现缓存,但到了4.x的版本后删减了InternalCache,Cache类又为final的,相当于关闭了扩展功能。
具体源码实现都在CacheInterceptor类中,大家可以自行查阅。
通过OkHttpClient设置缓存是全局状态的,如果我们想对某个特定的request使用或禁用缓存,可以通过CacheControl相关的API实现:
//禁用缓存
Request request = new Request.Builder()
.cacheControl(new CacheControl.Builder().noCache().build())
.url("http://publicobject.com/helloworld.txt")
.build();
OKHttp不支持的缓存情况
最后需要注意的一点是,OKHttp默认只支持get请求的缓存。
# okhttp3.Cache.java
@Nullable CacheRequest put(Response response) {
String requestMethod = response.request().method();
...
//缓存仅支持GET请求
if (!requestMethod.equals("GET")) {
// Don't cache non-GET responses. We're technically allowed to cache
// HEAD requests and some POST requests, but the complexity of doing
// so is high and the benefit is low.
return null;
}
//对于vary头的值为*的情况,统一不缓存
if (HttpHeaders.hasVaryAll(response)) {
return null;
}
...
}
这是当网络请求响应后,准备进行缓存时的逻辑代码,当返回null时表示不缓存。从代码注释中不难看出,我们从技术上可以缓存method为HEAD和部分POST请求,但实现起来的复杂性很高而收益甚微。这本质上是由各个method的使用场景决定的。
我们先来看看常见的method类型及其用途。
- GET 请求资源,参数都在URL中。
- HEAD 与GET基本一致,只不过其不返回消息体,通常用于速度或带宽优先的场景,比如检查资源有效性,可访问性等等。
- POST 提交表单,修改数据,参数在body中。
- PUT 与POST基本一致,最大不同为PUT是幂等的。
- DELETE 删除指定资源。
可以看到对于标准的RESTful请求,GET就是用来获取数据,最适合使用缓存,而对于数据的其他操作缓存意义不大或者根本不需要缓存。
也是基于此在仅支持GET请求的条件下,OKHTTP使用request URL作为缓存的key(当然还会经过一系列摘要算法)。
最后上面代码中贴到,如果请求头中包含vary:*这样的头信息也不会被缓存。vary头用于提高多端请求时的缓存命中率,比如两个客户端,一个支持gzip压缩而另一个不支持,二者的请求URL都是一致的,但Accept-Encoding不同,这很容易导致缓存错乱,我们可以声明vary:Accept-Encoding防止这种情况发生。
而包含vary:*头信息,标识着此请求是唯一的,不应被缓存,除非有意为之,一般不会这样做来牺牲缓存性能。
okhttp的连接复用,实则是依靠ConnectInterceptor连接拦截器实现的。
在关键方法上依次会走 newStream->findHealthyConnection->findConnection 最后获取到一个可用的连接
findConnection分析
这也是获取到可复用连接的关键,总的来说可以分为三步,第一步会先判断是否存在可复用的连接,如果有则直接用,如果没有则去连接池里面获取,如果连接池里面没有获取到,则最后才会去创建一个新的连接,并添加到线程池中。
相关说明
连接池ConnectionPool
-
transmitterAcquirePooledConnection
获取缓存池中的可用连接。
ConnectionPool提供对Deque进行操作的方法分别为put、get、connectionBecameIdle、evictAll几个操作。分别对应放入连接、获取连接、移除连接、移除所有连接操作。线程中不停调用Cleanup 清理的动作并立即返回下次清理的间隔时间。继而进入wait 等待之后释放锁,继续执行下一次的清理。所以可能理解成他是个监测时间并释放连接的后台线程。
-
put
函数,向缓存池中增加可用连接。 -
excutor
: 线程池,用来检测闲置socket并对其进行清理。 -
connections
: connection缓存池。Deque是一个双端列表,支持在头尾插入元素,这里用作LIFO(后进先出)堆栈,多用于缓存数据。 -
routeDatabase
:用来记录连接失败router
具体源码这里就不贴出来了,可以自行参照本文的思路去看源码,应该比较通俗易懂的。
okhttp网络监控指标
指标数据
1.入队到请求结束耗时
2.dns查询耗时
3.socket connect耗时
4.tls连接的耗时
5.请求发送耗时
6.响应传输耗时
7.首包耗时
8.响应解析耗时
指标获取
OKHhttp 3.10.0以上提供这些指标监听
指标流程分析
OkHttp整体流程
HTTP请求过程及指标监听插入位置
以下流程图只针对完整的HTTP请求,忽略重定向及HTTP2
指标对应方法:
1.入队到请求结束耗时
callStart-->callEnd
2.dns查询耗时
dnsStart-->dnsEnd
3.socket connect耗时
connectStart-->connectEnd
4.tls连接的耗时
secureConnectStart-->secureConnectEnd
5.请求发送耗时
requestHeaderStart-->requestHeaderEnd-->requestBodyStart-->requestBodyEnd
6.首包耗时
responseHeaderStart-->responseHeaderEnd
7.响应解析耗时
responseBodyStart-->responseBodyEnd
插入监听
一般情况
OkHttpClient创建一个单例来提供给APP网络请求使用,造成不同的URL请求监听器是同一个,如下:
public static OkHttpClient defaultClient() {
return new OkHttpClient.Builder()
.connectionPool(connectionPool)
.dispatcher(dispatcher)
.build();
}
public Builder() {
……
eventListenerFactory = EventListener.factory(EventListener.NONE);
……
}
Builder(OkHttpClient okHttpClient) {
……
this.eventListenerFactory = okHttpClient.eventListenerFactory;
……
}
再分析监听器,每个监听方法会返回一个Call对象,这个Call对象正是各个请求创建的Call对象,这样针对每个Call对象,来采集每个URL对应的start,end来进行指标计算
因为篇幅长度有限,这次就讲下OKHttp,其实还有许多面试题目整理,如果觉得没有看够,可以点击这里查看更多 传送门直达!!! 我这将其他的面试按技术板块进行了划分整理成了文档,并在网上和行业大佬那得到了参考答案,感性兴趣的可以看看复习下技术点的。
文章参考部分:
面试官:听说你熟悉OkHttp原理?
okhttp的连接复用
Android okhttp http网络监控
网友评论