一个基本的 OkHttp 请求创建流程
OkHttpClient client = new OkHttpClient();
RequestBody body = RequestBody.create(JSON, "{json:json}");
Request request = new Request.Builder()
.url("https:www.baidu.com")
.post(body)
.build();
Response response = client.newCall(request).execute();
分析下最基础的类
1.OkHttpClient
分析构造方法并添加注释(添加注释是为了 后面看着更加方便)
/** Creates a client by delegating to {@code OkHttpClient(Builder)} with a freshly created {@code Builder}. */
public OkHttpClient() {
this(new Builder());
}
/**
 * Copies every setting from {@code builder} into this client's fields.
 *
 * <p>When the builder supplies no SSL socket factory but at least one connection spec reports
 * {@code isTls()}, a socket factory and certificate chain cleaner are derived from the platform
 * trust manager instead.
 *
 * @param builder source of all configuration values
 * @throws IllegalStateException if either interceptor list contains {@code null}
 */
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher; // Dispatcher
this.proxy = builder.proxy;// Proxy
this.protocols = builder.protocols;// Protocols
this.connectionSpecs = builder.connectionSpecs;// Connection specs; each one reports isTls() in the loop below
this.interceptors = Util.immutableList(builder.interceptors);// Application interceptors, wrapped by Util.immutableList
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);// Network interceptors, wrapped by Util.immutableList
// Factory for event listeners.
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;// Proxy selector
this.cookieJar = builder.cookieJar;// Cookie jar
this.cache = builder.cache;// Cache
this.internalCache = builder.internalCache;// Internal cache
this.socketFactory = builder.socketFactory;// Socket factory
// isTLS becomes true as soon as any connection spec is a TLS spec.
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
// Use the builder's values when a factory was supplied, or when no spec is TLS
// (in which case the factory may legitimately stay null).
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;// SSL socket factory supplied by the builder (may be null)
this.certificateChainCleaner = builder.certificateChainCleaner;// Certificate chain cleaner supplied by the builder
} else {
// TLS is requested but no factory was supplied: fall back to the platform trust manager.
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;// Hostname verifier
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);// Certificate pinner, bound to the chain cleaner chosen above
this.proxyAuthenticator = builder.proxyAuthenticator;// Proxy authenticator
this.authenticator = builder.authenticator;// Authenticator (as opposed to the proxy authenticator above)
this.connectionPool = builder.connectionPool;// Connection pool
this.dns = builder.dns;// DNS
this.followSslRedirects = builder.followSslRedirects;// Whether to follow SSL redirects
this.followRedirects = builder.followRedirects;// Whether to follow redirects
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;// Whether to retry on connection failure
// Timeout for the call as a whole.
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;// Connect timeout
this.readTimeout = builder.readTimeout;// Read timeout
this.writeTimeout = builder.writeTimeout;// Write timeout
// Ping interval.
this.pingInterval = builder.pingInterval;
// Reject null entries in either interceptor list up front.
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
我们在创建好了 OkHttpClient 之后,又创建了一个 Request
2.Request、Response
Request:我们通常说的一个请求所携带的所有信息,由 Headers 和 RequestBody 组成。RequestBody 有两个子类:FormBody(表单提交)和 MultipartBody(文件上传)
Response:服务请求的返回值,同样包括 Headers 和 ResponseBody,而 ResponseBody 的子类也有两个:RealResponseBody(真实响应)和 CacheResponseBody(缓存响应)
在生成了真正的请求之后,我们就使用 client.newCall(request).execute(); 去真正地请求数据
OkHttpClient 的newCall方法
/**
 * Creates a call for {@code request} by delegating to {@code RealCall.newRealCall}.
 *
 * @param request the request the returned call will carry
 * @return a new call bound to this client; it is never a web socket call
 */
@Override public Call newCall(Request request) {
  // Calls created through this method are never web socket calls.
  final boolean forWebSocket = false;
  return RealCall.newRealCall(this, request, forWebSocket);
}
实际上是调用了 RealCall的newRealCall 方法
下面我们来看看RealCall类
3.RealCall类
先来看创建 RealCall 的静态工厂方法 newRealCall(已添加注释)
/**
 * Static factory that builds a {@code RealCall} and attaches a new {@code Transmitter} to it.
 *
 * @param client the OkHttpClient the call belongs to
 * @param originalRequest the Request the call will carry
 * @param forWebSocket whether the call is for a web socket
 * @return the new call, with its {@code transmitter} field already assigned
 */
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
// The transmitter is created after the call because its constructor takes the call itself.
call.transmitter = new Transmitter(client, call);
return call;
}
里面的两个重要方法
execute():同步网络请求
enqueue():异步网络请求
我们以同步为例子继续向下看
/**
 * Runs this call synchronously on the calling thread and returns its response.
 *
 * @return the response produced by {@code getResponseWithInterceptorChain()}
 * @throws IllegalStateException if this call has already been executed
 * @throws IOException propagated from the interceptor chain
 */
@Override public Response execute() throws IOException {
// Check-and-set of the executed flag under the lock: a call can be started at most once.
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// NOTE(review): timeoutEnter() is not shown here; presumably it starts the call timeout. Confirm in Transmitter.
transmitter.timeoutEnter();
// NOTE(review): presumably notifies the EventListener that the call started. Confirm in Transmitter.
transmitter.callStart();
try {
// Register this call with the dispatcher.
// NOTE(review): said to add the call to the runningSyncCalls deque; Dispatcher.executed() is not shown here.
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
// Always tell the dispatcher the call is over, whether it returned or threw.
client.dispatcher().finished(this);
}
}
关于 Transmitter 后面再说,这次只过主流程
我们走到最后 到getResponseWithInterceptorChain方法中 我们进去看一下
/**
 * Assembles the ordered interceptor list, wraps it in a {@code RealInterceptorChain} starting at
 * index 0, and runs the original request through it.
 *
 * @return the response returned by the chain
 * @throws IOException if the chain fails, or {@code "Canceled"} if the transmitter reports
 *     cancellation after the chain has produced a response
 */
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// Application-level interceptors registered on the client come first.
interceptors.addAll(client.interceptors());
// Presumably handles retrying failed requests and following redirects (implementation not shown here).
interceptors.add(new RetryAndFollowUpInterceptor(client));
// Presumably adds request headers and post-processes the response; it is given the client's cookie jar.
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// Presumably checks, reads and updates the cache; it is given the client's internal cache.
interceptors.add(new CacheInterceptor(client.internalCache()));
// Presumably establishes the connection between client and server.
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// Network-level interceptors registered on the client; skipped for web socket calls.
interceptors.addAll(client.networkInterceptors());
}
// Always the last interceptor in the list.
interceptors.add(new CallServerInterceptor(forWebSocket));
// Chain positioned at index 0 with no exchange yet, carrying the original request and the client's timeouts.
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// Tracks whether noMoreExchanges() was already called in the catch block,
// so that the finally block does not call it a second time.
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
// A response that arrives after cancellation is closed and reported as a failure.
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
在这里我们获得了一个拦截器集合,并通过 RealInterceptorChain 的构造方法获得 Interceptor.Chain
我们看一下RealInterceptorChain的构造方法
/**
 * Creates a chain positioned at {@code index} within {@code interceptors}. The constructor only
 * stores its arguments; it performs no other work.
 *
 * @param interceptors the full, ordered interceptor list
 * @param transmitter the transmitter of the call being executed
 * @param exchange the current exchange, or {@code null} if there is none yet
 * @param index position in {@code interceptors} of the interceptor this chain will invoke
 * @param request the request this chain carries
 * @param call the call being executed
 * @param connectTimeout connect timeout
 * @param readTimeout read timeout
 * @param writeTimeout write timeout
 */
public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
    @Nullable Exchange exchange, int index, Request request, Call call,
    int connectTimeout, int readTimeout, int writeTimeout) {
  // Position within the interceptor list.
  this.interceptors = interceptors;
  this.index = index;

  // State of the call travelling along the chain.
  this.call = call;
  this.request = request;
  this.transmitter = transmitter;
  this.exchange = exchange;

  // Timeouts.
  this.writeTimeout = writeTimeout;
  this.readTimeout = readTimeout;
  this.connectTimeout = connectTimeout;
}
这里面只有赋值没有其他的操作
然后就剩下一个了chain.proceed(originalRequest) 我们来看这个方法
真正的proceed方法
/**
 * Hands {@code request} to the interceptor at {@code index}, giving it a new chain positioned at
 * {@code index + 1}, and validates both the incoming request and the returned response.
 *
 * @param request the request to pass to the interceptor
 * @param transmitter forwarded to the next chain
 * @param exchange forwarded to the next chain; may be {@code null}
 * @return the interceptor's response, guaranteed non-null with a non-null body
 * @throws IOException propagated from {@code Interceptor.intercept}
 * @throws IllegalStateException if an exchange exists and a network interceptor changed the
 *     target or did not call {@code proceed()} exactly once, or if the response has no body
 * @throws NullPointerException if the interceptor returns {@code null}
 * @throws AssertionError if {@code index} is past the end of the interceptor list
 */
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
// Count how many times proceed() has been invoked on this chain instance.
calls++;
// If we already have a stream, confirm that the incoming request will use it.
// The interceptor blamed is the previous one (index - 1): it is the one calling this chain.
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
// The new chain differs from this one only in index, request, transmitter and exchange.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
// Uses the exchange parameter (not this.exchange) and skips the check for the last interceptor.
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
最后返回到 execute 中
/**
 * Runs this call synchronously on the calling thread and returns its response.
 *
 * @return the response produced by {@code getResponseWithInterceptorChain()}
 * @throws IllegalStateException if this call has already been executed
 * @throws IOException propagated from the interceptor chain
 */
@Override public Response execute() throws IOException {
// Check-and-set of the executed flag under the lock: a call can be started at most once.
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// NOTE(review): timeoutEnter() is not shown here; presumably it starts the call timeout. Confirm in Transmitter.
transmitter.timeoutEnter();
// NOTE(review): presumably notifies the EventListener that the call started. Confirm in Transmitter.
transmitter.callStart();
try {
// Register this call with the dispatcher.
// NOTE(review): said to add the call to the runningSyncCalls deque; Dispatcher.executed() is not shown here.
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
// Always tell the dispatcher the call is over, whether it returned or threw.
client.dispatcher().finished(this);
}
}
到这里同步的流程就走完了。
再来看看异步的请求
Response response = client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
进入 enqueue方法
/**
 * Schedules this call for asynchronous execution by wrapping the callback in an
 * {@code AsyncCall} and handing it to the client's dispatcher.
 *
 * @param responseCallback receives the response or the failure
 * @throws IllegalStateException if this call has already been executed or enqueued
 */
@Override public void enqueue(Callback responseCallback) {
// Same at-most-once guard as execute(): both share the executed flag.
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// NOTE(review): presumably notifies the EventListener that the call started. Confirm in Transmitter.
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
和同步一样,先调用了 transmitter.callStart();,然后调用 dispatcher.enqueue(new AsyncCall(responseCallback));。那么咱们进入 dispatcher 看一下
/**
 * Queues {@code call} as ready, lets it share the per-host counter of an existing call to the
 * same host (non-web-socket calls only), then tries to promote ready calls to running.
 *
 * @param call the asynchronous call to schedule
 */
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
// reuseCallsPerHostFrom only reassigns the counter reference; it does no other work.
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
// Invoked after the synchronized block has exited: promoteAndExecute() asserts the lock is not held.
promoteAndExecute();
}
第一步我们把 call 添加到 readyAsyncCalls;然后如果不是 WebSocket 请求,就通过 findExistingCallWithHost(call.host()) 查找已有的 call,找到的话调用 reuseCallsPerHostFrom 方法共用它的计数器;最后进入 promoteAndExecute()。我们进入 promoteAndExecute 里面看一下
// Upper bound on the number of running async calls; checked against runningAsyncCalls.size()
// in promoteAndExecute().
private int maxRequests = 64;
// Upper bound on a call's per-host counter; checked against callsPerHost() in promoteAndExecute().
private int maxRequestsPerHost = 5;
/**
 * Moves eligible calls from {@code readyAsyncCalls} to {@code runningAsyncCalls} and submits
 * them to the executor service. Must be called without holding this object's lock.
 *
 * @return whether {@code runningCallsCount()} was greater than zero once promotion finished
 */
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> promoted = new ArrayList<>();
  boolean hasRunningCalls;
  synchronized (this) {
    Iterator<AsyncCall> ready = readyAsyncCalls.iterator();
    while (ready.hasNext()) {
      AsyncCall candidate = ready.next();

      // Stop promoting once the number of running async calls has reached maxRequests.
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      // Skip this call while its host counter has reached maxRequestsPerHost.
      if (candidate.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

      ready.remove();
      candidate.callsPerHost().incrementAndGet();
      promoted.add(candidate);
      runningAsyncCalls.add(candidate);
    }
    hasRunningCalls = runningCallsCount() > 0;
  }

  // Submission happens outside the lock; executeOn() asserts the dispatcher lock is not held.
  for (AsyncCall candidate : promoted) {
    candidate.executeOn(executorService());
  }

  return hasRunningCalls;
}
根据源码和注释大家可以看到:如果正在执行的异步请求小于 64,并且请求同一个主机的数量小于 5,就把这个 call 从就绪队列移到正在运行的队列里,然后循环执行 asyncCall.executeOn。我们先去看一下 AsyncCall 类
/**
 * Runnable used to execute the enclosing {@code RealCall} asynchronously. It holds the caller's
 * {@code Callback} and a per-host call counter that the dispatcher consults.
 */
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
// Per-host counter. It starts as a private counter and may be replaced by another call's
// counter through reuseCallsPerHostFrom(), so that calls to the same host share one instance.
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
/**
 * @param responseCallback receives the response or the failure of the call
 */
AsyncCall(Callback responseCallback) {
// The formatted name is applied to the executing thread by NamedRunnable.run().
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
/** Returns the per-host counter currently used by this call. */
AtomicInteger callsPerHost() {
return callsPerHost;
}
/** Makes this call share {@code other}'s per-host counter instead of its own. */
void reuseCallsPerHostFrom(AsyncCall other) {
this.callsPerHost = other.callsPerHost;
}
/** Returns the host of the original request's URL. */
String host() {
return originalRequest.url().host();
}
/** Returns the original request of the enclosing call. */
Request request() {
return originalRequest;
}
/** Returns the enclosing {@code RealCall}. */
RealCall get() {
return RealCall.this;
}
/**
 * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
 * if the executor has been shut down by reporting the call as failed.
 */
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
// Stays false if execute() throws, which triggers the cleanup in the finally block.
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
// Report the rejection to the callback as an InterruptedIOException carrying the original cause.
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
/**
 * Runs the interceptor chain and delivers the outcome to the callback. Invoked by
 * {@code NamedRunnable.run()}.
 */
@Override protected void execute() {
// Set just before onResponse() is invoked, so that an IOException thrown by onResponse()
// itself is logged instead of being reported through onFailure() as well.
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// Always tell the dispatcher this call is finished.
client.dispatcher().finished(this);
}
}
}
AsyncCall 有父类 NamedRunnable,进去看一下
/**
 * Runnable that renames the executing thread for the duration of {@link #execute()} and restores
 * the previous name afterwards.
 */
public abstract class NamedRunnable implements Runnable {
  /** Thread name applied while {@link #execute()} runs. */
  protected final String name;

  /**
   * @param format format string for the thread name
   * @param args arguments for {@code format}
   */
  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  /** Sets the thread name, runs {@link #execute()}, then restores the previous name. */
  @Override public final void run() {
    final Thread worker = Thread.currentThread();
    final String previousName = worker.getName();
    worker.setName(name);
    try {
      execute();
    } finally {
      // Restore the name even when execute() throws.
      worker.setName(previousName);
    }
  }

  /** The work to perform while the thread carries {@link #name}. */
  protected abstract void execute();
}
看到这里我们终于又看到熟悉的方法了,就是 getResponseWithInterceptorChain() 这个方法,然后就是按照同步的流程走完全部
下面我们就来看一下整个 OkHttp 的流程图

下篇开始一个个的详解
最后献上一份添加了注释的源码 https://github.com/525642022/okhttpTest/blob/master/README.md
哈哈
网友评论