作者:Calculus_小王
概述
OKHttp是一个基于HTTP协议的网络请求框架,它支持HTTP/2协议,连接复用和连接池,缓存策略等功能。它的核心设计是拦截器(Interceptor),它将请求的复杂逻辑切分成多个独立的模块,并通过责任链模式串联起来。每个拦截器都有自己的职责,比如重试和重定向,桥接,缓存,连接,调用服务器等。拦截器之间通过RealInterceptorChain
类来传递请求和响应,最终完成整个网络请求的过程。
建造器
以一段简单代码示例开始
// enqueue需要callback;execute不需要,但需要自行try catch
// 这里忽略两个build的参数配置
// 整段代码拆解为几步
// 1. OkHttpClient.Builder().build()建造OkHttpClient
// 2. Request.Builder().build()建造Request
// 3. client.newCall(request)得到Call
// 4. call.enqueue()发起请求
OkHttpClient.Builder().build().newCall(Request.Builder().build()).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
OkHttpClient.newCall():将Request封装成RealCall,并持有Client,为的就是对于每一个Request,可以便利的调用Client的能力,而避免client.xxx(request)
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
)
发起请求
RealCall.execute()\enqueue():请求的起点,均以client.dispatcher进行分发。其中exectue直接返回了Response,说明已经完成了请求,结果来源于getResponseWithInterceptorChain()
,先记住。因为exectue直接在发起调用的线程进行请求,而enqueue需要进行线程池调度,那么最终应该也会调用该方法发起请求,并返回结果
override fun execute(): Response {
// 校验,保证一个Request只请求一次
check(executed.compareAndSet(false, true)) { "Already Executed" }
// 这里主要涉及一个EventListener,支持监听request的各个阶段和状态,比如connectEnd、dnsStart……
timeout.enter()
callStart()
try {
// 这一段就是正主,注意executed需要try包裹
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
// 结束
client.dispatcher.finished(this)
}
}
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
// enqueue的话不马上执行
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
调度器
Dispatcher:主要负责队列调度和线程切换,三个队列的释义一目了然,就不多作解释了,关注下线程池的配置吧
private var executorServiceOrNull: ExecutorService? = null
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
// 解析下此处线程池的配置,如果不太清楚的,要去补补课喔
// 核心线程池大小为0,即直接入队blockQueue
// 阻塞队列为SynchronousQueue,size为0,且同步阻塞,进入下一步最大线程数判断
// 最大线程数为Int.MAX_VALUE,那么综上即每次需要就新起一个线程,达到60s不工作即回收
// 那么这里有个问题,这样设置线程池会不会爆,因为不设上限的话
// 最大轻轻数maxRequests:64 和 最大请求域名数maxRequestsPerHost:5会进行限制,后面会看到
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
简单介绍了Dispatcher,接着看看调度的具体实现。同步调用就直接被放在了对应的运行队列里,而异步调用除了需要进入等待队列外,需要执行promoteAndExecute()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
// 从等待队列中迭代
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 这里的判断,就避免了线程池大吞吐量设计的溢出缺陷了
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
// 可以执行的,就会放在线程池队列,并加入异步运行队列
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// 这里就把刚才放进来的,通过线程池进行分发执行了
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 这个方法继续跟进,这里把线程池传了过去
asyncCall.executeOn(executorService)
}
return isRunning
}
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 发起线程池调度
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
// callback,也就是一开始设置的回调响应
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 结束,回顾下execute发起时是不是也有这个,对这个方法不列出来了,直接讲一下作用
// 1.完成的request需要进行出队调度
// 2.调用promoteAndExecute(),推动等待队列的运行,这个是等待队列得以持续运动的核心
// 3.如果没有要运行的,也就是处于闲置状态,会运行idleCallback,类似于IdleHandler的设计
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
到这块,对Dispatcher的能力也大致有了一定的认知:
- 维护线程池
- 维护、管理、调度 运行\等待队列,类似于MessageQueue,而Dispatcher就相当于是Looper,那Request或者说RealCall就相当于是Message.Runnable
上面追踪到executorService.execute(this)
,这意味着RealCall实现了Runnable接口,找一下run
方法吧。然后我们发现了response来源于getResponseWithInterceptorChain()
,再回顾下execute()
,是不是同样调用了这个方法
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 异步请求的结果不同于同步直接return,而是通过responseCallback进行回调
// 如果需要多回调设置,那可以继承RealCall,维护callbackList,进行分发
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
那么OkHttpClient基础阶段的设计已经浮出水面了,那接下去就是重头戏getResponseWithInterceptorChain
的解析了
核心方法getResponseWithInterceptorChain
根据getResponseWithInterceptorChain
方法名,大概可以猜测这是拦截器和责任链模式的一种结合,先过一下责任链模式吧
责任链
责任链模式的设计在于核心接口,比如有个run()
,然后各链子类需要实现该接口,重写 run() 中,需要关注自身所需执行逻辑的时机、调度下游的时机,比较类似于在树结构递归DFS对于前中后序列遍历时的感觉
1. 单链表式
这种方式,通常通过上下游的对象持有传递,来构成整体的链。在Android中事件分发就采用了这种设计,但也是因为view的树形结构的原因
// 该代码段取自菜鸟教程
public class ChainPatternDemo {
private static AbstractLogger getChainOfLoggers(){
AbstractLogger errorLogger = new ErrorLogger(AbstractLogger.ERROR);
AbstractLogger fileLogger = new FileLogger(AbstractLogger.DEBUG);
AbstractLogger consoleLogger = new ConsoleLogger(AbstractLogger.INFO);
errorLogger.setNextLogger(fileLogger);
fileLogger.setNextLogger(consoleLogger);
return errorLogger;
}
public static void main(String[] args) {
AbstractLogger loggerChain = getChainOfLoggers();
loggerChain.logMessage(AbstractLogger.INFO, "This is an information.");
loggerChain.logMessage(AbstractLogger.DEBUG,
"This is a debug level information.");
loggerChain.logMessage(AbstractLogger.ERROR,
"This is an error information.");
}
}
// 输出结果
Standard Console::Logger: This is an information.
File::Logger: This is a debug level information.
Standard Console::Logger: This is a debug level information.
Error Console::Logger: This is an error information.
File::Logger: This is an error information.
Standard Console::Logger: This is an error information.
public abstract class AbstractLogger {
public static int INFO = 1;
public static int DEBUG = 2;
public static int ERROR = 3;
protected int level;
//责任链中的下一个元素
protected AbstractLogger nextLogger;
public void setNextLogger(AbstractLogger nextLogger){
this.nextLogger = nextLogger;
}
public void logMessage(int level, String message){
if(this.level <= level){
write(message);
}
if(nextLogger !=null){
nextLogger.logMessage(level, message);
}
}
abstract protected void write(String message);
}
public class ConsoleLogger extends AbstractLogger {
public ConsoleLogger(int level){
this.level = level;
}
@Override
protected void write(String message) {
System.out.println("Standard Console::Logger: " + message);
}
}
public class ErrorLogger extends AbstractLogger {
public ErrorLogger(int level){
this.level = level;
}
@Override
protected void write(String message) {
System.out.println("Error Console::Logger: " + message);
}
}
public class FileLogger extends AbstractLogger {
public FileLogger(int level){
this.level = level;
}
@Override
protected void write(String message) {
System.out.println("File::Logger: " + message);
}
}
2. 数组\队列
而数组\队列形式,则是在下游对象的传递\获取方式上进行了一点变化,其余设计均一致
// 取自网络博客代码
public abstract class Handler {
// ...
// 处理请求的抽象方法
public abstract void handleRequest(Request request);
protected void next(Request request, Handler[] handlers, int index) {
if (index < handlers.length) { // 如果有后继节点,则转发请求
handlers[index].handleRequest(request);
}
}
}
public class Client {
public static void main(String[] args) {
Handler[] handlers = new Handler[] {new ConcreteHandlerA(), new ConcreteHandlerB()};
Request request = new Request();
handlers[0].handleRequest(request, handlers, 1); // 发送请求到链头
}
}
然后我们来看正主的设计,看着似乎像是集合进行维护,发起调用的地方是proceed
,结果也由它返回。为了印证我们的思路,跟着源码和注释看下去
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 组链:这里的顺序需要记住
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 这里将链全部投入,封装为RealInterceptorChain,注意这个0,在入参中代表index
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
// 发起调用,获取结果,承接下面的源码
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
override fun proceed(request: Request): Response {
return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
// ……
// Call the next interceptor in the chain.
// next这里对Index+1
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
// index的作用在这,表示当前节点
val interceptor = interceptors[index]
// 这个设计就是责任链的形式,上游可以调度下游,把next传入
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// ……
return response
}
我们将next称作下游,interceptor称作当前节点,或者看作上游,得到如下的责任链伪代码
1. 事前处理 this.doBefore()
1.1 传递处理 next.interceptor -> return nextResponse
1.2 得到下游返回的结果nextResponse
2. 事后处理 this.doAfter(nextResponse)
对比一下事件分发的责任链思路,当然这里省略了关于记忆性搜索(即child找到确立后不必再找)的一些细节
1. 事前拦截阶段 viewgroup.intercepter
1.1 传递处理 child.dispatchTouchEvent -> child.onTouchEvent return result
1.2 得到回传结果result
2. 事后处理 handled……
拦截器
理解了责任链的整体运行思路后,大概明白了整个请求的执行思路了,那我们就需要跟着链chain的对象Interceptor逐个看看,他们都做了什么
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
根据如上代码,得到链顺序如图:最先的是
- 自定义interceptors
- RetryAndFollowUpInterceptor:重定向拦截器,负责处理请求异常、重试和重定向的逻辑
- BridgeInterceptor:桥接拦截器,负责把用户构造的请求转换为发送到服务器的请求,把服务器返回的响应转换为用户友好的响应
- CacheInterceptor:缓存拦截器,负责根据缓存策略和响应头判断是否使用缓存或网络,以及更新缓存
- ConnectInterceptor:连接拦截器,负责建立连接,选择路由和协议
- 自定义networkInterceptors
- CallServerInterceptor:调用服务器拦截器,负责向服务器发送请求和接收响应
自定义拦截器
比如我们可能需要输出接口日志、处理接口公参等情况,就可以用到这个。当然如果需要关于header、body等其他细节的话,那就应该放在networkInterceptors中,至于放在哪,完全看你处理的时机和内容。当然,一定需要proceed(chain.request())传递下去
OkHttpClient.Builder()
.connectTimeout(builder.timeout, TimeUnit.SECONDS)
.readTimeout(builder.timeout, TimeUnit.SECONDS)
.writeTimeout(builder.timeout, TimeUnit.SECONDS)
.addInterceptor(LogInterceptor())
.build()
class LogInterceptor : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
1. 事前
val request = chain.request()
// 方法细节忽略
generateRequestLog(request).let {
Logger.json(it)
}
2. 递归
val response = chain.proceed(request)
3. 事后
getResponseText(response)?.let {
Logger.json(it)
}
return response
}
}
然后会将五大拦截器,但整体不会详细展开,仅对部分源码作注,因为细节点过多,对于整体框架设计来说,可以不过多关注
RetryAndFollowUpInterceptor
处理请求异常、重试或重定向的逻辑
override fun intercept(chain: Interceptor.Chain): Response {
var request = chain.request()
val realChain = chain as RealInterceptorChain
val transmitter = realChain.transmitter()
var followUpCount = 0
var priorResponse: Response? = null
while (true) {
// 死循环,以用于重试,如果不需要的话,进行break或return
var response: Response
var success = false
try {
// 实战训练,得到response
response = realChain.proceed(request, transmitter, null)
success = true
}
// ……
val exchange = response.exchange
// 这里是重定向判断
val route = exchange?.connection()?.route()
// 重试判断的关键,return的情况就意味着不需要重试了
val followUp = followUpRequest(response, route)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
transmitter.timeoutEarlyExit()
}
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
return response
}
response.body?.closeQuietly()
if (transmitter.hasExchange()) {
exchange?.detachWithViolence()
}
// 重试次数限制
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
}
}
BridgeInterceptor
修改请求或响应的头部信息,例如添加 Content-Type, Content-Length, Host, Cookie, User-Agent
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
// Content-Type
// Content-Length\Transfer-Encoding
// Host
// Connection
// Accept-Encoding
// Cookie
// User-Agent
// ……
val networkResponse = chain.proceed(requestBuilder.build())
// 这里还对response进行cookie解析
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 这里如果response是Gzip的话,还会进行解压
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
CacheInterceptor
实现缓存功能,提高请求效率和节省流量。注意只缓存Get方法,因为Post认为是涉及增删改等状态变更情况确认的,是需要c-s确认的,所以不能用缓存。具体的一些策略细节在CacheStrategy
中
// 创建一个缓存目录和大小
File cacheDirectory = new File(context.getCacheDir().getAbsolutePath(), "HttpCache");
int cacheSize = 10 \* 1024 \* 1024; // 10 MiB
Cache cache = new Cache(cacheDirectory, cacheSize);
// 创建一个 OkHttpClient 并设置缓存
OkHttpClient client = new OkHttpClient.Builder()
.cache(cache)
.build();
-
CacheInterceptor 会根据请求和响应的 Cache-Control 首部来判断是否使用缓存或网络,以及更新缓存
-
如果请求有 Cache-Control: no-cache 或 Cache-Control: max-age=0,那么会强制使用网络,不使用缓存。
-
如果请求有 Cache-Control: only-if-cached,那么会强制使用缓存,不使用网络。
-
如果请求没有特殊的 Cache-Control,那么会根据响应的 Cache-Control 来判断是否使用缓存或网络,以及缓存的有效期。
-
如果响应有 Cache-Control: no-store,那么会强制不使用缓存,不更新缓存。
-
如果响应有 Cache-Control: no-cache 或 Cache-Control: must-revalidate,那么会强制进行再验证,即向服务器发送 If-None-Match 或 If-Modified-Since 首部来判断缓存是否有效。
-
如果响应有 Cache-Control: max-age=n,那么会设置缓存的有效期为 n 秒,在此期间可以直接使用缓存,不需要再验证。
// 缓存保存在client中
override fun intercept(chain: Interceptor.Chain): Response {
// 取缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// now时间戳也是影响缓存策略的要素之一
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// networkRequest:网络请求,null 表示是不使用网络
// cacheResponse:响应缓存,null 表示不使用缀存
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
// 缓存有,但无法响应,代表不可用
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
// 不需要请求,但缓存也无法响应,返回504
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// If we don't need the network, we're done.
// 不需要请求,经过上面的判断,这时cacheResponse肯定有,那就返回缓存结果
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
var networkResponse: Response? = null
try {
// 递归传递
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// 返回code 304,那就使用缓存
// 和上面不一样的是,这时请求已经完成,但与服务端协商后使用缓存,会更新部分信息
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 更新缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
// 缓存失效,移除
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
缓存策略概括如图:
ConnectInterceptor
建立连接,选择最优的路由和协议
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// We need the network to satisfy this request. Possibly for validating a conditional GET.
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
其主要工作内容不作源码层面的展开,概括如下(先描述,后代码): • 根据请求的 URL 和 OkHttpClient 的配置,选择一个 RouteSelector 对象,用于寻找最优的路由和地址。
• 通过 RouteSelector 获取一个 Connection 对象,如果没有可用的连接,就创建一个新的连接。
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
• 通过 Connection 获取一个 RealConnection 对象,用于建立 Socket 连接,并通过 Okio 获取输入流和输出流。
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection
• 根据请求的协议,创建一个 ExchangeCodec 对象,用于编码请求和解码响应。
internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec
• 将 StreamAllocation, RealConnection 和 ExchangeCodec 封装成一个 Exchange 对象,用于和服务器进行通信。
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
• 将 Exchange 传递给下一个拦截器 CallServerInterceptor,用于向服务器发送请求和接收响应。
// newExchange的返回内容就是上面的reuslt
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
CallServerInterceptor
直接和服务器通信,发送请求和接收响应
override fun intercept(chain: Interceptor.Chain): Response {
// 内容很多,但主要就两个方法
// 对于request、response的操作,均根据 协议和编码 进行输入\输出流操作
// 发送请求头,exchange是上游ConnectInterceptor构建传递下来的
exchange.flushRequest()
// 解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
// 其余有一些处理,有需要可自行探索,不作展开了
}
总结
OkHttp 的源码结构设计是基于责任链模式和工厂模式的。责任链模式是指将请求和响应的处理分成多个拦截器,每个拦截器可以对请求或响应进行修改或转发,从而实现不同的功能,如重试、重定向、缓存、编解码等。工厂模式是指将对象的创建过程封装成一个工厂类,根据不同的参数或条件,返回不同的对象实例,从而实现多态和解耦,如 ConnectionPool、CallFactory、WebSocketFactory 等。
灵魂发问
1. Okhttp 有哪些优点或特性?
支持 HTTP/2,对一台机器的所有请求共享同一个 Socket
内置连接池,支持连接复用,减少延迟
支持透明的 gzip 压缩响应体
响应缓存可以完全避免网络重复请求
请求失败时自动重试主机的其他 IP,自动重定向
2. Okhttp 的请求和响应的流程是怎样的?
• 通过 OkHttpClient.Builder 构建一个 OkHttpClient 对象,设置一些配置参数,如超时时间、拦截器、缓存等。
• 通过 Request.Builder 构建一个 Request 对象,设置一些请求参数,如 URL、方法、头部、体等。
• 通过 OkHttpClient.newCall 方法创建一个 Call 对象,表示一个网络请求任务。
• 通过 Call.execute 或 Call.enqueue 方法发起同步或异步请求。
• 通过 RealCall 类实现 Call 接口,并将请求任务加入到 Dispatcher 的同步或异步队列中。
• 通过 Dispatcher 类管理同步或异步队列,并根据最大并发数和主机数来调度任务执行。
• 通过 RealCall.getResponseWithInterceptorChain 方法创建一个责任链模式的拦截器链,并依次执行拦截器的逻辑。
• 通过 RetryAndFollowUpInterceptor 拦截器处理请求异常、重试和重定向的逻辑。
• 通过 BridgeInterceptor 拦截器把用户构造的请求转换为发送到服务器的请求,把服务器返回的响应转换为用户友好的响应。
• 通过 CacheInterceptor 拦截器根据缓存策略和响应头判断是否使用缓存或网络,以及更新缓存。
• 通过 ConnectInterceptor 拦截器建立连接,选择路由和协议。
• 通过用户自定义的网络拦截器处理一些网络层面的逻辑,如日志、监控、修改等。
• 通过 CallServerInterceptor 拦截器向服务器发送请求和接收响应,并根据协议进行编解码。
• 通过 Response 类封装响应头和响应体,并返回给上层调用者。
3. Okhttp 如何实现缓存功能?它是如何根据 Cache-Control 首部来判断缓存策略的?
• Okhttp 实现缓存功能是通过 CacheInterceptor 拦截器和 Cache 类来实现的。
在创建 OkHttpClient 对象时,可以指定一个 Cache 对象,用于存储缓存的响应。
CacheInterceptor 拦截器会根据请求和响应的 Cache-Control 首部来判断是否使用缓存或网络,以及更新缓存。
Cache-Control 首部是用于控制缓存行为的指令,它有以下几种常见的值:
• no-cache:表示不使用本地缓存,必须向服务器验证缓存是否有效。
• no-store:表示不使用本地缓存,也不更新本地缓存。
• only-if-cached:表示只使用本地缓存,不使用网络。
• max-age=n:表示本地缓存在 n 秒内有效,超过 n 秒后需要向服务器验证或重新获取。
• must-revalidate:表示本地缓存必须向服务器验证是否有效,如果无效则重新获取。
• CacheInterceptor 拦截器的工作流程大致如下:
根据请求查找是否有匹配的缓存响应,如果没有,则直接使用网络,并将响应写入缓存(如果满足条件)。
如果有匹配的缓存响应,判断是否过期或需要再验证,如果是,则向服务器发送带有验证首部的请求,并根据服务器的响应来决定是否使用缓存或更新缓存。
如果不过期或不需要再验证,则直接使用缓存,并添加 Age 首部来表示缓存的新鲜度。
4. Okhttp 如何实现重试和重定向功能?
Okhttp 实现重试和重定向功能是通过 RetryAndFollowUpInterceptor 拦截器来实现的。
在发送请求前,会根据 OkHttpClient 的配置,创建一个 RouteSelector 对象,
用于寻找最优的路由和地址。如果请求失败或收到重定向的响应,
会根据 RouteSelector 的策略来选择是否重试或重定向,并更新请求的 URL 和头部信息
5. Okhttp 如何自定义拦截器?你有没有使用过或编写过自己的拦截器?
通过实现 Interceptor 接口,Interceptor 接口只有一个方法 intercept,
该方法接收一个 Chain 参数,表示拦截器链。
在 intercept 方法中,可以对请求或响应进行修改或转发,
并且可以决定是否继续传递给下一个拦截器。
在创建 OkHttpClient 对象时,可以通过 addInterceptor 或 addNetworkInterceptor 方法来添加自定义的应用拦截器或网络拦截器。
我有使用过或编写过自己的拦截器,例如:
• 一个日志拦截器,用于打印请求和响应的信息,方便调试和监控。
• 一个加密拦截器,用于对请求参数进行加密,保证数据的安全性。
• 一个认证拦截器,用于对请求添加认证信息,如 token、签名等,实现用户的身份验证。
6. Okhttp 如何实现同步和异步的请求方式?它是如何处理回调和异常的?
Okhttp 实现同步和异步的请求方式是通过 Call 接口和 RealCall 类来实现的。
Call 接口表示一个网络请求任务,它有两个方法 execute 和 enqueue。
execute 方法用于发起同步请求,它会阻塞当前线程直到获取响应,并返回一个 Response 对象。
enqueue 方法用于发起异步请求,它会将请求任务加入到 Dispatcher 的异步队列中,并传入一个 Callback 参数。
Callback 参数是一个回调接口,它有两个方法 onResponse 和 onFailure。
onResponse 方法用于处理成功的响应,它会在异步线程中回调,并传入一个 Call 和一个 Response 参数。
onFailure 方法用于处理失败的异常,它也会在异步线程中回调,并传入一个 Call 和一个 IOException 参数。
7. Okhttp 如何管理连接池和线程池?它是如何复用和回收连接的?
• Okhttp 管理连接池和线程池是通过 ConnectionPool 类和 Dispatcher 类来实现的。
ConnectionPool 类表示一个连接池,它维护了一个双端队列,用于存储空闲的连接。
它有一个清理线程,用于定期检查连接是否过期或超过最大空闲数,并将其移除。
Dispatcher 类表示一个调度器,它维护了三个双端队列,分别用于存储同步任务、异步任务和等待执行的异步任务。
它有一个线程池,用于执行异步任务,并根据最大并发数和主机数来调度任务执行。
• Okhttp 复用和回收连接是通过 StreamAllocation 类和 RealConnection 类来实现的。
StreamAllocation 类表示一个流分配器,它负责管理连接的分配和释放。
RouteSelector 对象,用于寻找最优的路由和地址。
RealConnection 对象,用于表示当前分配的连接。
release 方法,用于释放连接,并根据连接是否空闲或是否可以复用来决定是否将其加入到连接池中。
RealConnection 类表示一个真实的连接,它封装了一个 Socket 对象,用于建立 TCP 连接,并通过 Okio 获取输入流和输出流。
allocationLimit 属性,用于表示该连接可以分配给多少个流。
noNewStreams 属性,用于表示该连接是否可以创建新的流。
onStreamFinished 方法,用于在流结束时减少 allocationLimit 的值,并根据情况释放或回收连接。
网友评论