一、ConnectInterceptor拦截器
okhttp连接池与链路ConnectInterceptor拦截器,intercept()方法,创建Network链路。
每一次RealCall请求类,分配一个StreamAllocation对象,负责RealConnection管理,连接池操作,HttpCodec,网络链路的分配、取消、释放,RealConnection可复用。
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();//从Chain中获取Request
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//处理返回Response
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
RetryAndFollowUpInterceptor拦截器(位于拦截器列表第二项目),intercept()方法,创建StreamAllocation对象,每次递归将引用传给下一个新建Chain。
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
到达ConnectInterceptor拦截器时,从Chain中取出,StreamAllocation类的newStream()方法,创建/查找链路。
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
...
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout,
readTimeout,writeTimeout, connectionRetryEnabled,
doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
}
}
findConnection()方法,通过策略寻找一个真正的RealConnection连接,该连接和Server建立个Socket连接,RealConnection类初始化HttpCodec。
public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation)
throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
若采用Protocol.HTTP_2协议,即Http2Connection存在,创建Http2Codec,否则,创建Http1Codec,封装BufferedSource和BufferedSink。
这些新初始化的对象StreamAllocation、HttpCodec、RealConnection,一起传递给Chain节点,下一个拦截器使用。
二、分配策略
StreamAllocation类findConnection()方法。
private RealConnection findConnection(int connectTimeout, int readTimeout,
int writeTimeout,boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
//先使用StreamAllocation内部保存的RealConnection
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
//连接池中寻找
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
...
RealConnection result;
synchronized (connectionPool) {
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
//创建RealConnection
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
//socket连接
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
//入池
Internal.instance.put(connectionPool, result);
...
}
return result;
}
1,StreamAllocation内部RealConnection连接。
2,从ConnectionPool连接池查找。
3,创建RealConnection对象,保存在StreamAllocation内部,入池。
OkHttpClient类静态代码段初始化一个Internal对象。
static {
Internal.instance = new Internal() {
@Override public
RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
@Override
public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
....//其他方法.
};
}
ConnectionPool类get()方法,查询连接池中的RealConnection。
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
遍历连接池每项RealConnection,当Address相同且StreamAllocationReference数量小于限制,说明是可用连接。
RealConnection内部引用StreamAllocation类型弱引用列表,allocationLimit变量限制StreamAllocation弱引用数量。
public void acquire(RealConnection connection) {
..
this.connection = connection;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
将可用RealConnection设置成StreamAllocation内部connection,同时将StreamAllocation加入RealConnection内部弱引用列表。
达到限制时,不能再被StreamAllocation使用,新建链路,新RealConnection同样赋值到StreamAllocation内部,将该StreamAllocation加入弱引用列表,最后put连接池。
RealConnection代表一个真正的链路,封装BufferedSource、BufferedSink、路由、socket、协议、Handshake握手信息。
若是新链接,进行Socket连接,connect()方法,connectSocket()方法建立连接。
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//创建Socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
创建一个Socket,Platform#connectSocket方法,连接。
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException {
socket.connect(address, connectTimeout);
}
成功,和Server建立一个Socket连接,失败,抛出异常。
三、连接清理
ConnectionPool连接池管理所有Socket连接,当有新的请求时,从池中分配一个链路。
ArrayDeque双向队列,线性连续空间,双向开口,在头尾两端插入删除高效,同时具有队列和栈性质,缓存常用。
默认支持5个并发keepalive,链路生命为5分钟,即链路数据传输完成,可保持5分钟的存活时间。
自动清除线程,将查找超过5分钟的链路,关闭socket。
ConnectionPool的get()/put()操作方法。
void put(RealConnection connection) {
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);//执行清理任务
}
connections.add(connection);//加入连接池队列
}
线程池执行cleanupRunnable清理任务,设置cleanupRunning标志位。实质上是一个阻塞的清理任务。若while一直运行,下次put()将不会触发。
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
waitNanos等待下次清理的间隔时间,-1表示不需要再次清理,退出循环,0表示立即再次清理,wait()方法等待,释放锁与时间片。
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// 是否在使用
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
//找空闲最长的
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//决定是否清理
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//删除连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
return keepAliveDurationNs;
} else {
cleanupRunning = false;
return -1;
}
}
//关闭Socket
closeQuietly(longestIdleConnection.socket());
return 0;
}
遍历连接,pruneAndGetAllocationCount()方法查看连接是否在使用。inUseConnectionCount,正在使用数量,自增,idleConnectionCount,空闲数量,未使用自增。
idleDurationNs,空闲连接,计算已空闲时间,查找到空闲时间最长的。
清理算法
1,longestIdleDurationNs(最长空闲时间),大于5min或空闲数量超过5个(默认),将对应longestIdleConnection连接删除,返回0,下一次立即清理。
2,不足5min,空闲连接数量<5,返回timeout(距离5min还有多久),线程阻塞timeout后,再次清理。
3,空闲连接=0,且正使用连接>0,返回5min,最长等待时间。
4,空闲和正使用连接都0,返回-1,不清理,线程结束。设置cleanupRunning标志位,下次put()方法重新唤起cleanupRunnable任务。
清理过程
从连接池队列ArrayDeque中删除该项连接。
closeQuietly()方法关闭Socket。
using连接判断
遍历RealConnection内部Reference<StreamAllocation>列表
reference.get()不空,说明有StreamAllocation正引用RealConnection。
reference.get()不存在,说明StreamAllocation已被Jvm清理,同时,从references列表中删除该项reference。
若references列表是空,说明references弱引用都是空,没有StreamAllocation使用该连接。
四、总结
ConnectInterceptor拦截器负责Network连接。
每一个RealCall请求对应一个StreamAllocation。
真正的连接RealConnection复用。
连接池采用ArrayDeque双向队列数据结构。
连接清理任务。
任重而道远
网友评论