一、OkHttp历史过程
OkHttp一开始是Square公司觉得原生的HttpURLConnection和阿帕奇的HttpClient不太好用,然后基于他们封装的,然后开源了,全世界都觉得好用。由于Google不建议使用HttpClient,所以Square一开始是把这一块独立出来了,后面又直接去掉了。后面他们一股作气,把HttpURLConnection也去掉了,这个就是完全不依赖Google原生,自己从实现了socket来进行网络连接。最后Google官方收录了OkHttp代码。实际上Android的HttpURLConnection的代码已经是OkHttp的代码了。这个时候是2013年,目前都是2020年了。
OkHttp4已经开始用Kotlin来重写了,所以kotlin对于安卓来说很重要,我们不是小孩子,我们不做选择,Flutter和kotlin我们都要。
https://square.github.io/okhttp/ 官网介绍
二、基本使用
val url = "https://api.github.com/"
val client = OkHttpClient()
val request = Request.Builder().url(url).build()
client.newCall(request).enqueue(object : Callback{
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})
三、源码分析,从基本使用的API开始入手 ,enqueue方法
调用的是RealCall的enqueue方法,源码如下(我贴出来的源码中,关键的地方都有注释,只看注释可以找到流程中的关键点,建议把源码翻出来对着看,流程并不太难的)
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
看到RealCall里面有OkHttpClient,OkHttpClient是啥,里面有请求的各种配置。还有Request,这个是个啥,就是我们要发起一个请求所有的东西都封装在这个里面了。forwebSockt:Boolean 是否启用webSockt,webSockt android端常用的长连接,可以随时接受服务端的推送。webSockt协议是在http协议之上建立的。
enqueue的跟踪:调用到RealCall的enqueue方法,
client.dispatcher().enqueue(new AsyncCall(responseCallback));
首先我们看下Dispatcher类
private int maxRequests = 64;//默认允许的最大网络请求数
private int maxRequestsPerHost = 5;//默认允许的最大主机数
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */ //异步请求缓存队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ //异步请求正在运行的队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */ //同步请求正在运行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
再来看看Dispatcher的enqueue方法源码
void enqueue(AsyncCall call) {
synchronized (this) {
//把请求放到异步请求准备队列中
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
接着看promoteAndExecute()
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
//遍历准备队列,如果没有超过最大请求数和最大主机数量,从异步准备队列移处,
//然后添加到executableCalls和正在运行的异步队列中
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//开始把刚才的请求遍历,然后执行
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//这里查看下怎么执行请求任务
asyncCall.executeOn(executorService());
}
return isRunning;
}
查看asyncCall.executeOn(executorService());源码
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//通过线程池来执行,this表示当前类实现了Runnable
//当前类是AsyncCall final class AsyncCall extends NamedRunnable {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
//responseCallback是我们传进来的回调,这个地方回掉onFailure
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
查看NamedRunnable源码
//果然实现了Runnable
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
// run 方法中调用了本类的抽象方法execute,所以我们在AsyncCall中查看execute()实现
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
查看AsyncCall中的execute实现
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
//通过这行代码去连接服务器并获取对应的响应
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//我们的失败回调
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//我们的成功回调
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//不管成功还是失败都会结束本次请求
client.dispatcher().finished(this);
}
}
}
四、拦截器
RetryAndFollowUpInterceptor 负责失败重试以及重定向
BridgeInterceptor 把客户端的请求转化为服务器需要的请求,把服务器返回的响应转化为
CacheInterceptor 负责读取缓存以及更新缓存
ConnectInterceptor 负责与服务器建立连接
CallServerInterceptor 负责从服务器读取响应数据
分析下责任链模式
首先是RealCall 的getResponseWithInterceptorChain()中
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
可以看到创建了一个RealInterceptor构造,其中index为0,然后chain.proceed(originalRequest) 这个会调用RealInterceptor的proceed(t)方法,我们看下源码
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
重点看下上面的注释 // Call the next interceptor in the chain. 处
首先会构造一个RealInterceptorChain,这个的index会+1,然后获取当前的interceptor,执行interceptor的intercept(chain)方法,找一下系统的RetryAndFollowUpInterceptor,看下他的intercept源码
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try { // ***********************************注释1***********************************
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
if (followUp == null) {
streamAllocation.release();
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
找到代码 // 注释1处
这个里面的chain是刚才index+1的RealInterceptorChain,又会重新调用之前的chain.proceed(originalRequest) ,只不过这是调用的第二个intercepter的intercept方法,就这样一直掉下去,直到最后一个CallServerInterceptor里面,这个里面是没有调用chain方法,所以会结束调用,返回response。所以我们自定义Intercept一般的都是会调用chain方法,否则OkHttp自带的intercepter就不会被调用到了。这个就是完整的拦截器责任链分发。
网友评论