一、基本概念
首先从使用出发,其次再结合源码来分析OkHttp3的内部实现的,建议大家下载 OkHttp 源码跟着本文,过一遍源码。首先来看一下OkHttp3的请求代码。
同步请求方式代码:
//同步请求
private void testSyncRequest() {
OkHttpClient okHttpClient = new OkHttpClient();
//指定url,get()代表get请求方式
Request request = new Request.Builder()
.url("https://www.baidu.com/img/bd_logo1.png").get().build();
Call call = okHttpClient.newCall(request);
new Thread() {
@Override
public void run() {
try {
//调用execute,同步请求(必须放在子线程中,否则报错android.os.NetworkOnMainThreadException)
Response response = call.execute();
String result = response.body().toString();
Log.i(TAG, "testSyncRequest==>result="+result);
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}
异步请求方式代码:
//异步请求
private void testAsyncRequest() {
OkHttpClient okHttpClient = new OkHttpClient();
//指定url,get()代表get请求方式
Request request = new Request.Builder()
.url("https://www.baidu.com/img/bd_logo1.png").get().build();
Call call = okHttpClient.newCall(request);
//调用enqueue异步请求方式
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.i(TAG, "testAsyncRequest==>onFailure...err="+e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.i(TAG, "testAsyncRequest==>result="+response.body().toString());
}
});
}
二、OkHttp3的执行流程
1.创建OkHttpClient对象。OkHttpClient为网络请求执行的一个中心,它会管理连接池,缓存,SocketFactory,代理,各种超时时间,DNS,请求执行结果的分发等许多内容。
2.创建Request对象。Request用于描述一个HTTP请求,比如请求的方法是GET还是POST,请求的URL,请求的header,请求的body,请求的缓存策略等。
3.创建Call对象。Call是一次HTTP请求的Task,它会执行网络请求以获得响应。OkHttp中的网络请求执行Call既可以同步进行,也可以异步进行。调用call.execute()将直接执行网络请求,阻塞直到获得响应。而调用call.enqueue()传入回调,则会将Call放入一个异步执行队列,由ExecutorService在后台执行。
4.执行网络请求并获取响应。
上面的代码中涉及到几个常用的类:Request、Response和Call。下面就这几个类做详细的介绍。
Request
每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头,请求还可能包含一个特定内容类型的数据类的主体部分。
Response
响应是对请求的回复,包含状态码、HTTP头和主体部分。
Call
OkHttp使用Call抽象出一个满足请求的模型,尽管中间可能会有多个请求或响应。执行Call有两种方式,同步或异步。
那么首先来看一下OkHttpClient的源码实现:
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
public OkHttpClient() {
this(new Builder());
}
OkHttpClient(Builder builder) {
//分发器,后面会提到
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
}
然后使用okHttpClient发起异步请求。例如:
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
那接下来我们在看下Request。例如:
Request request = new Request.Builder().url("url").build();
该段代码主要实现初始化建造者模式和请求对象,并且用URL替换Web套接字URL。其源码如下:
public final class Request {
public static class Builder {
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
Builder(Request request) {
this.url = request.url;
this.method = request.method;
this.body = request.body;
this.tags = request.tags.isEmpty()
? Collections.<Class<?>, Object>emptyMap()
: new LinkedHashMap<>(request.tags);
this.headers = request.headers.newBuilder();
}
public Builder url(String url) {
if (url == null) throw new NullPointerException("url == null");
// Silently replace web socket URLs with HTTP URLs.
if (url.regionMatches(true, 0, "ws:", 0, 3)) {
url = "http:" + url.substring(3);
} else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
url = "https:" + url.substring(4);
}
return url(HttpUrl.get(url));
}
public Builder url(HttpUrl url) {
if (url == null) throw new NullPointerException("url == null");
this.url = url;
return this;
}
......
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
}
}
okHttpClient调用newCall()方法分析:
//OkHttpClient.java
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
看下RealCall源码:
//RealCall.java
//RealCall是接口Call的实现类
final class RealCall implements Call {
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
//创建RealCall的实例
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
//异步请求方法
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
//executed 用来标记请求是否已经执行了, 如果已经执行的过程中,再次请求抛异常
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//最后调用分发器dispatcher的enqueue方法,参数是异步AsyncCall对象
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
@Override
public RealCall clone() {
return RealCall.newRealCall(client, originalRequest, forWebSocket);
}
}
由上面的代码可以得出:
1.检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用 call#clone方法进行克隆。
2.利用 client.dispatcher().enqueue(this) 来进行实际执行,dispatcher 是刚才看到的OkHttpClient.Builder 的成员之一。
AsyncCall是RealCall的一个内部类并且继承NamedRunnable。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
......
//实现execute方法
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
//调用getResponseWithInterceptorChain方法获取相应
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//调用失败
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//调用成功
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//请求完成
client.dispatcher().finished(this);
}
}
}
而NamedRunnable又实现了Runnable接口,来看代码:
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
可以看到NamedRunnable实现了Runnbale接口并且是个抽象类,其抽象方法是execute(),该方法是在run方法中被调用的,这也就意味着NamedRunnable是一个任务,并且其子类应该实现execute方法。上面已经看到了AsyncCall的实现方法execute(),首先是调用getResponseWithInterceptorChain()方法获取响应,然后获取成功后,就调用回调的onReponse方法,如果失败,就调用回调的onFailure方法,并调用Dispatcher的finished方法。
Dispatcher线程池介绍
那还看一下Dispatcher类的相关代码:
public final class Dispatcher {
/** 最大并发请求数为64 */
private int maxRequests = 64;
/** 每个主机最大请求数为5 */
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** 线程池 */
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** 准备执行的请求队列 */
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在执行的异步请求队列,包含已经取消但未执行完的请求 */
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在执行的同步请求队列,包含已经取消单未执行完的请求 */
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
//构造线程池
if (executorService == null) {
executorService = new ThreadPoolExecutor(
0, //corePoolSize:最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁
Integer.MAX_VALUE,//maximumPoolSize:最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
60, //当线程数大于corePoolSize时,多余的空闲线程的最大存活时间
TimeUnit.SECONDS,//单位秒
new SynchronousQueue<Runnable>(), //工作队列,先进先出
Util.threadFactory("OkHttp Dispatcher", false));//单个线程的工厂
}
return executorService;
}
}
可以看出,在Okhttp中,构建了一个核心为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做”OkHttp Dispatcher”的线程工厂。也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。
关于参数SynchronousQueue:
我们知道SynchronousQueue这个队列是压根没有容器的,然后就找空闲线程啊,一找发现,没有空闲线程,那就重开一个线程来跑这个runnable。
这么写有一个好处,就是假如我们的某个runnable1是一个需要很久才能返回结果,说极端点就是直接一个死循环,永远执行不完,这时候呢下一个runnable2进来了,发现空闲线程没有,核心线程数为0,队列也是一个没容器的玩意儿,那怎么办?就直接new Thread出一个线程自己跑自己的。我们反过来说,如果说我们的队列是有容器的,假如队列的容量为1,runnable1一开始在队列中,然后被线程执行,换句话说,现在队列为空,但是线程已经在跑runnable1的死循环了,你再来一个runnable2也是只能塞到队列中,默默地看runnable1在线程中疯狂死循环,永远也等不到线程空闲的时候。唯一能拯救runnable1的只有再来一个runnable3,当runnable3进入线程池后,发现runnable1在疯狂死循环,runnable2就像小鸟一样被强制囚禁在笼子里,观看runnable1的疯狂死循环,runnable3既进不了笼子,也没有核心线程给他跑,于是线程池就给它(runnable3)新建了一个线程3,等runnable3跑完了以后,线程3就处于空闲,那线程池能让这个线程3闲着吗?当然不行,那线程池就去找啊,找啊,发现runnable2还在队列中没有被执行,于是就吧runnable2交给了线程3来执行,这个时候runnable2才终于被执行了。
接着看下执行方法enqueue():
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//如果正在执行的队列已满,跳出循环
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//如果同一主机请求数到达最大值,遍历下一个
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
//加入执行队列
executableCalls.add(asyncCall);
//加入正在执行的队列中
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//遍历,执行
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//调用executeOn方法
asyncCall.executeOn(executorService());
}
return isRunning;
}
看下AsyncCall的executeOn方法:
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//调用线程池执行,实际上就是执行NamedRunnable的run方法
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
上面已经描述AsyncCall的继承关系,就是调用NamedRunnable的run方法
->class AsyncCall extends NamedRunnable
->class NamedRunnable implements Runnable
看下代码:
//抽象类NamedRunnable
public abstract class NamedRunnable implements Runnable {
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//调用抽象方法execute
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
//子类AsyncCall
final class AsyncCall extends NamedRunnable {
//调用execute
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
//此方法是各种拦截器
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
从上述源码分析,如果当前还能执行一个并发请求,则加入 runningAsyncCalls ,立即执行,否则加入 readyAsyncCalls 队列。由此,可以得出Dispatcher的以下作用:
1.调度线程池Disptcher实现了高并发,低阻塞的实现;
2.采用Deque作为缓存,先进先出的顺序执行;
3.任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能。
getResponseWithInterceptorChain方法是okhttp3的各种拦截器,后面文章分析。
网友评论