A Look at httpclient's validateAfterInacti

Author: go4it | Published: 2023-11-17 20:51

    This article looks at the validateAfterInactivity setting in Apache httpclient.

    validateAfterInactivity

    org/apache/http/pool/AbstractConnPool.java

    @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
    public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                                                   implements ConnPool<T, E>, ConnPoolControl<T> {
    
        private final Lock lock;
        private final Condition condition;
        private final ConnFactory<T, C> connFactory;
        private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
        private final Set<E> leased;
        private final LinkedList<E> available;
        private final LinkedList<Future<E>> pending;
        private final Map<T, Integer> maxPerRoute;
    
        private volatile boolean isShutDown;
        private volatile int defaultMaxPerRoute;
        private volatile int maxTotal;
        private volatile int validateAfterInactivity;
    
        public AbstractConnPool(
                final ConnFactory<T, C> connFactory,
                final int defaultMaxPerRoute,
                final int maxTotal) {
            super();
            this.connFactory = Args.notNull(connFactory, "Connection factory");
            this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
            this.maxTotal = Args.positive(maxTotal, "Max total value");
            this.lock = new ReentrantLock();
            this.condition = this.lock.newCondition();
            this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
            this.leased = new HashSet<E>();
            this.available = new LinkedList<E>();
            this.pending = new LinkedList<Future<E>>();
            this.maxPerRoute = new HashMap<T, Integer>();
        }
    
        //......
    
        /**
         * @return the number of milliseconds
         * @since 4.4
         */
        public int getValidateAfterInactivity() {
            return this.validateAfterInactivity;
        }
    
        /**
         * @param ms the number of milliseconds
         * @since 4.4
         */
        public void setValidateAfterInactivity(final int ms) {
            this.validateAfterInactivity = ms;
        }    
    }   
    

    AbstractConnPool defines the validateAfterInactivity property. Unlike defaultMaxPerRoute and maxTotal, it is not a constructor parameter; it is set through a setter instead.

    PoolingHttpClientConnectionManager

    org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java

    @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
    public class PoolingHttpClientConnectionManager
        implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
    
        public PoolingHttpClientConnectionManager(
            final HttpClientConnectionOperator httpClientConnectionOperator,
            final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
            final long timeToLive, final TimeUnit timeUnit) {
            super();
            this.configData = new ConfigData();
            this.pool = new CPool(new InternalConnectionFactory(
                    this.configData, connFactory), 2, 20, timeToLive, timeUnit);
            this.pool.setValidateAfterInactivity(2000);
            this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
            this.isShutDown = new AtomicBoolean(false);
        }
    
        /**
         * Defines period of inactivity in milliseconds after which persistent connections must
         * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
         *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
         * to this method disables connection validation. This check helps detect connections
         * that have become stale (half-closed) while kept inactive in the pool.
         *
         * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
         *
         * @since 4.4
         */
        public void setValidateAfterInactivity(final int ms) {
            pool.setValidateAfterInactivity(ms);
        }
    
        //......
    
    }    
    

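    PoolingHttpClientConnectionManager sets the pool's validateAfterInactivity to 2000 ms by default, and it also exposes a setValidateAfterInactivity method to change it. Per the Javadoc, a non-positive value disables validation.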

    lease

    org/apache/http/pool/AbstractConnPool.java

        /**
         * {@inheritDoc}
         * <p>
         * Please note that this class does not maintain its own pool of execution
         * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
         * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
         * returned by this method in order for the lease operation to complete.
         */
        @Override
        public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
            Args.notNull(route, "Route");
            Asserts.check(!this.isShutDown, "Connection pool shut down");
    
            return new Future<E>() {
    
                private final AtomicBoolean cancelled = new AtomicBoolean(false);
                private final AtomicBoolean done = new AtomicBoolean(false);
                private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
    
                @Override
                public boolean cancel(final boolean mayInterruptIfRunning) {
                    if (done.compareAndSet(false, true)) {
                        cancelled.set(true);
                        lock.lock();
                        try {
                            condition.signalAll();
                        } finally {
                            lock.unlock();
                        }
                        if (callback != null) {
                            callback.cancelled();
                        }
                        return true;
                    }
                    return false;
                }
    
                @Override
                public boolean isCancelled() {
                    return cancelled.get();
                }
    
                @Override
                public boolean isDone() {
                    return done.get();
                }
    
                @Override
                public E get() throws InterruptedException, ExecutionException {
                    try {
                        return get(0L, TimeUnit.MILLISECONDS);
                    } catch (final TimeoutException ex) {
                        throw new ExecutionException(ex);
                    }
                }
    
                @Override
                public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                    for (;;) {
                        synchronized (this) {
                            try {
                                final E entry = entryRef.get();
                                if (entry != null) {
                                    return entry;
                                }
                                if (done.get()) {
                                    throw new ExecutionException(operationAborted());
                                }
                                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                                if (validateAfterInactivity > 0)  {
                                    if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                        if (!validate(leasedEntry)) {
                                            leasedEntry.close();
                                            release(leasedEntry, false);
                                            continue;
                                        }
                                    }
                                }
                                if (done.compareAndSet(false, true)) {
                                    entryRef.set(leasedEntry);
                                    done.set(true);
                                    onLease(leasedEntry);
                                    if (callback != null) {
                                        callback.completed(leasedEntry);
                                    }
                                    return leasedEntry;
                                } else {
                                    release(leasedEntry, true);
                                    throw new ExecutionException(operationAborted());
                                }
                            } catch (final IOException ex) {
                                if (done.compareAndSet(false, true)) {
                                    if (callback != null) {
                                        callback.failed(ex);
                                    }
                                }
                                throw new ExecutionException(ex);
                            }
                        }
                    }
                }
    
            };
        }
    

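    The lease method of AbstractConnPool returns a Future. Its get method obtains leasedEntry through getPoolEntryBlocking(route, state, timeout, timeUnit, this). If validateAfterInactivity is greater than 0 and leasedEntry.getUpdated() + validateAfterInactivity is less than or equal to the current time, it calls validate. If validation fails, the entry is closed and released with reusable set to false, and the loop continues with another call to getPoolEntryBlocking.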
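
    The check above can be sketched without the httpclient classes. The following is a simplified model, not the library's implementation: LeaseValidationSketch, Entry, needsValidation and the queue-based lease are hypothetical names made up for illustration, and the real pool would create a new connection or block instead of returning null.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Simplified stand-in for the lease-time check; these are not the real httpclient classes. */
final class LeaseValidationSketch {

    /** Hypothetical pooled entry: an id plus the timestamp of its last update (ms). */
    static final class Entry {
        final String id;
        final long updated;

        Entry(String id, long updated) {
            this.id = id;
            this.updated = updated;
        }
    }

    /**
     * Mirrors the condition used in get(): validation is due only when the setting
     * is positive and the entry has been idle for at least that many milliseconds.
     */
    static boolean needsValidation(long updated, int validateAfterInactivity, long now) {
        return validateAfterInactivity > 0 && updated + validateAfterInactivity <= now;
    }

    /**
     * Takes entries from the idle queue until one is acceptable. Entries that are due
     * for validation and fail it are discarded, like the "continue" in the real loop.
     * Returns null when the queue runs dry (an assumption made to keep the sketch small).
     */
    static Entry lease(Deque<Entry> idle, int validateAfterInactivity, long now,
                       Predicate<Entry> validate) {
        Entry e;
        while ((e = idle.pollFirst()) != null) {
            if (needsValidation(e.updated, validateAfterInactivity, now) && !validate.test(e)) {
                continue; // stale entry: drop it and try the next one
            }
            return e;
        }
        return null;
    }

    public static void main(String[] args) {
        Deque<Entry> idle = new ArrayDeque<>();
        idle.add(new Entry("a", 0L));      // idle for 10 s at now = 10_000
        idle.add(new Entry("b", 9_500L));  // idle for 0.5 s at now = 10_000
        // Pretend every connection that gets validated turns out to be stale.
        Entry leased = lease(idle, 2000, 10_000L, e -> false);
        System.out.println(leased == null ? "none" : leased.id);
    }
}
```

    Note that a recently used entry is handed out without calling validate at all, which is what keeps the check cheap for busy pools.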

    validate

    org/apache/http/impl/conn/CPool.java

        protected boolean validate(final CPoolEntry entry) {
            return !entry.getConnection().isStale();
        }
    

    CPool's validate simply returns the negation of entry.getConnection().isStale().

    isStale

    org/apache/http/impl/AbstractHttpClientConnection.java

        public boolean isStale() {
            if (!isOpen()) {
                return true;
            }
            if (isEof()) {
                return true;
            }
            try {
                this.inBuffer.isDataAvailable(1);
                return isEof();
            } catch (final SocketTimeoutException ex) {
                return false;
            } catch (final IOException ex) {
                return true;
            }
        }
    

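    isStale in AbstractHttpClientConnection first checks whether the connection is open, then whether it has reached EOF, and finally calls inBuffer.isDataAvailable(1). A SocketTimeoutException yields false (nothing arrived during the 1 ms probe, so the connection is still considered usable), any other IOException yields true, and if no exception is thrown the result of isEof() is returned.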
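
    The same probing idea can be tried on a plain java.net.Socket. This is a rough sketch under stated assumptions, not how httpclient implements it: StaleProbeSketch is a hypothetical class, and a PushbackInputStream stands in for the session input buffer so that a byte read during the probe is not lost. The caller is assumed to keep reading from that same stream afterwards.

```java
import java.io.IOException;
import java.io.PushbackInputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical stale-connection probe on a plain socket, modelled on the isStale logic. */
final class StaleProbeSketch {

    /**
     * Attempts a read with a 1 ms timeout. End of stream or an I/O error means the
     * peer has gone away; a timeout means the connection is idle but still open.
     */
    static boolean isStale(Socket socket, PushbackInputStream in) {
        if (socket.isClosed() || !socket.isConnected()) {
            return true;
        }
        try {
            int savedTimeout = socket.getSoTimeout();
            try {
                socket.setSoTimeout(1);
                int b = in.read();
                if (b == -1) {
                    return true;   // peer closed its side of the connection
                }
                in.unread(b);      // keep the byte for the real reader
                return false;
            } finally {
                socket.setSoTimeout(savedTimeout);
            }
        } catch (SocketTimeoutException ex) {
            return false;          // nothing to read within 1 ms: still alive
        } catch (IOException ex) {
            return true;           // reset or other failure: treat as stale
        }
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            PushbackInputStream in = new PushbackInputStream(client.getInputStream(), 1);
            System.out.println("stale while peer is open: " + isStale(client, in));
            accepted.close();      // simulate the server dropping the idle connection
            Thread.sleep(200);     // give the close time to reach the client
            System.out.println("stale after peer closed: " + isStale(client, in));
        }
    }
}
```

    The probe costs up to about 1 ms on a healthy idle connection, which is one reason to run it only after a period of inactivity rather than on every lease.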

    Summary

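    AbstractConnPool in Apache httpclient provides the validateAfterInactivity property. The field itself has no initializer in the pool; the 2000 ms default comes from PoolingHttpClientConnectionManager, which sets it in its constructor. The check runs when a connection is leased from the pool: if the entry's last update time plus validateAfterInactivity is less than or equal to the current time, validate is called, and if validation fails the entry is closed and the loop tries again. The validate method in turn relies on the connection's isStale. This setting helps detect idle connections in the pool that have become stale (half-closed), so that the failure does not surface only when the connection is actually used.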
