android原生网络连接使用,HttpURLConnection。
retrofit底层使用Okhttp3,OKhttp3内部采用socket连接,socket可以保持连接,复用连接减少建立链接时三次握手的时间消耗。
okhttp最大链接数16,有三个队列来保存请求。
RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
Dispatcher
/** Ready async calls in the order they'll be run.
发起的请求都会被先丢到这个里面排队。
*/
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet.
运行的异步请求。包含取消还没结束的请求。
*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet.
运行的同步请求。包含取消还没结束的请求。
*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
ArrayDeque 默认构造的时候容量是16.
**ArrayDeque **
/**
* Constructs an empty array deque with an initial capacity
* sufficient to hold 16 elements.
*/
public ArrayDeque() {
elements = new Object[16];
}
Dispatcher.java
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. 默认64
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
如果runningAsyncCalls 正在执行的线程数大于maxRequests的数量,就直接跳出循环。break
如果保持的主机域名数大于maxRequestsPerHost,就先执行后面的,跳过当前的。 countinue。
最后加入符合条件的集合中executableCalls。遍历执行。executeOn(executorService())最后就到了RealCall#executeOn执行自己的execute方法。通过getResponseWithInterceptorChain责任链的方式,一级一级拼接请求以及回调获得response。
默认配置添加的责任链。
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
RetryAndFollowUpInterceptor
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Transmitter transmitter = realChain.transmitter();
int followUpCount = 0;
Response priorResponse = null;
while (true) {
transmitter.prepareToConnect(request);//如果与上次的主机地址端口等一致,直接返回啦。
if (transmitter.isCanceled()) {
throw new IOException("Canceled");
}
Response response;
boolean success = false;
try {
response = realChain.proceed(request, transmitter, null);//通过责任链管理员跳到下一个处理链上
success = true;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), transmitter, false, request)) {
throw e.getFirstConnectException();
}
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
} finally {
// The network call threw an exception. Release any resources.
if (!success) {
transmitter.exchangeDoneDueToException();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
Request followUp = followUpRequest(response, route);
if (followUp == null) {
if (exchange != null && exchange.isDuplex()) {
transmitter.timeoutEarlyExit();
}
return response;
}
RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot()) {
return response;
}
closeQuietly(response.body());
if (transmitter.hasExchange()) {
exchange.detachWithViolence();
}
if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
request = followUp;
priorResponse = response;
}
}
BridgeInterceptor一顿乱加头部信息
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//以上疯狂加入头部信息
Response networkResponse = chain.proceed(requestBuilder.build());//跳转下一个责任链
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
就这么一直下一个下一个,来到了ConnectInterceptor
ConnectInterceptor
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
代码特别少,transmitter.newExchange-->Transmitter#newExchange-->ExchangeFinder#find--->ExchangeFinder#findHealthyConnection-->ExchangeFinder#findConnection-->
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
如果找到了直接返回,没招到就去connect一个。
RealConnection#connect --->connectSocket看到
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
一连串眼花缭乱的操作,来到最后一个责任链 CallServerInterceptor。
Exchange exchange = realChain.exchange();
又是一顿解析什么的。通过exchange来做发送。拼接返回的response。
网友评论