入门用法的传送门:https://www.jianshu.com/p/500abf06f447
上篇我们简单讲解了一下okHttp的简单使用,这篇开始我们会从核心源码开始探究一下okhttp,这里只会把核心的源码贴出来讲解一下。
开始使用这个网络库的时候首先要创建一个 OkHttpClient,这个类是通过构造者模式创建出来的:
Builder是OkHttpClient的一个内部类
// 这里面进行了一系列初始化
public Builder() {
dispatcher = new Dispatcher(); // 同步和异步请求队列的分发(比较核心的一个类)
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool(); // 连接池,当选择的连接相同时可以复用节省资源
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
build()方法:
public OkHttpClient build() {
return new OkHttpClient(this);
}
Request创建也是使用的构造者模式:
public Builder() {
this.method = "GET"; // 默认的请求时get
this.headers = new Headers.Builder();
}
build();也是很简洁的
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
构造的时候可以写入请求连接(传入的字符串会被转化为HttpUrl),指定请求方法不指定默认请求方法是get请求。
把Request对象转化为Call ,我们来看看源码
Call call = okHttpClient.newCall(request);
点击进去
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
Call是一个接口,定义了供网络执行请求调用的以下方法;
Response execute() throws IOException; // 同步
void enqueue(Callback responseCallback); // 异步
void cancel(); // 取消请求
....
RealCall.newRealCall方法:
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call; // 返回一个call对象
}
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client; // 初始化客户端请求
this.originalRequest = originalRequest; // 初始化好request对象
this.forWebSocket = forWebSocket;
// 创建重定向拦截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
this.timeout = new AsyncTimeout() { // 创建异步超时对象
@Override protected void timedOut() {
cancel();
}
};
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); // 初始化超时时间
}
传进来的Request对象最终在RealCall类中通过拦截器链进行网络请求:
// 所使用的拦截器链 在官网上分为两个拦截器 Application 应用拦截器和NetWork网络拦截器
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); // 添加一个用户自定义的拦截器
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest); // 拦截后调用proceed方法来执行相应请求
}
下面我们通过源码看一下同步和异步是怎么执行的:
同步
@Override public Response execute() throws IOException {
synchronized (this) { // 加同步锁
// 同一个http请求只能执行一次,执行完就设置为你true
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this); // 客户端分发异步和同步的网络请求
Response result = getResponseWithInterceptorChain(); // 通过拦截器实现网络请求并返回响应体
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this); // 这个方法等把异步分析后一起讲
}
}
这里我们说一个核心类Dispatcher,什么是Dispatcher呢?
Dispatcher的作用为维护请求的状态,并维护一个线程池,用于执行请求。
我们来看看它的成员变量:
/**
* Policy on when async requests are executed.
大概意思:执行异步请求时的策略,分发异步和同步的网络请求
*/
public final class Dispatcher {
private int maxRequests = 64; // 异步请求的最大个数为64
private int maxRequestsPerHost = 5; // 相同主机最大请求数
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService; // 线程池
/** Ready async calls in the order they'll be run.当请求条件不满足我们的条件时,异步请求就会进入这个队列当中等待执行
* 这里的条件就是小于异步请求的最大个数为64 和 相同主机最大请求数小于5 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); // 异步就绪队列,缓存等待的请求队列
/** Running asynchronous calls. Includes canceled calls that haven't finished yet.包含了已经取消但没有执行的异步请求 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 正在执行的异步 队列,判断并发请求的数量
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); // 同步的执行队列
内部所维护的线程池:
// 返回一个线程池对象 在这里保证线程池是一个单例
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 第一个参数表示设置核心线程池的数量,这里设置为0,表示线程空闲,超过60秒后会把所有的线程全部销毁
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
回到上面接着说最后同步的方法在Distapcher类中的实现过程:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call); // 把请求添加到同步请求队列当中队列当中
}
把同步请求加入队列中,交给线程池执行就好了,到这里同步就完成了。
异步
进入RealCall类中的enqueue方法:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
// 和同步一样判断是否请求过这个链接,已经请求过就直接抛出异常
if (executed) throw new IllegalStateException("Already Executed");
executed = true; // 请求过就设置为true
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback)); // 关键代码
}
继续走进去到dispatcher的enqueue方法:
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call); // 加入等待队列当中
}
promoteAndExecute(); // 关键实现方法
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
// 循环缓存等待的异步请求队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next(); // 把循环的队列中的元素取出
// 当前异步请求的个数大于最大请求数的时候就打断 break 是跳出整个循环体
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
// 当前网络请求的相同Host请求数大于等于5个的时候 continue 结束单次循环并继续
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove(); // 把上面取出的元素从队列中删除
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall); // 添加到正在执行的异步请求队列中,重新计算正在执行异步请求的数量
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService()); // 通过线程池执行网络请求
}
return isRunning;
}
异步最终执行线程的回调:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this); // 在这里才真正开始执行线程
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
// 这个方法才是线程真正实现的方法,在父类NamedRunnable中是一个抽象的方法,在线程的run方法中实现的,这里也证明了异步的回调都是在子线程中执行的
@Override protected void execute() { // 这个方法是在子线程中操作的
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain(); // 拦截器链
if (retryAndFollowUpInterceptor.isCanceled()) { // 重定向和重试的拦截器是否取消了
signalledCallback = true;
// 如果取消了,就回掉返回失败给用户,这是在子线程中执行的回调
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 没取消重定向拦截器就正常返回数据,这是在子线程中执行的回调
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
/**
* 这个finish做的事情
* 1.把这个请求从正在请求的队列中删除
* 2.调整我们整个异步请求的队列,因为这个队列是非线程安全的
* 3.重新调整异步请求的数量
*/
client.dispatcher().finished(this);
}
}
}
enqueue方法总结:
1.判断当前call
2.封装成一个AsyncCall对象
3.client.dispatcher().enqueue
finish()方法:
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
// 移除网络请求从队列中,不能移除就抛出异常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
// 返回执行异步请求和同步请求的数量总和
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
异步请求为什么会需要两个队列呢
1.异步的队列维护都在Dispatcher这个类中,可以暂时把这个类理解成“生产者”。
2.Dispatcher类中有一个线程池ExecutorService,可以把这个也暂时理解成“消费者池”。
3.Deque<AsyncCall> readyAsyncCalls;缓存等待的队列。
4.Deque<AsyncCall> runningAsyncCalls;正在执行的异步队列
当异步任务很多的时候超过了runningAsyncCalls正在执行的异步队列的限制的时候
这是Dispatcher中异步的方法:
void enqueue(AsyncCall call) {
synchronized (this) { // 避免加入脏数据,加上同步代码块
readyAsyncCalls.add(call); // 先全部加入等待队列中
}
promoteAndExecute(); // 在这里进行条件判断,满足条件就加入异步执行队列中
}
这里的条件就是小于异步请求的最大个数为64 和 相同主机最大请求数小于5,就会把请求加入异步执行队列中。
这里上一张图,大家可以对照着图片理解:
image.png
到这里异步和同步的核心代码都分析完了,下一篇就是Okhttp的重中之重拦截器的分析。
谢谢阅读,如有错误欢迎指正。
OkHttp拦截器链源码解读:https://www.jianshu.com/p/1181f48d6dcf
网友评论