本文基于 OkHttp 源代码版本 3.12.1
一、一般使用
1、创建OkHttpClient客户端
// Create the client through its Builder, configuring the connect, read and write timeouts.
// timeoutSecond is a value supplied by the surrounding code (not shown in this snippet).
OkHttpClient mOkHttpClient = new OkHttpClient.Builder()
.connectTimeout(timeoutSecond, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build();
2、创建 Request:
// GET request: only a URL and a User-Agent header are set.
Request request = new Request.Builder()
.url(url)
.header("User-Agent", userAgent)
.build();
// POST request: build a form body from key/value pairs, then attach it with post().
// Note: this is an alternative to the GET example above; both examples declare a variable
// named `request`, so only one of them can be used in the same scope.
FormBody.Builder builder = new FormBody.Builder();
builder.add(Key, Value);
RequestBody requestBody = builder.build();
Request request = new Request.Builder()
.url(url)
.header("User-Agent", userAgent)
.post(requestBody)
.build();
3、创建Call并发起一个请求:
// Wrap the request in a Call, then enqueue it with a Callback that receives the outcome.
Call call =mOkHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// Invoked with the IOException when the call fails; intentionally left empty in this example.
}
@Override
public void onResponse(Call call, Response response) throws IOException {
// Invoked with the Response when the call produces one; intentionally left empty in this example.
}
});
二、源码解读
#OkHttpClient
// A Call must be obtained before call.enqueue can be invoked; newCall delegates to
// RealCall.newRealCall, passing this client, the request, and forWebSocket = false.
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
# RealCall
// newCall() ultimately just produces a RealCall; this is the private constructor it ends up using.
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
// The original request exactly as the caller wrote it, before any interceptor has processed it.
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
// Retry-and-follow-up interceptor, created once per call.
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
this.timeout = new AsyncTimeout() {
@Override protected void timedOut() {
// When the timeout fires, cancel this call.
cancel();
}
};
// The timeout duration is taken from the client's callTimeoutMillis() setting.
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
#newRealCall
// Static factory: constructs the RealCall and then attaches an EventListener created by the
// client's event listener factory for this specific call.
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Create the RealCall.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
#RealCall
// Schedules this call asynchronously. A call can only be started once: a second attempt
// throws IllegalStateException("Already Executed").
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// AsyncCall is a Runnable: it extends NamedRunnable and its work lives in execute().
// responseCallback is the callback passed in by the caller.
// Dispatcher is the scheduling class; it holds the thread pool.
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
#NamedRunnable
// Runnable that sets the current thread's name to `name` while execute() runs and restores the
// previous name afterwards. The `name` field is declared outside this excerpt.
public abstract class NamedRunnable implements Runnable {
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
// Always restore the thread's previous name, even if execute() throws.
Thread.currentThread().setName(oldName);
}
}
// Subclasses put the actual work here.
protected abstract void execute();
}
Dispatcher 是 OkHttpClient 的调度器,是一种门面模式(Facade)。主要用来实现执行、取消异步请求操作。本质上是内部维护了一个线程池去执行异步操作,并且在 Dispatcher 内部根据一定的策略,保证最大并发个数、同一 host 主机允许执行请求的线程个数等。
# Dispatcher
// Queues an async call and then tries to promote ready calls to running.
void enqueue(AsyncCall call) {
synchronized (this) {
// Put the call on the ready queue (declared as: Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();).
readyAsyncCalls.add(call);
}
// Called outside the synchronized block; promoteAndExecute asserts the lock is not held.
promoteAndExecute();
}
// Moves eligible calls from readyAsyncCalls to runningAsyncCalls and starts them on the executor.
// Returns true if runningCallsCount() is greater than zero after promotion.
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// Take calls from the ready queue and check whether the running-queue limits allow them to start.
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
// Calls that pass both checks are added to the running queue.
// Declared as: private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
// The promoted calls are started after the synchronized block has been exited.
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
// Hand each promoted call to the thread pool via executeOn.
asyncCall.executeOn(executorService());
}
return isRunning;
}
// executorService() returns the thread pool, creating it lazily on first use.
// The pool is built with 0 core threads, Integer.MAX_VALUE maximum threads, a 60-second
// keep-alive and a SynchronousQueue; its threads come from Util.threadFactory("OkHttp Dispatcher", false).
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出,实际上就是使用线程池执行了一个 AsyncCall,而 AsyncCall 实现了 Runnable 接口,因此整个操作会在一个子线程(非 UI 线程)中执行。
#RealCall-AsyncCall
// Submits this AsyncCall to the given executor. If the executor rejects it, the failure is
// reported to the event listener and the callback, and the dispatcher is told the call finished.
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
// Run this AsyncCall on the thread pool; its run() (from NamedRunnable) invokes execute().
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
// Surface the rejection as an InterruptedIOException that keeps the original cause.
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
#RealCall-AsyncCall
// Body of the AsyncCall: runs the interceptor chain and delivers the result to the caller's
// callback, then always notifies the dispatcher that this call has finished.
@Override protected void execute() {
// Tracks whether the callback has already been invoked, so it is never signalled twice.
boolean signalledCallback = false;
try {
// The key step: obtain the response produced by the interceptor chain.
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// responseCallback is the callback interface passed in by the caller.
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// responseCallback is the callback interface passed in by the caller.
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
在 run 方法中执行了另一个 execute 方法,而真正获取请求结果的方法是在 getResponseWithInterceptorChain 方法中:
#RealCall
// Assembles the ordered interceptor list, wraps it in a RealInterceptorChain starting at
// index 0, and runs the chain with the original request.
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// Application interceptors registered on the client (user-defined).
interceptors.addAll(client.interceptors());
// Retry-and-follow-up interceptor (the author describes it as the redirect interceptor).
interceptors.add(retryAndFollowUpInterceptor);
// Bridge interceptor; it can fill in header information on the request.
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// Cache interceptor, backed by client.internalCache(). Author's note: a custom cache must be given
// a storage directory and a size. NOTE(review): the original note also says a Cache exists by
// default — confirm against the OkHttp documentation.
interceptors.add(new CacheInterceptor(client.internalCache()));
// Connect interceptor. Author's notes: connections are managed by the ConnectionPool and a reusable
// pooled connection is selected when available; reuse requires, among other things, staying within
// the request limit and matching address (same IP and port), TLS configuration and host name.
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// Network interceptors registered on the client; skipped for web socket calls.
interceptors.addAll(client.networkInterceptors());
}
// CallServerInterceptor sends the request to the server and obtains the remote result.
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// First invocation of the chain: pass in the original request.
return chain.proceed(originalRequest);
}
}
#RealInterceptorChain
// Interceptor dispatch logic. The first chain is created with index 0 (see
// getResponseWithInterceptorChain). Each proceed() call builds a "next" chain with index + 1 and
// hands it to the interceptor at the current index; that interceptor continues the chain by
// calling proceed() on the chain it was given.
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
// An interceptor arrives here through chain.proceed carrying the request it has processed;
// a chain for the next position (index + 1) is then created with that request.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
// The current interceptor runs its own logic and receives the chain for the next position.
Response response = interceptor.intercept(next);
return response;
}
// Excerpt of the Interceptor contract: an interceptor receives a Chain and returns a Response.
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
// The request this chain carries.
Request request();
// The interceptor chain's proceed method takes a Request as its parameter.
Response proceed(Request request) throws IOException;
}
看下桥拦截器BridgeInterceptor:
# BridgeInterceptor
// Excerpt of BridgeInterceptor: fills in missing request headers (Host, Connection, Cookie,
// User-Agent), proceeds down the chain, then passes the response headers to the cookie jar.
public final class BridgeInterceptor implements Interceptor {
// The chain passed in here is the one created for the next position in the call chain.
@Override public Response intercept(Chain chain) throws IOException {
// userRequest is the request as it was before this interceptor touched it.
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
···
// Each header below is only added when the user request does not already carry it.
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// Cookies loaded from the cookie jar for this URL are sent in a single Cookie header.
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
// After adding the headers, continue by calling the chain's proceed method with the request
// this interceptor has processed.
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
// The response handed back is rebuilt with the original user request attached.
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
...
return responseBuilder.build();
}
}
最后看下执行网络请求和返回响应的CallServerInterceptor拦截器
// Excerpt of CallServerInterceptor (parts elided by the author): writes the request to the
// server through the HttpCodec and builds the Response from what is read back.
public final class CallServerInterceptor implements Interceptor {
private final boolean forWebSocket;
public CallServerInterceptor(boolean forWebSocket) {
this.forWebSocket = forWebSocket;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// Obtain the HttpCodec, stream allocation, connection and request from the chain.
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
// Write the request headers to the server.
// NOTE(review): the statement "realChain.eventListener().requestHeadersStart(realChain.call());"
// sits inside the original comment line here — it looks like it was fused into the comment by
// formatting; confirm against the upstream source.
httpCodec.writeRequestHeaders(request);
// If there is a request body, send it to the server.
...
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
}
// Finish sending the request.
httpCodec.finishRequest();
---------------------
// Read the response headers (only if no response builder was obtained earlier).
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
// Build the Response, attaching the request, the connection's handshake and the timestamps.
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// Author's tip: to report progress, process the response body supplied here and derive the
// progress from the data as it is read.
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
CallServerInterceptor执行完成,返回最终服务器返回的response。
至此,完结。
网友评论