title: okhttp源码解析
date: 2020-03-25 21:42:36
tags: [android工具源码]
typora-root-url: ./okhttp源码解析
typora-copy-images-to: ./okhttp源码解析
1.总体流程
okhttp的优点
- 支持HTTPS/HTTP2/WebSocket(在OkHttp3.7中已经剥离对Spdy的支持,转而大力支持HTTP2)
- 内部维护任务队列线程池,友好支持并发访问
- 内部维护连接池,支持多路复用,减少连接创建开销
- socket创建支持最佳路由
- 提供拦截器链(InterceptorChain),实现request与response的分层处理(如透明GZIP压缩,logging等)
okhttp的总体架构.
imgHTTP知识点
100~199:指示信息,表示请求已接收,继续处理
200~299:请求成功,表示请求已被成功接收、理解
300~399:重定向,要完成请求必须进行更进一步的操作
400~499:客户端错误,请求有语法错误或请求无法实现
500~599:服务器端错误,服务器未能实现合法的请求
http缓存机制
第一次请求的时候,肯定是没有缓存的. 但是响应的时候服务器可以指定响应头的一些属性.来让下次请求时可以使用缓存或者是请求服务器.因此总是上一次的响应头作为下一次请求头信息的来源.
缓存的大概原理如下
缓存HTTP协议定义了几个可以用来控制浏览器缓存关键字,它们是:
Expires, 服务器响应头返回的过期时间,但是是以服务器时间为标准.可能和客户端不同步,
Pragma: no-cache, 请求头里的.但不是标准请求头.可能不是都适用.
Cache-Control , 缓存控制头.请求头和响应头都有.有如下格式
max-age 最长过期时间.有上次的响应提供给刻度端,通过这个客户端判断当前缓存是否过期.
no-cache 设置成这个.每次客户端的请求都去服务端查缓存是否过期.也就是由服务端决定是否使用缓存
no-store 客户端不使用缓存.
Last-Modified , 表示上次资源修改的时间.响应头内容.发现资源具有Last-Modified声明,
则再次向web服务器请求时带上头 If-Modified-Since,表示请求时间。
web服务器收到请求后发现有头If-Modified-Since 则与被请求资源的最后修改时间进行比对。
若最后修改时间较新,说明资源又被改动过,则响应整片资源内容(写在响应消息包体内),HTTP 200;
若最后修改时间一致,说明资源无新修改,则响应HTTP 304 (无需包体,节省浏览),
告知浏览器继续使用所保存的cache。
ETag。 资源的hash值.可以判断服务器资源是否修改过.也是为了提高效率的.由服务端产生.发给客户端.
使用方式
使用的是4.0的okhttp源码.已经用kotlin改写了.kotlin我也不是很熟.所以编写边学了
这个比较简单.就是构建一个request. 然后获得一个call.调用call.enqueue 来把这个call在异步线程执行.
final Request request = new Request.Builder()
.url("https://www.jianshu.com/u/b4e69e85aef6")
.addHeader("user_agent","22222")
.build();
Call call = mOkHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if(response != null )
Log.i(TAG, "返回服务端数据:"+ String.valueOf(response.body().string()));
}
});
从call.enqueue开始看. 这里的call 的真正类是RealCall.他代表一次真正的请求
执行请求流程
1.RealCall.enqueue 入栈请求
RealCall 类
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
//1.一个realCall只能执行一次.
check(!executed) { "Already Executed" }
executed = true
}
//2.EventListener 的callStart方法.
callStart()
//3.这里最重要.把call包装成asyncCall.由dispatcher分发. dispatc是所有异步,同步请求的分派器.
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
这里看到AsyncCall包装了callback. AsyncCall是一个继承Runnable的线程.他会在线程池中执行.
看下他的 run
1.1 AsyncCall.run
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
try { //获得响应.这是RealCall的方法.然后用callback 来调用对应的方法.
val response = getResponseWithInterceptorChain()
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
responseCallback.onFailure(this@RealCall, e)
} catch (t: Throwable) {
responseCallback.onFailure(this@RealCall, canceledException)
} finally {
client.dispatcher.finished(this)
}
}
}
在回到1.看看dispatch的逻辑
2.disptach.enqueue 分发器把请求分发
1.这是dispatch 里三个队列 准备执行的call队列.正在执行的异步call队列.正在执行的同步call队列,所有的call都会加入这里边
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
private val runningSyncCalls = ArrayDeque<RealCall>()
2.最多同时支持64个并发请求, 每个host最多支持5个请求.
get:Synchronized var maxRequests = 64
@get:Synchronized var maxRequestsPerHost = 5
3.内部有一个 线程池来执行异步任务
constructor(executorService: ExecutorService) : this() {
this.executorServiceOrNull = executorService
}
4.入队方法
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
5.加入等待队列
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
//6.看这个AsyneCall的host是否有正在执行从call.如果有的话.就复用这个call.
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//这里会遍历等待队列.移到运行队列并开始执行
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
这是存储本次方法要执行的call的集合
val executableCalls = mutableListOf<AsyncCall>()
//遍历 等待队列readyAsyncCalls
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//如果当前运行的cal数目超过64就停止遍历
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同一个host上执行的call超过5个.就跳过这个call
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
//这里就递增同一个host执行的call数目
asyncCall.callsPerHost.incrementAndGet()
//加入到要执行的队列中.
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
//遍历.然后通过dispatch构造函数的线程值来执行.这里就会执行上边1.1的run方法
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
到这里.就是把 请求call 封装成asyncCall,交给disptach分发器分发.通过线程池异步执行,而执行的方法是AsyneCall.run. 同步的和这类似.并且更简单.
3.RealCall.getResponseWithInterceptorChain
AsyneCall是RealCall的内部类.他的run方法就是调用Real.getResponseWithInterceptorChain 来获得响应
这个方法就是有名的调用链.先看看结果图.
img这里就是把所有拦截器组成一个集合.每个拦截器按顺序拦截请求并处理.然后交给chian 去调用下一个拦截器.然后层层递进到最里边的拦截器处理请求.得到相应.然后在层层向往返回响应.每层拦截器在对响应进行处理.这里默认了几个拦截器.同时我们可以加入自定义的应用拦截器和网络拦截器.
RealCall类
internal fun getResponseWithInterceptorChain(): Response {
// 1.拦截器集合,这里的添加顺序就是请求调用的顺序.要特别注意
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors 2.应用拦截器.我们可以添加自己的到这个集合.
interceptors += RetryAndFollowUpInterceptor(client) 3.重试及重定向拦截器.
interceptors += BridgeInterceptor(client.cookieJar)4.封装请求响应头
interceptors += CacheInterceptor(client.cache)5.根据请求响应头决定是否使用缓存
interceptors += ConnectInterceptor 6.拿到合适的连接,这里使用了连接处
if (!forWebSocket) {7.网络拦截器.我们可以添加自己的.
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)8.与服务器交互的拦截器,获取真正网络请求
val chain = RealInterceptorChain(9,生成一个调用链注意index=0,调用链与拦截器交替操作.
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)10.从调用链开始执行,最后获得响应
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null) //释放连接
}
}
}
可以看到.主要就是为这个realcall对象创建了一系列的拦截器对象.在一个list中.然后通过调用链chain来执行.获得response.最后释放连接connection. connection连接这部分以后再讲.
这里看下chain责任链的proseed方法,chain是Interceptor的内部类.所有实现是RealInterceptorChain类
这里要注意.所有责任链chain和拦截器的方法.返回的都是response.
img3.1 拦截器链执行
RealInterceptorChain.process
RealInterceptorChain类
1.重要变量.请求call, 拦截器集合, 当前拦截器索引, Exchange是执行数据交换的类.以后再看.最后是请求.
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
override fun proceed(request: Request): Response {
2.创建一个新的chain.注意此时index+1了.也就是要指向下一个拦截器
val next = copy(index = index + 1, request = request)
3. 拿到index索引对应的拦截器. 最开始肯定index 是0 然后依次递增
val interceptor = interceptors[index]
4.执行拦截器的intercept方法.并传入刚才新建的chain. 拦截器里又会调用chain.proceed来执行下一个拦截.
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
//获得响应.返回
return response
}
可以看到调用链chain主要通过index来拿到不同的拦截器.然后chain调用interceptor. interceptor.又调用chain.不断递增index.就实现了拦截器集合中所有拦截器的链式调用.
看下步骤3中第一个拦截器RetryAndFollowUpInterceptor
3.2 重试拦截器RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor.intercept
这是关于 请求重试及请求重定向的拦截器. 里边又会调用response = realChain.proceed(request).来吧请求向下一个chain传递.
RetryAndFollowUpInterceptor类
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var priorResponse: Response? = null
while (true) { 这里是while 循环.
//1.产生一个ExchangeFinder.用来之后进行连接时找到合适的连接线路.
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
try {
2.又调用chain 的proceed. 执行其他拦截器的方法.获取响应
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
//3.连接失败.继续重试
newExchangeFinder = false
continue
} catch (e: IOException) {
4.已连上服务器.但是io出错,常识恢复request.如果可以就重试.否则报错.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e
}
newExchangeFinder = false
continue
}
看看上次是否有响应.有就把上次响应绑定到这次的.第一次进来是肯定没有.只有重试才会有
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//得到需要重试的request. 这里是根据response的响应来的code判断.比如300的需要重定向,408的超时重试,500的服务器有错误. 重定向请求只有get方式可以.这个新的request是根据response的参数生成的
val followUp = followUpRequest(response, exchange)
//不需要重试.那就使用调用链产生的响应
if (followUp == null) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
最大重试次数20次.超过就异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
可以看到.本次的重试request和response又成了下次循环的请求和之前的响应.
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
可以看到.重试拦截器主要是 拿到深层拦截器产生的响应.在根据响应response的code 分成30.40.5-~来决定是否要重试.如果重试就产生重试的request再次交给深层拦截器处理.得到重试响应.知道返回合适的response.
3.3桥接拦截器-BridgeInterceptor
接下来是BridgeInterceptor.intercept,注意是把我们穿件的请求转换成网络请求request. 也就是添加各种请求头.
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
host 是 ip:端口 的模式
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
压缩方式的头. 如果请求有压缩.响应也需要解压
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
通过url 拿到cookie.加入cookie头 ,session 在服务器端,cookie 在客户端, cookie就是用来标识这个用户的id.
由服务端产生sessionid,交给客户端保存.客户端下次请求要上送这个sessionId
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
通过调用链继续执行上层拦截器.拿到响应的response
val networkResponse = chain.proceed(requestBuilder.build())
更新刻度端cookie 的信息.
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
这里看到.得到的响应和 发出的请求绑定起来了.
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
//这里对应上班的gzip压缩头.如果请求有压缩,那么响应到来后会进行解压.然后在组成响应.
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
//返回响应
return responseBuilder.build()
}
这里也比较好理解.就是生成各种请求头.然后交给上层调用链得到响应.并绑定请求和响应.如果请求时使用了压缩.响应也需要进行解压.这里还使用了cookie
https://segmentfault.com/a/1190000006689767 请求头响应头的总结.
3.4缓存拦截器 CacheInterceptor
再次看下缓存的逻辑
img缓存这里涉及到了缓存策略CacheStrategy,缓存 Cache.文件缓存工具类DiskLruCache.CacheStrategy决定使用网络还是使用缓存.Cache是缓存的操作类,DiskLruCache是Cache的内部属性.cache会统计请求网络的数量.缓存名字数量.
这里可以看到.当本地有缓存却过期时,会请求服务器.服务器返回200就是最新的响应,而如果返回304则是使用本地原有缓存
可以看到.缓存的逻辑.就是根据请求时候的请求头来判断的. 第一次请求肯定是没有缓存的.下次请求同一url.则会使用上次的响应头的参数来判断.缓存的逻辑
override fun intercept(chain: Interceptor.Chain): Response {
1.根据请求拿到缓存的response.这个响应不一定有用,还需要经过缓存策略的判断.
val cacheCandidate = cache?.get(chain.request())
2.获取缓存策略,缓存策略觉得拿缓存还是走网络请求.
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest 决定是否请求网络
val cacheResponse = strategy.cacheResponse 决定是否使用缓存
3.当前缓存不符合要求.关闭缓存
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
既不使用网络请求,也不使用缓存,那么久返回一个504的响应
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
不使用网络请求,那么就使用缓存的响应
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
到这里说明需要使用网络请求获取服务器的响应.进一步让下个拦截器获取响应
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
//及时关闭资源.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
如果既有缓存的响应.又拿到了网络响应.就需要把二者结合.用网络响应的头更新缓存响应,然后在保存起来.
说明此时是一个Conditional Get请求
这里查了下服务器.服务器可能返回304表示缓存继续使用.或者是新的响应.都要把新的响应更新为最新的缓存
if (cacheResponse != null) {
//服务器返回304.就把缓存响应和网络响应合并,更新到缓存
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache.update(cacheResponse, response)
return response
} else {
关闭原有缓存的资源,这里是关闭流
cacheResponse.body?.closeQuietly()
}
}
//网络响应和缓存响应封装在一起.stripBody是把响应的body置空
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
保存缓存.但是只能是get请求才可以有缓存,同时还要判断响应的code.这里的逻辑就是http缓存的逻辑.
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
.可以看到.主要是根据CacheStrategy缓存策略的networkRequest和cacheResponse来判断是请求网络还是使用本地缓存.然后如果是请求网络.调用拦截器链得到networkResponse后又根据网络响应的头来决定如何更新本地缓存.只有get请求可以被缓存.
3.4.1CacheStrategy 缓存策略
通过给定的请求和缓存响应.来决定是使用网络还是使用缓存还是都使用.
CacheInterceptor里调用缓存策略的方式,注意.缓存策略里的response是从cache缓存里取的.缓存策略是在负责判断缓存是否有用,是否应该请求网络
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
CacheStrategy 这里主要是根据请求头和缓存响应的头来判断
private fun computeCandidate(): CacheStrategy {
// 没有缓存响应
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
//https请求.缓存响应却没有tls握手
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
//不可缓存,根据request的code.如302,500,
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
cacheControl 是nocache 或者.header里有If-Modified-Since或If-None-Match,就不缓存
If-Modified-Since是上传响应返回的Last-Modified最后修改时间.下次请求带上来让服务器判断本地缓存是否可以用
If-None-Match是上次响应返回的etag.下次请求带上让服务器判断缓存etag是否过期
val requestCaching = request.cacheControl
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
这里是用缓存响应来判断.通过过期时间来判断
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
如果 age + min-fresh >= max-age && age + min-fresh < max-age + max-stale,则虽然缓存过期了, //但是缓存继续可以使用,只是在头部添加 110 警告码
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
到这里就是有说明提供的缓存响应可以使用.然后对请求略加修改,编程网络请求networkrequest
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
返回缓存响应和网络请求.
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
逻辑其实比较好理解.就是根据传入的请求的header和缓存响应的header. 根据http协议的缓存逻辑.判断哪些情况下不能缓存.做到最后说明是可以缓存的.修改请求头.返回出去.
3.4.2 DiskLruCache
文件缓存管理类.他负责对缓存响应的增删改查.同时控制日志梳理的大小.
class DiskLruCache internal constructor(
操作文件的类
internal val fileSystem: FileSystem,
日志目录
val directory: File,
日志最大数量
maxSize: Long): Closeable, Flushable {
//记录每次操作的日志记录文件及备份文件
private val journalFile: File
private val journalFileTmp: File
private val journalFileBackup: File
//保存所有缓存的集合.
internal val lruEntries = LinkedHashMap<String, Entry>(0, 0.75f, true)
journalFile
DiskLruCache内部日志文件,对cache的每一次读写都对应一条日志记录,格式如下
libcore.io.DiskLruCache
1
100
2
前5行固定不变,分别为:常量:libcore.io.DiskLruCache;diskCache版本;应用程序版本;valueCount(后文介绍),空行
CLEAN 3400330d1dfc7f3f7f4b8d4d803dfcf6 832 21054
DIRTY 335c4c6028171cfddfbaae1a9c313c52
CLEAN 335c4c6028171cfddfbaae1a9c313c52 3934 2342
REMOVE 335c4c6028171cfddfbaae1a9c313c52
DIRTY 1ab96a171faeeee38496d8b330771a7a
CLEAN 1ab96a171faeeee38496d8b330771a7a 1600 234
READ 335c4c6028171cfddfbaae1a9c313c52
READ 3400330d1dfc7f3f7f4b8d4d803dfcf6
接下来每一行对应一个cache entry的一次状态记录,其格式为:[状态(DIRTY,CLEAN,READ,REMOVE),key,状态相关value(可选)]:
- DIRTY:表明一个cache entry正在被创建或更新,每一个成功的DIRTY记录都应该对应一个CLEAN或REMOVE操作。如果一个DIRTY缺少预期匹配的CLEAN/REMOVE,则对应entry操作失败,需要将其从lruEntries中删除
- CLEAN:说明cache已经被成功操作,当前可以被正常读取。每一个CLEAN行还需要记录其每一个value的长度
- READ: 记录一次cache读取操作
- REMOVE:记录一次cache清除
这里需要明确的是. 每个缓存数据.是一个entry. 而上边记录的是对 缓存entry 的每次操作.开始写入和创建时都是dirty.写入完成提交后成功的话就会生成clean,失败的话就生成remove记录.然后读取缓存entry时会生成read记录.
这只是一个操作日志的记录文件.并不是 缓存entry的保存文件.
- DiskCacheLru初始化时通过读取日志文件创建cache容器:lruEntries。同时通过日志过滤操作不成功的cache项。相关逻辑在DiskLruCache.readJournalLine,DiskLruCache.processJournal
- 初始化完成后,为避免日志文件不断膨胀,对日志进行重建精简,具体逻辑在DiskLruCache.rebuildJournal
- 每当有cache操作时将其记录入日志文件中以备下次初始化时使用
- 当冗余日志过多时,通过调用cleanUpRunnable线程重建日志
DiskLruCache.Entry
每个entry是一个缓存记录,这是一个内部类
internal inner class Entry internal constructor(
internal val key: String
) {
编辑entry的编辑器
internal var currentEditor: Editor? = null
valueCount 默认是2
internal val lengths: LongArray = LongArray(valueCount)
//保存cleanFiles的集合
internal val cleanFiles = mutableListOf<File>()
//保存dirtyFiles的集合
internal val dirtyFiles = mutableListOf<File>()
}
key:每个cache都有一个key作为其标识符。当前cache的key为其对应URL的MD5字符串
cleanFiles/dirtyFiles:每个cache对应2个cleanFiles,2个dirtyFiles。其中第一个cleanFiles/dirtyFiles记录cache的meta数据(如URL,创建时间,SSL握手记录等等),第二个文件记录cache的真正内容。cleanFiles记录处于稳定状态的cache结果,dirtyFiles记录处于创建或更新状态的cache
currentEditor:entry编辑器,对entry的所有操作都是通过其编辑器完成。编辑器内部添加了同步锁
SnapShot
cache快照,记录了特定cache在某一个特定时刻的内容。每次向DiskLruCache请求时返回的都是目标cache的一个快照,相关逻辑在DiskLruCache.get中
inner class Snapshot internal constructor(
private val key: String,
private val sequenceNumber: Long,
private val sources: List<Source>,
private val lengths: LongArray
)
DiskLruCache.edit
DiskLruCache可以看成是Cache在文件系统层的具体实现,所以其基本操作接口存在一一对应的关系:
- Cache.get() —>DiskLruCache.get()
- Cache.put()—>DiskLruCache.edit() //cache插入
- Cache.remove()—>DiskLruCache.remove()
- Cache.update()—>DiskLruCache.edit()//cache更新
看下DiskLruCache的初始化方法,里边主要是readJurnal
private fun readJournal() {
fileSystem.source(journalFile).buffer().use { source ->
val magic = source.readUtf8LineStrict()
val version = source.readUtf8LineStrict()
val appVersionString = source.readUtf8LineStrict()
val valueCountString = source.readUtf8LineStrict()
val blank = source.readUtf8LineStrict()
while (true) {
try {
//最重要是这里 一次读取一行数据
readJournalLine(source.readUtf8LineStrict())
lineCount++
} catch (_: EOFException) {
break // End of journal.
}
}
}
}
//CLEAN 3400330d1dfc7f3f7f4b8d4d803dfcf6 832 21054 列出一行记录
private fun readJournalLine(line: String) {
val firstSpace = line.indexOf(' ') 第一个空格
val keyBegin = firstSpace + 1
val secondSpace = line.indexOf(' ', keyBegin) 第二个空格
val key: String
if (secondSpace == -1) {
key = line.substring(keyBegin) //如果是remove的 就删除这个key对应entry
if (firstSpace == REMOVE.length && line.startsWith(REMOVE)) {
lruEntries.remove(key)
return
}
} else { 拿到key
key = line.substring(keyBegin, secondSpace)
}
通过key创建 entry. 加入到lruEntries中
var entry: Entry? = lruEntries[key]
if (entry == null) {
entry = Entry(key)
lruEntries[key] = entry
}
when {
secondSpace != -1 && firstSpace == CLEAN.length && line.startsWith(CLEAN) -> {
val parts = line.substring(secondSpace + 1).split(' ')
entry.readable = true 对clean型entry的处理.clean是只读的.不需要editor
entry.currentEditor = null
entry.setLengths(parts)
}
对dirty型entry的处理,创建一个editor
secondSpace == -1 && firstSpace == DIRTY.length && line.startsWith(DIRTY) -> {
entry.currentEditor = Editor(entry)
}
}
}
可以看到就是遍历操作日志, 把dirty和clean的entry记录.删除remove的entry.给dirty的entry创建编辑器editor
readJurnal 后还跟着 processJournal
private fun processJournal() {
val i = lruEntries.values.iterator()
while (i.hasNext()) {
val entry = i.next()
对clean型entry的editor是null
if (entry.currentEditor == null) {
for (t in 0 until valueCount) {
size += entry.lengths[t]
}
} else { dirty型的entry不是null.这里是把这个entry 给删除了.文件和entry都删除
entry.currentEditor = null
for (t in 0 until valueCount) {
fileSystem.delete(entry.cleanFiles[t])
fileSystem.delete(entry.dirtyFiles[t])
}
i.remove()
}
}
}
到这里.lruEntries里就剩下所有clean型的entry了.entry创建的时候.会生成2个clean文件.2个dirty文件.
看看DiskLruCache的get方法
operator fun get(key: String): Snapshot? {
initialize() //还是先初始lruentry.得到所有clean类型
拿到entry,拿到对应的 snapshot.返回
val entry = lruEntries[key] ?: return null
if (!entry.readable) return null
val snapshot = entry.snapshot() ?: return null
//写一条读取的操作记录
journalWriter!!.writeUtf8(READ)
.writeByte(' '.toInt())
.writeUtf8(key)
.writeByte('\n'.toInt())
return snapshot
}
在看看删除
internal fun removeEntry(entry: Entry): Boolean {
editor 的detect 会删除这个entry的所有dirty文件
entry.currentEditor?.detach()
在删除这个entry的所有clean文件
for (i in 0 until valueCount) {
fileSystem.delete(entry.cleanFiles[i])
size -= entry.lengths[i]
entry.lengths[i] = 0
}
写入删除日志
journalWriter!!.writeUtf8(REMOVE)
.writeByte(' '.toInt())
.writeUtf8(entry.key)
.writeByte('\n'.toInt())
删除这个entry
lruEntries.remove(entry.key)
清理日志
if (journalRebuildRequired()) {
cleanupQueue.schedule(cleanupTask)
}
return true
}
缓存的加入在cache类里.看看Cache.put
internal fun put(response: Response): CacheRequest? {
val requestMethod = response.request.method
这个entry是cache中国的.
val entry = Entry(response)
var editor: DiskLruCache.Editor? = null
try {
//拿到url 对应的diskLruCache的editor
editor = cache.edit(key(response.request.url)) ?: return null
entry.writeTo(editor) //这里最重要.同流的方式.写入到editor的dirty文件中
return RealCacheRequest(editor)
} catch (_: IOException) {
return null
}
}
fun writeTo(editor: DiskLruCache.Editor) {
editor.newSink(ENTRY_METADATA).buffer().use { sink ->
sink.writeUtf8(url).writeByte('\n'.toInt())
sink.writeUtf8(requestMethod).writeByte('\n'.toInt())
sink.writeDecimalLong(varyHeaders.size.toLong()).writeByte('\n'.toInt())
写入所有的请求header.
for (i in 0 until varyHeaders.size) {
sink.writeUtf8(varyHeaders.name(i))
.writeUtf8(": ")
.writeUtf8(varyHeaders.value(i))
.writeByte('\n'.toInt())
}
sink.writeUtf8(StatusLine(protocol, code, message).toString()).writeByte('\n'.toInt())
写入所有响应头
sink.writeDecimalLong((responseHeaders.size + 2).toLong()).writeByte('\n'.toInt())
for (i in 0 until responseHeaders.size) {
sink.writeUtf8(responseHeaders.name(i))
.writeUtf8(": ")
.writeUtf8(responseHeaders.value(i))
.writeByte('\n'.toInt())
}
写入https的数据
if (isHttps) {
sink.writeByte('\n'.toInt())
sink.writeUtf8(handshake!!.cipherSuite.javaName).writeByte('\n'.toInt())
writeCertList(sink, handshake.peerCertificates)
writeCertList(sink, handshake.localCertificates)
sink.writeUtf8(handshake.tlsVersion.javaName).writeByte('\n'.toInt())
}
}
}
DiskLruCache的commitEdit 方法.就是对把entry记录由dirty转为clean的方法.
internal fun completeEdit(editor: Editor, success: Boolean) {
val entry = editor.entry
dirtyfile转为cleanfile.同时更新尺寸.用于 lrucache计算
for (i in 0 until valueCount) {
val dirty = entry.dirtyFiles[i]
if (success) {
if (fileSystem.exists(dirty)) {
val clean = entry.cleanFiles[i]
fileSystem.rename(dirty, clean)
val oldLength = entry.lengths[i]
val newLength = fileSystem.size(clean)
entry.lengths[i] = newLength
size = size - oldLength + newLength
}
} else {
fileSystem.delete(dirty)
}
}
写一条clean记录.如果不可读,就移除这entry.并写一个remove记录.
journalWriter!!.apply {
if (entry.readable || success) {
entry.readable = true
writeUtf8(CLEAN).writeByte(' '.toInt())
writeUtf8(entry.key)
entry.writeLengths(this)
writeByte('\n'.toInt())
if (success) {
entry.sequenceNumber = nextSequenceNumber++
}
} else {
lruEntries.remove(entry.key)
writeUtf8(REMOVE).writeByte(' '.toInt())
writeUtf8(entry.key)
writeByte('\n'.toInt())
}
flush()
}
进行lrucache的清理.
if (size > maxSize || journalRebuildRequired()) {
cleanupQueue.schedule(cleanupTask)
}
}
这里看到.diskLruCache 管理entry 集合.而entry中文件的读写则是由 editor来完成.
附一个比较好的讲解https://yq.aliyun.com/articles/78102?spm=a2c4e.11153940.0.0.6e5f5246b6YrkM
3.5 连接器拦截器 ConnectInterceptor
这里是为了和服务器建立链接的拦截器
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
得到Exchange,这是数据发送接收的交换类
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
看起来很简单.但是内容很多.一步步看
val exchange = realChain.call.initExchange(chain) 调用RealCall.initExchange
internal fun initExchange(chain: RealInterceptorChain): Exchange {
1.codec是 ExchangeCodec类,实现是Http1ExchangeCodec和Http2ExchangeCodec分别对应HTTP1.1和http2.0的协议.主要写出request.读取response
找到合适的codec
val codec = exchangeFinder!!.find(client, chain)
2.exchange是对codec的封装,同时实现了EventListener事件的发送
val result = Exchange(this, eventListener, exchangeFinder!!, codec)
this.interceptorScopedExchange = result
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
3.5.1ExchangeFinder.find 找到合适的数据交换类
ExchangeFinder
fun find(client: OkHttpClient,chain: RealInterceptorChain ): ExchangeCodec {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
}
3.5.2 findConnection建立合适的连接
ExchangeFinder类 主要功能在findConnection里
private fun findConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
1.如果call原有链接.但是不支持这次的请求地址.就把原有的关掉
releasedConnection = call.connection
toClose = if (call.connection != null &&
(call.connection!!.noNewExchanges || !call.connection!!.supportsUrl(address.url))) {
call.releaseConnectionNoEvents()
} else {
null
}
2.复用原有连接
if (call.connection != null) {
// We had an already-allocated connection and it's good.
result = call.connection
releasedConnection = null
}
3.从连接池中获取一个连接
if (result == null) {
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
}
}
}
4.创造一个路由选择器,路由就是地址和代理的组合
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (newRouteSelection) {
5.通过路由再次尝试获取连接.这里会比较路由的dns 代理.port.协议等等.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
foundPooledConnection = true
result = call.connection
}
}
6.如果没有找到已存在的合适的连接,就新建一个连接
if (!foundPooledConnection) {
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
7.执行TCP + TLS握手.建立连接.这是阻塞的操作.这里是建立socket连接.
// Do TCP + TLS handshakes. This is a blocking operation.
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
8合并连接(如果我们是建立大量并发连接到同一个host)如果不是.就把这个链接加入到连接池中.
synchronized(connectionPool) {
connectingConnection = null
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
} else {
connectionPool.put(result!!)
call.acquireConnectionNoEvents(result!!)
}
}
最后返回这个链接.
return result!!}
- 查看当前是否有之前已经分配过的连接,有则直接使用
- 从连接池中查找可复用的连接,有则返回该连接
- 配置路由,配置后再次从连接池中查找是否有可复用连接,有则直接返回
- 新建一个连接,其实是建立了soceket连接
- 查看连接池是否有重复的多路复用连接,有则清除,没有就加入连接池
3.5.2Http2ExchangeCodec 数据交换类
这是对http2.0协议的数据交换类简单来说HTTP/2主要解决了以下问题:
- 报头压缩:HTTP/2使用HPACK压缩格式压缩请求和响应报头数据,减少不必要流量开销
- 请求与响应复用:HTTP/2通过引入新的二进制分帧层实现了完整的请求和响应复用,客户端和服务器可以将HTTP消息分解为互不依赖的帧,然后交错发送,最后再在另一端将其重新组装
- 指定数据流优先级:将 HTTP 消息分解为很多独立的帧之后,我们就可以复用多个数据流中的帧,客户端和服务器交错发送和传输这些帧的顺序就成为关键的性能决定因素。为了做到这一点,HTTP/2 标准允许每个数据流都有一个关联的权重和依赖关系
- 流控制:HTTP/2 提供了一组简单的构建块,这些构建块允许客户端和服务器实现其自己的数据流和连接级流控制
class Http2ExchangeCodec(
client: OkHttpClient,
override val connection: RealConnection, //使用的连接
private val chain: RealInterceptorChain, //调用链
private val http2Connection: Http2Connection
) : ExchangeCodec {
private var stream: Http2Stream? = null
拿到request的body
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return stream!!.getSink()
}
写出request的header,可以看到.就是把参数封装到stream里了
override fun writeRequestHeaders(request: Request) {
header组成集合
val requestHeaders = http2HeadersList(request)
stream = http2Connection.newStream(requestHeaders, hasRequestBody)
stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(),TimeUnit.MILLISECONDS)
}
override fun flushRequest() {
http2Connection.flush()
}
}
这里看到.主要功能都是在http2Connection中.这个看了下.比较难.现在看不太懂.大概意思就是不同的请求都做成帧,使用一个连接来发送.并且请求的头和请求提是分开发送的.
可以看到这个拦截器.主要是找到合适的连接.并创建数据交换类. 这里我们要知道.可能一个连接会发送多个请求.所以连接由连接池管理.产生连接是socket连接.这里就产生了两个流,对于http1.1来说.是Http1ExchangeCodec里的source: BufferedSource(读取缓存)和sink: BufferedSink(写出缓存),通过这两个流来负责与服务器交互.
3.6 CallServerInterceptor
这是最后一个拦截器.(不算自己提供的).他用来产生一个网络call给服务器.到这里时.与服务器的连接已经有了.读取写入的流也有了.这里就是发送数据.然后获取返回的数据.
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
1.这个exchange 的实现是Http1ExchangeCodec或者Http2ExchangeCodec,这是数据写出读入的真正交换器
val exchange = realChain.exchange!!
2. 层层封装后的请求
val request = realChain.request
val requestBody = request.body
3.先把header写入到输出流中.
exchange.writeRequestHeaders(request)
var responseBuilder: Response.Builder? = null
4.这里表示request允许发送body .如果是get请求就没有body
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
如果请求有100-continue.表示是上次响应的100.户端应当继续发送请求.这个临时响应是用来通知客户端它的部分请求已经被服务器接收,且仍未被拒绝.客户端应当继续发送请求的剩余部分,或者如果请求已经完成,忽略这个响应.服务器必须在请求完成后向客户端发送一个最终响应.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
发送请求.读取响应. flushRequest就是输出流刷新缓存.输出内容到socket服务器
exchange.flushRequest()
读取响应头,如果响应头的code是100系列的.那么就返回空,也就是还要继续发送请求体
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
}
if (responseBuilder == null) {
5.为请求的body创建一个写出缓存sink.用来写出body
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
body写出到缓冲buffer中
requestBody.writeTo(bufferedRequestBody)
}
} else {
//这里表示响应头不是100.
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
}
6.刷新缓存.写出请求的内容.这里就会把请求头和body一起写出去.
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
7.读取响应头,从BufferedSource中读取.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
}
8.由响应头构建响应.这里的body是空的.下边在根据响应code 来决定如何构建响应body
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) { 9.表示响应未完成,仍需重试,构建一个要重试的响应
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
response = if (forWebSocket && code == 101) {
response.newBuilder() 构建空响应
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder() 构建一个含有响应body的source的响应返回.这里是含有响应body的
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
return response
}
可以看到.这里把请求和响应的头和body都分开处理了. 底层是交给Http1ExchangeCodec或者Http2ExchangeCodec对应http1.1和http2.0的数据交换.然后又把写出的数据封装成BufferedSink写出流和BufferedSource读取流. 这两个流带有缓冲. 并且有多个实现.这里就不细看了.
大概逻辑是先写出请求头.在写出请求体. 然后获得响应头.根据code在决定响应体.然后返回给上层.
附一个很好的讲解的.okhttp对网络的处理是很丰富的.考虑了各种网络情况.
https://yq.aliyun.com/articles/78101?spm=a2c4e.11153940.0.0.54d72e38d4OLgg
HTTP网络响应码https://www.cnblogs.com/isykw/p/6115469.html
http2.0详解 https://juejin.im/post/5a4dfb2ef265da43305ee2d0
附一张晚上别人的图
img img
网友评论