介绍
一个现代的Http请求客户端,可以在java或者android使用,有以下特点
- 支持HTTP2
- 连接池,实现Http1.1长连接和http2.0多路复用
- 拦截器,内部预置拦截器和自定义拦截器支持,可以往HTTP请求时插入逻辑和职责
收获
- 拦截器的设计很精妙,责任链模式,单一职责思想,链式调用。可降低代码的工程复杂度,易扩展,易维护
- 分层和模块化是分解项目的重要手段,复杂庞大的功能可以通过分层和模块化一步步拆解,工程上更容易实现和稳定
- 各个层次拦截器的阅读,可以了解okhttp是如何一步步实现http协议,最底层的CallServerInterceptor是最终的HTTP包的构建,解析,读取,写入。
sample
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
调用流程
- 构建OkHttpClient
- 构建Request
- OkHttpClient#newCall(Request)
- call#execute或者call#enqueue(callback)
- 解析Response
接口分析
构建OKHttpClient
一如既往,提供一个外观类OKHttpClient封装了所有的配置,这个类毫无意外也是通过Builder构建。
Builder
- timeout参数配置(call,connect,read,write)
- proxy配置http代理
- cookieJar
- cache
- dns
- socketFactory
- sslSocketFactory https相关,配置CA
- hostnameVerifier
- connectionPool
- dispatcher
- addInterceptor
- addNetworkInterceptor
- eventListener 用于监听网络请求的时间
- build
构建Request
也提供Builder
- url
- header(String name,String value)
- cacheControl
- get,post,delete,put,patch
- method(String method,RequestBody)设定http请求方法
- tag
对比Retrofit就发现接口比较原始,基本上更接近Http协议 - Url
- http method
- header
- body
Call
public interface Call extends Cloneable {
Response execute() throws IOException;
void enqueue(Callback responseCallback);
void cancel();
Request request();
interface Factory {
Call newCall(Request request);
}
}
提供同步和异步方法,注意OKHttp enqueue后的callback返回并不是UI线程,Retrofit帮我们转接了。
框架设计
okhttp架构.png这是原文,这个图大致按照调用栈大致描绘了层次关系。
- 接口层
- 协议层
- 连接层 连接池,支持长连接和多路复用
- cache层
- I/O层 高效的IO操作,依赖okio
- 拦截器 贯穿上下,非常重要
拦截器
拦截器是OKHttp的一大特性,它是典型的责任链模式,链式递归调用
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
Call call();
}
分为application和network拦截器,主要处理request和response
一个interceptor通常步骤
- 处理Request
- 调用Chain#proceed
- 处理Response并返回
我们知道OkHttp通过newCall,返回的其实是RealCall,然后我们看RealCall#execute方法
public Response execute() throws IOException {
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
return result;
}finally {
client.dispatcher().finished(this);
}
}
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
getResponseWithInterceptorChain干了4件事
- 添加用户自定义的application interceptor
- 添加内置拦截器
- 添加用户自定义的network interceptor
- 通过RealInterceptorChain开始链式递归调用
public final class RealInterceptorChain implements Interceptor.Chain {
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
return response;
}
}
这个RealInterceptorChain#proceed里又构建了RealInterceptorChain,调用了拦截器链表的下一个,每个拦截器的intercept方法需要调用的chain都是这个RealInterceptorChain,只不过新的实例,新的参数。Interceptor负责调chain#proceed触发下一个拦截器
拦截器.png
内置拦截器
- retryAndFollowUpInterceptor 超时重试和重定向
- BridgeInterceptor 一些header字段,content-length,content-encode做透明gzip,cookie,keep-alive等
- CacheInterceptor
- ConnectInterceptor
- CallServerInterceptor 真正的网络请求
自定义拦截器
- application 不考虑重传和重定向,不考虑cache,永远调用一次
- network 在connect和callserver之间,命中cache会被短路
总结拦截器我们发现,整个流程一层层往下贯穿,再一层层往上,跟网络协议栈的思路是一样的。这里其实也可以用装饰者来实现
interface ITask {
Response call(Requst requst);
}
class TaskImpl implements ITask{
private ITask nextTask;
public TaskImpl(ITask task){
nextTask = task;
}
public Response call(Requst requst) {
// 在此处可以处理request
if(nextTask != null){
response = nextTask.call(requst);
}
// 在此处可以处理response
return response;
}
}
class main(){
ITask a = new TaskImpl();
ITask b = new TaskImpl(a);
ITask c = new TaskImpl(b);
c.call(request);
}
任务调度
通常我们会调用call#enqueu(callback)异步方法等待结果返回,OKHttp内部维护线程池用来执行请求,具体实现类是Dispatcher
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
}
内部维护了3个任务队列来存储请求,一个线程池来执行任务
enqueue
- 先把任务插入readyQueue
- 遍历readyQueue,判断是否超过总体最大值和单host最大值
- 遍历所有可运行的请求,调用AsyncCall#executeOn(executorService)
- 这个AsyncCall最终也是调用的getResponseWithInterceptorChain触发拦截器,获取结果,然后直接在子线程回调结果
缓存
CacheInterceptor来拦截缓存,使用DiskLruCache来实现缓存,CacheStrategy做缓存策略
public final class CacheInterceptor implements Interceptor {
public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
}
}
- 通过request获取cache结果
- 通过CacheStrategy判断是否需要请求网络,不需要直接短路返回,不继续往下走拦截器
- 继续chain.proceed,请求网络,获取response
- 更新缓存
- 返回response
CacheStrategy
public final class CacheStrategy {
/** The request to send on the network, or null if this call doesn't use the network. */
public final @Nullable Request networkRequest;
/** The cached response to return or validate; or null if this call doesn't use a cache. */
public final @Nullable Response cacheResponse;
public static class Factory {
final long nowMillis;
final Request request;
final Response cacheResponse;
private Date servedDate;
private Date lastModified;
private Date expires;
private String etag;
}
public CacheStrategy get() {
CacheStrategy candidate = getCandidate();
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
// We're forbidden from using the network and the cache is insufficient.
return new CacheStrategy(null, null);
}
return candidate;
}
private CacheStrategy getCandidate() {
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
return new CacheStrategy(request, null);
}
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
return new CacheStrategy(request, null); // No condition! Make a regular request.
}
。。。
}
}
- 通过Factory构建CacheStrategy
- 两个公有final变量,networkRequest标识是否需要请求网络,CacheResponse组装了缓存的结果
- 工厂循环遍历cache response的header,主要是缓存刷新的两组字段,expires和last-modifiled,etag
- CacheStrategy基本根据HTTP的cache协议
连接池
性能提升的关键,为了实现http1.1的长连接和http2的多路复用
- 长连接,一个请求结束后,不会立即关闭TCP socket,而是等待下一个请求,直到超时。规避TCP的拥塞控制的慢启动,可以显著提升响应速度
- 多路复用,二进制帧,header压缩。一个tcp socket支持多个http请求并行,大大增加并行效率
地址
- url
- Address 包含域名,port,https setting,protocol
- Route 包含ip,proxy。同一个Address可能有多个Route,因为DNS返回多个ip
流程
public interface Connection {
Route route();
//TCP连接
Socket socket();
//TLS
Handshake handshake();
//Http协议
Protocol protocol();
}
- 通过url创建Address
- 从ConnectionPool获取Connection
- 如果没获取到,则向DNS查询IP,得到Route
- 如果是新的route,发起tcp连接或者tls握手,获得Connection
- 通过Connection发起请求,流转到network拦截器
ConnectInterceptor通过StreamAllocation#newStream获得Connection
CallServerInterceptor
真正的http请求和解析都在这个拦截器里面,依赖okio这个库。
- exchange 管理类
- ExchangeCode,接口类,定义打包request,解析response的行为
/** Encodes HTTP requests and decodes HTTP responses. */
interface ExchangeCodec {
/** Returns an output stream where the request body can be streamed. */
fun createRequestBody(request: Request, contentLength: Long): Sink
/** This should update the HTTP engine's sentRequestMillis field. */
fun writeRequestHeaders(request: Request)
/** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
fun finishRequest()
fun readResponseHeaders(expectContinue: Boolean): Response.Builder?
fun openResponseBodySource(response: Response): Source
}
okhttp迁移很多文件为Kotlin,我们至少要大致能看懂Kotlin代码
- Http1ExchangeCodec HTTP/1协议的实现类
- Http2ExchangeCodec HTTP/2协议的实现类。二进制Header和Body。多路复用。
public final class Http1ExchangeCodec implements ExchangeCodec {
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
private final OkHttpClient client;
/** The connection that carries this stream. */
private final RealConnection realConnection;
//socket对应的输入流
private final BufferedSource source;
//socket对应的输出流
private final BufferedSink sink;
/** HTTP协议标准,写入request到流,requestline和header */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
return responseBuilder;
}
/** 按照http标准读取header,一行一行的读 */
private Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// parse the result headers until the first blank line
for (String line; (line = readHeaderLine()).length() != 0; ) {
addHeaderLenient(headers, line);
}
return headers.build();
}
@Override public Source openResponseBodySource(Response response) {
if (!HttpHeaders.hasBody(response)) {
return newFixedLengthSource(0);
}
//分段读取
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
return newChunkedSource(response.request().url());
}
// size已知
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
return newFixedLengthSource(contentLength);
}
return newUnknownLengthSource();
}
}
[如何调试](https://blog.csdn.net/alvinhuai/article/details/81288270,用Android Studio跑OkHttp的sampleClient模块,加一些配置,可在本机直接跑,也可以用AS的Debugger断点调试。像Retrofit这种纯java的库,都可以在本机调试,效率高。
)
网友评论