序
本文主要研究一下httpclient的validateAfterInactivity
validateAfterInactivity
org/apache/http/pool/AbstractConnPool.java
/**
* Excerpt of the abstract connection pool showing its state, its constructor and the
* accessors of the {@code validateAfterInactivity} setting.
*
* @param <T> the route type; used as the key of {@code routeToPool} and {@code maxPerRoute}
* @param <C> the second type argument of {@code ConnFactory} and {@code PoolEntry}
*            (presumably the connection type - confirm against those types)
* @param <E> the pool entry type, a {@code PoolEntry<T, C>}
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
implements ConnPool<T, E>, ConnPoolControl<T> {
// The condition is created from this lock in the constructor.
private final Lock lock;
private final Condition condition;
private final ConnFactory<T, C> connFactory;
private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
private final Set<E> leased;
private final LinkedList<E> available;
private final LinkedList<Future<E>> pending;
private final Map<T, Integer> maxPerRoute;
private volatile boolean isShutDown;
private volatile int defaultMaxPerRoute;
private volatile int maxTotal;
// Not assigned by the constructor below, so it stays at the int default (0)
// until setValidateAfterInactivity(int) is called.
private volatile int validateAfterInactivity;
/**
* Creates the pool with the given connection factory and limits and initialises
* the lock, the condition and the (empty) bookkeeping collections.
* <p>
* {@code validateAfterInactivity} is not a constructor argument and is not assigned here.
*
* @param connFactory the connection factory; checked with {@code Args.notNull}
* @param defaultMaxPerRoute the default per-route limit; checked with {@code Args.positive}
* @param maxTotal the total limit; checked with {@code Args.positive}
*/
public AbstractConnPool(
final ConnFactory<T, C> connFactory,
final int defaultMaxPerRoute,
final int maxTotal) {
super();
this.connFactory = Args.notNull(connFactory, "Connection factory");
this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
this.maxTotal = Args.positive(maxTotal, "Max total value");
this.lock = new ReentrantLock();
this.condition = this.lock.newCondition();
this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
this.pending = new LinkedList<Future<E>>();
this.maxPerRoute = new HashMap<T, Integer>();
}
//......
/**
* Returns the current {@code validateAfterInactivity} setting.
*
* @return the number of milliseconds
* @since 4.4
*/
public int getValidateAfterInactivity() {
return this.validateAfterInactivity;
}
/**
* Sets the {@code validateAfterInactivity} setting. The value is stored as given,
* without any range check.
*
* @param ms the number of milliseconds
* @since 4.4
*/
public void setValidateAfterInactivity(final int ms) {
this.validateAfterInactivity = ms;
}
}
AbstractConnPool定义了validateAfterInactivity属性,与defaultMaxPerRoute、maxTotal不同,该属性没有在构造器参数中,而是提供了setter来设置
PoolingHttpClientConnectionManager
org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java
/**
* Excerpt of the pooling connection manager showing how the underlying pool is created
* and how its {@code validateAfterInactivity} setting is configured.
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class PoolingHttpClientConnectionManager
implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
/**
* Creates the manager, builds its {@code CPool} and sets the pool's
* {@code validateAfterInactivity} to 2000 milliseconds.
*
* @param httpClientConnectionOperator the connection operator; checked with {@code Args.notNull}
*        (the check runs after the pool has been created)
* @param connFactory the connection factory wrapped, together with the config data,
*        in an {@code InternalConnectionFactory}
* @param timeToLive passed through to the {@code CPool} constructor
* @param timeUnit the unit of {@code timeToLive}, passed through to the {@code CPool} constructor
*/
public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final long timeToLive, final TimeUnit timeUnit) {
super();
this.configData = new ConfigData();
// NOTE(review): the literals 2 and 20 are presumably the default max-per-route and
// max-total limits of the pool - confirm against the CPool constructor.
this.pool = new CPool(new InternalConnectionFactory(
this.configData, connFactory), 2, 20, timeToLive, timeUnit);
// Default validation period for this manager: 2000 ms.
this.pool.setValidateAfterInactivity(2000);
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
this.isShutDown = new AtomicBoolean(false);
}
/**
* Defines period of inactivity in milliseconds after which persistent connections must
* be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
* long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
* to this method disables connection validation. This check helps detect connections
* that have become stale (half-closed) while kept inactive in the pool.
* <p>
* The value is forwarded unchanged to the underlying pool.
*
* @param ms the period of inactivity in milliseconds
*
* @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
*
* @since 4.4
*/
public void setValidateAfterInactivity(final int ms) {
pool.setValidateAfterInactivity(ms);
}
//......
}
PoolingHttpClientConnectionManager默认设置pool的validateAfterInactivity为2000ms,另外也提供了setValidateAfterInactivity方法
lease
org/apache/http/pool/AbstractConnPool.java
/**
* {@inheritDoc}
* <p>
* Please note that this class does not maintain its own pool of execution
* {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
* or {@link Future#get(long, TimeUnit)} method on the {@link Future}
* returned by this method in order for the lease operation to complete.
* <p>
* When {@code validateAfterInactivity} is positive, an entry whose
* {@code getUpdated()} timestamp plus that period is not later than the current
* time is passed to {@code validate(...)} before it is handed out. An entry that
* fails validation is closed, given back via {@code release(entry, false)} and
* another entry is requested.
*
* @param route the route to lease an entry for; checked with {@code Args.notNull}
* @param state passed through unchanged to {@code getPoolEntryBlocking}
* @param callback notified on completion, failure or cancellation; may be {@code null}
* @return a future whose {@code get} methods perform the actual lease
*/
@Override
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
return new Future<E>() {
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
// Only the call that flips 'done' from false to true performs the cancellation.
if (done.compareAndSet(false, true)) {
cancelled.set(true);
// Signal every thread waiting on the pool condition
// (presumably so that a blocked lease can notice the cancellation).
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
if (callback != null) {
callback.cancelled();
}
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
@Override
public boolean isDone() {
return done.get();
}
@Override
public E get() throws InterruptedException, ExecutionException {
try {
// Delegates to the timed variant with a timeout of 0.
// NOTE(review): 0 presumably means "no timeout" - confirm in getPoolEntryBlocking.
return get(0L, TimeUnit.MILLISECONDS);
} catch (final TimeoutException ex) {
// TimeoutException is not declared by this method, so it is wrapped.
throw new ExecutionException(ex);
}
}
@Override
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
// Repeat until an entry is returned or an exception is thrown;
// the only 'continue' is the failed-validation path below.
for (;;) {
// Calls to get(...) on this future are serialised on the future itself.
synchronized (this) {
try {
// A previous call already completed the lease: return the same entry.
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
// 'done' without a stored entry: the future was cancelled or has failed.
if (done.get()) {
throw new ExecutionException(operationAborted());
}
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
// Validation is performed only when the setting is positive.
if (validateAfterInactivity > 0) {
// The entry's getUpdated() timestamp is at least validateAfterInactivity ms old.
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(leasedEntry)) {
// Failed validation: close the entry, give it back and try again.
leasedEntry.close();
release(leasedEntry, false);
continue;
}
}
}
// Publish the entry only if the future has not been completed in the meantime.
if (done.compareAndSet(false, true)) {
entryRef.set(leasedEntry);
// 'done' is already true after the successful compareAndSet above.
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
} else {
// The future was completed elsewhere (e.g. by cancel()) while leasing:
// give the entry back and report the abort.
release(leasedEntry, true);
throw new ExecutionException(operationAborted());
}
} catch (final IOException ex) {
// Mark the future as done (at most once) and notify the callback of the failure.
if (done.compareAndSet(false, true)) {
if (callback != null) {
callback.failed(ex);
}
}
throw new ExecutionException(ex);
}
}
}
}
};
}
AbstractConnPool的lease方法返回一个future,其get方法通过getPoolEntryBlocking(route, state, timeout, timeUnit, this)获取leasedEntry,之后判断validateAfterInactivity是否大于0,大于0则判断leasedEntry.getUpdated()+validateAfterInactivity是否小于等于当前时间,是则执行validate方法,validate不通过则close该entry然后release,然后继续循环执行getPoolEntryBlocking
validate
org/apache/http/impl/conn/CPool.java
/**
 * Decides whether a pool entry may still be handed out by asking its
 * connection whether it has gone stale.
 *
 * @param entry the pool entry whose connection is examined
 * @return {@code true} if the connection does not report itself as stale,
 *         {@code false} otherwise
 */
protected boolean validate(final CPoolEntry entry) {
    final boolean stale = entry.getConnection().isStale();
    return !stale;
}
CPool的validate则是通过entry.getConnection().isStale()来判断
isStale
org/apache/http/impl/AbstractHttpClientConnection.java
/**
 * Reports whether this connection should be considered stale.
 *
 * @return {@code true} if the connection is not open, has reached end of
 *         stream, or fails with an {@code IOException} other than a
 *         {@code SocketTimeoutException} while being probed;
 *         {@code false} if the probe times out or ends without end of stream
 */
public boolean isStale() {
    // A closed connection, or one already at end of stream, needs no probing.
    if (!isOpen() || isEof()) {
        return true;
    }
    boolean stale;
    try {
        // Probe the input buffer, then re-check the end-of-stream state.
        this.inBuffer.isDataAvailable(1);
        stale = isEof();
    } catch (final SocketTimeoutException ex) {
        // The probe merely timed out: the connection is not treated as stale.
        stale = false;
    } catch (final IOException ex) {
        // Any other I/O failure marks the connection as stale.
        stale = true;
    }
    return stale;
}
AbstractHttpClientConnection的isStale先判断连接是否open,未open则直接返回true(即stale);再判断是否eof,已eof同样返回true;最后执行inBuffer.isDataAvailable(1)进行探测:出现SocketTimeoutException返回false(连接未stale),出现其他IOException返回true,若没有异常则返回isEof()的结果
小结
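apache的httpclient的AbstractConnPool提供了validateAfterInactivity属性,AbstractConnPool的构造器并未对其赋值(int字段默认值为0,即不做校验),2000ms这个默认值是PoolingHttpClientConnectionManager在构造器中通过pool.setValidateAfterInactivity(2000)设置的。它的作用是在从连接池获取连接的时候进行判断:如果validateAfterInactivity大于0,且该entry的最后更新时间+validateAfterInactivity小于等于当前时间,则执行validate方法;validate不通过则先close该entry并release,然后继续循环获取连接。而validate方法(CPool)则是通过connection的isStale来判断的。该属性有助于检测连接池中空闲连接的stale(half-closed)状态,避免真正使用的时候报错。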
网友评论