近段时间在阅读okhttp的源码,以此作为笔记。
第一篇分析okhttp的执行流程,首先是okhttp的使用,以下为源码提供的简单使用。
1.使用示例
public static void main(String... args) throws Exception {
//请求地址
String ENDPOINT = "https://api.github.com/repos/square/okhttp/contributors";
//构建okhttpclient
OkHttpClient client = new OkHttpClient();
//创建一个请求,没有指定方法默认为GET请求
// Create request for remote resource.
Request request = new Request.Builder()
//设置请求地址
.url(ENDPOINT)
.build();
//执行请求,获取请求结果,注意这里是同步请求会阻塞线程,execute方法是同步执行的
// Execute the request and retrieve the response.
try (Response response = client.newCall(request).execute()) {
//获取请求体
// Deserialize HTTP response to concrete type.
ResponseBody body = response.body();
//...以下代码省略
}
}
当然OkHttpClient和Request也可以通过各自的构造器自定义构建,使用的都是建造者模式。
通过Builder自定义OkHttpClient:
//自定义OkHttpClient
OkHttpClient client = new OkHttpClient.Builder()
//设置超时时间为15s,默认为10s
.connectTimeout(15, TimeUnit.SECONDS)
.build();
通过Builder创建一个POST请求:
//构建请求体
FormBody formBody = new FormBody.Builder()
//添加请求参数
.add("key", "value")
.build();
//创建一个post请求
Request post_request = new Request.Builder()
.url(ENDPOINT)
//指定方法为post,并添加请求参数
.post(formBody)
.build();
请求执行
//同步执行
client.newCall(request).execute()
//异步执行
client.newCall(requestrequest).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求服务器失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求服务器成功
}
});
2.OkHttpClient
OkHttpClient.Builder包含的字段如下:
public static final class Builder {
//请求分发器,这个类主要用于线程执行和分发
Dispatcher dispatcher;
//代理设置
@Nullable Proxy proxy;
//支持的协议版本,默认支持http2和http1.1
List<Protocol> protocols;
//http连接信息,包括https tls版本和加密算法
List<ConnectionSpec> connectionSpecs;
//自定义应用拦截器
final List<Interceptor> interceptors = new ArrayList<>();
//自定义网络请求拦截器
final List<Interceptor> networkInterceptors = new ArrayList<>();
//请求执行过程回调
EventListener.Factory eventListenerFactory;
//代理设置
ProxySelector proxySelector;
//cookie设置
CookieJar cookieJar;
//缓存设置,可以通过该类来设置是否使用缓存
@Nullable Cache cache;
//okhttp默认缓存实现
@Nullable InternalCache internalCache;
//socket factory
SocketFactory socketFactory;
@Nullable SSLSocketFactory sslSocketFactory;
@Nullable CertificateChainCleaner certificateChainCleaner;
//约束哪些证书和域名是被信任的
HostnameVerifier hostnameVerifier;
CertificatePinner certificatePinner;
Authenticator proxyAuthenticator;
Authenticator authenticator;
//链接池,可回收请求链接
ConnectionPool connectionPool;
//dns解析服务,默认为系统的解析服务
Dns dns;
boolean followSslRedirects;
boolean followRedirects;
boolean retryOnConnectionFailure;
//超时时间 默认10s
int connectTimeout;
//读写超时时间 默认10s
int readTimeout;
int writeTimeout;
//http2 ping服务器的时间间隔
int pingInterval;
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
//...以下代码省略
}
常用的方法主要有:
/****超时时间的设置****/
public Builder connectTimeout(long timeout, TimeUnit unit) {
connectTimeout = checkDuration("timeout", timeout, unit);
return this;
}
public Builder readTimeout(long timeout, TimeUnit unit) {
readTimeout = checkDuration("timeout", timeout, unit);
return this;
}
public Builder writeTimeout(long timeout, TimeUnit unit) {
writeTimeout = checkDuration("timeout", timeout, unit);
return this;
}
/****cookie设置****/
public Builder cookieJar(CookieJar cookieJar) {
if (cookieJar == null) throw new NullPointerException("cookieJar == null");
this.cookieJar = cookieJar;
return this;
}
/****缓存设置****/
public Builder cache(@Nullable Cache cache) {
this.cache = cache;
this.internalCache = null;
return this;
}
/****添加自定义拦截器****/
public Builder addInterceptor(Interceptor interceptor) {
if (interceptor == null) throw new IllegalArgumentException("interceptor == null");
interceptors.add(interceptor);
return this;
}
...
3.Request
Request.Builder包含的字段如下:
public static class Builder {
//请求地址信息,协议、端口、ip、域名
HttpUrl url;
//请求方法
String method;
//请求头
Headers.Builder headers;
//请求体
RequestBody body;
/** A mutable map of tags, or an immutable empty map if we don't have any. */
Map<Class<?>, Object> tags = Collections.emptyMap();
public Builder() {
//默认为GET请求
this.method = "GET";
this.headers = new Headers.Builder();
}
//...省略
}
同步请求
在上文有提到client.newCall(request).execute()表示发起一个同步请求,看一下OkHttpClient.newCall(Request)方法实现
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
OkHttpClient.newCall方法只是创建了一个RealCall对象,所以同步请求发起具体实现在RealCall.execute()方法,再来看一下RealCall.execute()方法执行了什么操作
@Override
public Response execute() throws IOException {
//判断当前任务是否在执行状态
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//事件回调
eventListener.callStart(this);
try {
//这里调用了OkHttpClient的Dispatcher的executed方法,该方法将当前任务添加到任务执行队列
client.dispatcher().executed(this);
//执行请求,获取请求结果
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
看到Response关键字
Response result = getResponseWithInterceptorChain();
说明最后是通过getResponseWithInterceptorChain()该方法执行请求并获取请求结果,在方法中有看到调用了client.dispatcher().executed(this),在上文中我们有提到Dispatcher主要用于分发和执行线程,接下来分析一下Dispatcher这个类。
1.Dispatcher
Dispatcher主要用于分发和执行线程,OkHttpClient发起异步请求的时候Dispatcher采用线程池进行执行线程,简单分析下Dispatcher这个类的属性和常用方法:
public final class Dispatcher {
//最大请求数量
private int maxRequests = 64;
//同一主机所支持的最大请求数量
private int maxRequestsPerHost = 5;
//线程池
private @Nullable ExecutorService executorService;
//存放正在等待执行的异步请求任务
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//存放正在执行异步请求任务
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//存放正在执行的同步请求任务
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//...省略
//创建线程池,懒加载
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
//发起异步请求
synchronized void enqueue(AsyncCall call) {
//判断请求数量是否超过阈值
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加到任务执行队列
runningAsyncCalls.add(call);
//通过线程池执行请求
executorService().execute(call);
} else {
//添加到等待队列
readyAsyncCalls.add(call);
}
}
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//异步请求结束时调用该方法继续分发正在等待的请求
if (promoteCalls){
promoteCalls();
}
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
//在每个异步请求结束后会继续分发正在等待的请求
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
//执行同步请求,这里只添加到任务执行队列
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
//...省略
}
在执行同步请求时Dispatcher. executed()只将任务添加到任务执行队列。具体执行通过RealCall.getResponseWithInterceptorChain()该方法请求并获取请求结果,所以OkHttp的网络请求执行的精髓就在这个方法,接下来看一下该方法的具体实现:
2.getResponseWithInterceptorChain()
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//添加用户自定义的拦截器
interceptors.addAll(client.interceptors());
//重试和重定向的拦截器
interceptors.add(retryAndFollowUpInterceptor);
//请求构造的拦截器如果有设置cookie可用也在这里添加
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//链接拦截器这里建立socket链接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//这里是最后一个拦截器,发起服务器请求
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建一个拦截器请求责任链,这里传了index参数为0,留意这个参数
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
首先getResponseWithInterceptorChain添加了以下拦截器:
1.用户自定义的拦截器client.interceptors
2.重试和重定向的拦截器retryAndFollowUpInterceptor
3.请求构造的拦截器BridgeInterceptor
4.缓存拦截器CacheInterceptor
5.链接拦截器ConnectInterceptor
6.用户自定义的网络拦截器client.networkInterceptors
7.最后一个是与服务器通信的CallServerInterceptor
拦截器请求链如下:
image.png最后调用了RealInterceptorChain.proceed()方法,看一下该方法的执行
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//...省略
// 责任链模式,通过index+1获取下一个拦截器,执行下一个拦截器的intercept方法
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
//...省略
return response;
}
责任链模式,通过index+1获取下一个拦截器,执行下一个拦截器的intercept方法,在这里每一个Interceptor都有自己负责的功能,重定向、cookie设置、缓存等都在与服务器通信之前完成,最后一个 Interceptor负责和服务器通信,获取的通信结果由最后一级依次向上传递。
异步请求
1.简单使用示例
client.newCall(post_request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求服务器失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求服务器成功
}
});
在client.newCall(Request)创建一个RealCall对象后通过RealCall.enqueue(Callback)发起一个异步请求,来看一下RealCall.enqueue()方法的具体实现:
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//发起异步请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在RealCall的enqueue方法中,先是创建了一个AsyncCall最后通过OkHttpClient.Dispatcher.enqueue()发起异步请求,来看一下AsyncCall这个类:
2.AsyncCall
final class AsyncCall extends NamedRunnable {
//...省略
@Override
protected void execute() {
boolean signalledCallback = false;
try {
//通过拦截器请求链执行请求
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最后调用该方法继续分发执行正在等待的任务
client.dispatcher().finished(this);
}
}
}
可以看到AsyncCall继承了Runnable,执行请求的方式跟同步请求无异都是通过getResponseWithInterceptorChain()执行请求,跟同步请求不同的是在每个异步请求任务结束后会判断当前是否还存在正在等待执行的任务,如果有则通过线程池继续分发执行。
网友评论