A Look at httpclient's staleConnectionChec

Author: go4it | Published 2023-11-23 08:52

    This article takes a look at httpclient's staleConnectionCheckEnabled.

    staleConnectionCheckEnabled

    org/apache/http/client/config/RequestConfig.java

    public class RequestConfig implements Cloneable {
    
        public static final RequestConfig DEFAULT = new Builder().build();
    
        private final boolean expectContinueEnabled;
        private final HttpHost proxy;
        private final InetAddress localAddress;
        private final boolean staleConnectionCheckEnabled;
    
        //......
    
        /**
         * Determines whether stale connection check is to be used. The stale
         * connection check can cause up to 30 millisecond overhead per request and
         * should be used only when appropriate. For performance critical
         * operations this check should be disabled.
         * <p>
         * Default: {@code false} since 4.4
         * </p>
         *
         * @deprecated (4.4) Use {@link
         *   org.apache.http.impl.conn.PoolingHttpClientConnectionManager#getValidateAfterInactivity()}
         */
        @Deprecated
        public boolean isStaleConnectionCheckEnabled() {
            return staleConnectionCheckEnabled;
        }   
    
    
        public static class Builder {
    
            private boolean expectContinueEnabled;
            private HttpHost proxy;
            private InetAddress localAddress;
            private boolean staleConnectionCheckEnabled;
    
            //......
    
            Builder() {
                super();
                this.staleConnectionCheckEnabled = false;
                this.redirectsEnabled = true;
                this.maxRedirects = 50;
                this.relativeRedirectsAllowed = true;
                this.authenticationEnabled = true;
                this.connectionRequestTimeout = -1;
                this.connectTimeout = -1;
                this.socketTimeout = -1;
                this.contentCompressionEnabled = true;
                this.normalizeUri = true;
            }        
    
            /**
             * @deprecated (4.4) Use {@link
             *   org.apache.http.impl.conn.PoolingHttpClientConnectionManager#setValidateAfterInactivity(int)}
             */
            @Deprecated
            public Builder setStaleConnectionCheckEnabled(final boolean staleConnectionCheckEnabled) {
                this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
                return this;
            }  
    
            //......
        }       
    }    
    

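    RequestConfig defines the staleConnectionCheckEnabled property. It was deprecated in 4.4 and defaults to false; the replacement setting is org.apache.http.impl.conn.PoolingHttpClientConnectionManager.setValidateAfterInactivity(int). The Builder also exposes a setStaleConnectionCheckEnabled method, which is deprecated as well.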
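
    As a configuration sketch of the replacement named in the deprecation note, the snippet below sets the inactivity threshold on the connection manager instead of touching RequestConfig. It assumes Apache HttpClient 4.4 or later on the classpath; the class name ValidateAfterInactivityConfig and the 2000 ms value are illustrative choices, not taken from the article.

```java
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

// Illustrative helper class (hypothetical name); requires httpclient 4.4+.
class ValidateAfterInactivityConfig {

    static CloseableHttpClient buildClient() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // Re-validate a pooled connection only if it has been idle for at
        // least this many milliseconds (2000 is an example value).
        cm.setValidateAfterInactivity(2000);
        return HttpClients.custom()
                .setConnectionManager(cm)
                .build();
    }
}
```

    With this setup the check happens while the connection is leased from the pool, so no per-request RequestConfig flag is needed.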

    MainClientExec

    org/apache/http/impl/execchain/MainClientExec.java

        public CloseableHttpResponse execute(
                final HttpRoute route,
                final HttpRequestWrapper request,
                final HttpClientContext context,
                final HttpExecutionAware execAware) throws IOException, HttpException {
            Args.notNull(route, "HTTP route");
            Args.notNull(request, "HTTP request");
            Args.notNull(context, "HTTP context");
    
            AuthState targetAuthState = context.getTargetAuthState();
            if (targetAuthState == null) {
                targetAuthState = new AuthState();
                context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
            }
            AuthState proxyAuthState = context.getProxyAuthState();
            if (proxyAuthState == null) {
                proxyAuthState = new AuthState();
                context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
            }
    
            if (request instanceof HttpEntityEnclosingRequest) {
                RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
            }
    
            Object userToken = context.getUserToken();
    
            final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
            if (execAware != null) {
                if (execAware.isAborted()) {
                    connRequest.cancel();
                    throw new RequestAbortedException("Request aborted");
                }
                execAware.setCancellable(connRequest);
            }
    
            final RequestConfig config = context.getRequestConfig();
    
            final HttpClientConnection managedConn;
            try {
                final int timeout = config.getConnectionRequestTimeout();
                managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
            } catch(final InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RequestAbortedException("Request aborted", interrupted);
            } catch(final ExecutionException ex) {
                Throwable cause = ex.getCause();
                if (cause == null) {
                    cause = ex;
                }
                throw new RequestAbortedException("Request execution failed", cause);
            }
    
            context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
    
            if (config.isStaleConnectionCheckEnabled()) {
                // validate connection
                if (managedConn.isOpen()) {
                    this.log.debug("Stale connection check");
                    if (managedConn.isStale()) {
                        this.log.debug("Stale connection detected");
                        managedConn.close();
                    }
                }
            }
    
            final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
            try {
                if (execAware != null) {
                    execAware.setCancellable(connHolder);
                }
    
                HttpResponse response;
                for (int execCount = 1;; execCount++) {
    
                    if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                        throw new NonRepeatableRequestException("Cannot retry request " +
                                "with a non-repeatable request entity.");
                    }
    
                    if (execAware != null && execAware.isAborted()) {
                        throw new RequestAbortedException("Request aborted");
                    }
    
                    if (!managedConn.isOpen()) {
                        this.log.debug("Opening connection " + route);
                        try {
                            establishRoute(proxyAuthState, managedConn, route, request, context);
                        } catch (final TunnelRefusedException ex) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug(ex.getMessage());
                            }
                            response = ex.getResponse();
                            break;
                        }
                    }
    
                    context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
                    response = requestExecutor.execute(request, managedConn, context);
    
                    //......
                }
            }        
    
                    //......
            //......
        }    
    

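    MainClientExec.execute first obtains a ConnectionRequest from connManager.requestConnection, then obtains managedConn through connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS). Next it checks isStaleConnectionCheckEnabled on the RequestConfig: if true, it validates the connection, first checking whether it is open and then whether it is stale, and closes it if it is stale. Later, inside the execution loop, if managedConn is not open, establishRoute is called to connect again.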
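
    The check-then-reconnect flow described above can be sketched without the library. FakeConn and prepare are hypothetical names invented for this illustration; FakeConn only mimics the isOpen/isStale/close calls seen in the excerpt, and its connect method stands in for establishRoute.

```java
import java.util.ArrayList;
import java.util.List;

class StaleCheckSketch {

    /** Hypothetical stand-in for HttpClientConnection; not the library type. */
    static class FakeConn {
        private boolean open;
        private final boolean stale;
        final List<String> events = new ArrayList<>();

        FakeConn(boolean open, boolean stale) {
            this.open = open;
            this.stale = stale;
        }

        boolean isOpen() { return open; }

        boolean isStale() { return stale; }

        void close() {
            open = false;
            events.add("close");
        }

        // Stands in for establishRoute(...), which reconnects the connection.
        void connect() {
            open = true;
            events.add("connect");
        }
    }

    /** Mirrors the order of checks in the excerpt and returns what happened. */
    static List<String> prepare(FakeConn conn, boolean staleCheckEnabled) {
        if (staleCheckEnabled) {
            // Only an open connection is probed for staleness.
            if (conn.isOpen() && conn.isStale()) {
                conn.close();
            }
        }
        // A closed connection (stale or never opened) is connected again.
        if (!conn.isOpen()) {
            conn.connect();
        }
        return conn.events;
    }

    public static void main(String[] args) {
        System.out.println(prepare(new FakeConn(true, true), true));
        System.out.println(prepare(new FakeConn(true, true), false));
    }
}
```

    In this sketch a stale connection is only replaced when the flag is on; with the flag off, the stale connection is used as-is and any failure would surface when the request is sent.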

    establishRoute

    org/apache/http/impl/execchain/MainClientExec.java

        /**
         * Establishes the target route.
         */
        void establishRoute(
                final AuthState proxyAuthState,
                final HttpClientConnection managedConn,
                final HttpRoute route,
                final HttpRequest request,
                final HttpClientContext context) throws HttpException, IOException {
            final RequestConfig config = context.getRequestConfig();
            final int timeout = config.getConnectTimeout();
            final RouteTracker tracker = new RouteTracker(route);
            int step;
            do {
                final HttpRoute fact = tracker.toRoute();
                step = this.routeDirector.nextStep(route, fact);
    
                switch (step) {
    
                case HttpRouteDirector.CONNECT_TARGET:
                    this.connManager.connect(
                            managedConn,
                            route,
                            timeout > 0 ? timeout : 0,
                            context);
                    tracker.connectTarget(route.isSecure());
                    break;
                case HttpRouteDirector.CONNECT_PROXY:
                    this.connManager.connect(
                            managedConn,
                            route,
                            timeout > 0 ? timeout : 0,
                            context);
                    final HttpHost proxy  = route.getProxyHost();
                    tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
                    break;
                case HttpRouteDirector.TUNNEL_TARGET: {
                    final boolean secure = createTunnelToTarget(
                            proxyAuthState, managedConn, route, request, context);
                    this.log.debug("Tunnel to target created.");
                    tracker.tunnelTarget(secure);
                }   break;
    
                case HttpRouteDirector.TUNNEL_PROXY: {
                    // The most simple example for this case is a proxy chain
                    // of two proxies, where P1 must be tunnelled to P2.
                    // route: Source -> P1 -> P2 -> Target (3 hops)
                    // fact:  Source -> P1 -> Target       (2 hops)
                    final int hop = fact.getHopCount()-1; // the hop to establish
                    final boolean secure = createTunnelToProxy(route, hop, context);
                    this.log.debug("Tunnel to proxy created.");
                    tracker.tunnelProxy(route.getHopTarget(hop), secure);
                }   break;
    
                case HttpRouteDirector.LAYER_PROTOCOL:
                    this.connManager.upgrade(managedConn, route, context);
                    tracker.layerProtocol(route.isSecure());
                    break;
    
                case HttpRouteDirector.UNREACHABLE:
                    throw new HttpException("Unable to establish route: " +
                            "planned = " + route + "; current = " + fact);
                case HttpRouteDirector.COMPLETE:
                    this.connManager.routeComplete(managedConn, route, context);
                    break;
                default:
                    throw new IllegalStateException("Unknown step indicator "
                            + step + " from RouteDirector.");
                }
    
            } while (step > HttpRouteDirector.COMPLETE);
        }
    

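    establishRoute loops over the steps returned by routeDirector.nextStep until the route is complete; for the CONNECT_TARGET and CONNECT_PROXY steps it opens the connection through connManager.connect.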
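
    To make the step loop more concrete, here is a heavily simplified sketch of how such a loop walks from "nothing connected" to "complete". The Step enum and the steps method are hypothetical and only cover three cases (direct, plain proxy, tunnelled and layered proxy); the real decision is made by the library's route director and tracker, which handle more situations.

```java
import java.util.ArrayList;
import java.util.List;

class RouteStepSketch {

    /** Hypothetical subset of the step kinds seen in the excerpt. */
    enum Step { COMPLETE, CONNECT_TARGET, CONNECT_PROXY, TUNNEL_TARGET, LAYER_PROTOCOL }

    /** Returns the steps taken for a planned route, ending with COMPLETE. */
    static List<Step> steps(boolean viaProxy, boolean secure) {
        // "fact" state: what has been established so far.
        boolean connected = false;
        boolean tunnelled = false;
        boolean layered = false;
        List<Step> taken = new ArrayList<>();
        Step step;
        do {
            if (!connected) {
                step = viaProxy ? Step.CONNECT_PROXY : Step.CONNECT_TARGET;
                connected = true;
            } else if (viaProxy && secure && !tunnelled) {
                step = Step.TUNNEL_TARGET;      // tunnel through the proxy to the target
                tunnelled = true;
            } else if (viaProxy && secure && !layered) {
                step = Step.LAYER_PROTOCOL;     // layer the secure protocol over the tunnel
                layered = true;
            } else {
                step = Step.COMPLETE;
            }
            taken.add(step);
        } while (step != Step.COMPLETE);
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(steps(false, true));
        System.out.println(steps(true, true));
    }
}
```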

    requestConnection

    org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java

        public ConnectionRequest requestConnection(
                final HttpRoute route,
                final Object state) {
            Args.notNull(route, "HTTP route");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection request: " + format(route, state) + formatStats(route));
            }
            final Future<CPoolEntry> future = this.pool.lease(route, state, null);
            return new ConnectionRequest() {
    
                @Override
                public boolean cancel() {
                    return future.cancel(true);
                }
    
                @Override
                public HttpClientConnection get(
                        final long timeout,
                        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                    final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
                    if (conn.isOpen()) {
                        final HttpHost host;
                        if (route.getProxyHost() != null) {
                            host = route.getProxyHost();
                        } else {
                            host = route.getTargetHost();
                        }
                        final SocketConfig socketConfig = resolveSocketConfig(host);
                        conn.setSocketTimeout(socketConfig.getSoTimeout());
                    }
                    return conn;
                }
    
            };
    
        }
    

    The get method of the ConnectionRequest returned by PoolingHttpClientConnectionManager.requestConnection obtains the connection through leaseConnection(future, timeout, timeUnit), and leaseConnection in turn relies on the future returned by pool.lease(route, state, null).

    lease

    org/apache/http/pool/AbstractConnPool.java

        /**
         * {@inheritDoc}
         * <p>
         * Please note that this class does not maintain its own pool of execution
         * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
         * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
         * returned by this method in order for the lease operation to complete.
         */
        @Override
        public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
            Args.notNull(route, "Route");
            Asserts.check(!this.isShutDown, "Connection pool shut down");
    
            return new Future<E>() {
    
                private final AtomicBoolean cancelled = new AtomicBoolean(false);
                private final AtomicBoolean done = new AtomicBoolean(false);
                private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
    
                @Override
                public boolean cancel(final boolean mayInterruptIfRunning) {
                    if (done.compareAndSet(false, true)) {
                        cancelled.set(true);
                        lock.lock();
                        try {
                            condition.signalAll();
                        } finally {
                            lock.unlock();
                        }
                        if (callback != null) {
                            callback.cancelled();
                        }
                        return true;
                    }
                    return false;
                }
    
                @Override
                public boolean isCancelled() {
                    return cancelled.get();
                }
    
                @Override
                public boolean isDone() {
                    return done.get();
                }
    
                @Override
                public E get() throws InterruptedException, ExecutionException {
                    try {
                        return get(0L, TimeUnit.MILLISECONDS);
                    } catch (final TimeoutException ex) {
                        throw new ExecutionException(ex);
                    }
                }
    
                @Override
                public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                    for (;;) {
                        synchronized (this) {
                            try {
                                final E entry = entryRef.get();
                                if (entry != null) {
                                    return entry;
                                }
                                if (done.get()) {
                                    throw new ExecutionException(operationAborted());
                                }
                                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                                if (validateAfterInactivity > 0)  {
                                    if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                        if (!validate(leasedEntry)) {
                                            leasedEntry.close();
                                            release(leasedEntry, false);
                                            continue;
                                        }
                                    }
                                }
                                if (done.compareAndSet(false, true)) {
                                    entryRef.set(leasedEntry);
                                    done.set(true);
                                    onLease(leasedEntry);
                                    if (callback != null) {
                                        callback.completed(leasedEntry);
                                    }
                                    return leasedEntry;
                                } else {
                                    release(leasedEntry, true);
                                    throw new ExecutionException(operationAborted());
                                }
                            } catch (final IOException ex) {
                                if (done.compareAndSet(false, true)) {
                                    if (callback != null) {
                                        callback.failed(ex);
                                    }
                                }
                                throw new ExecutionException(ex);
                            }
                        }
                    }
                }
    
            };
        }
    

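    The get(final long timeout, final TimeUnit timeUnit) method of the future returned by lease acquires a connection in a loop. It obtains leasedEntry through getPoolEntryBlocking; when validateAfterInactivity is greater than 0 and leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis(), it calls validate. If validation fails, it closes leasedEntry, releases it via release(leasedEntry, false), and continues the loop to fetch the next connection.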
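
    The inactivity condition and the retry loop can be sketched in isolation as follows. Entry, needsValidation and lease are hypothetical names for this illustration; a boolean healthy flag stands in for the result of validate, and an empty idle queue simply yields a fresh entry, whereas the real pool may block or create a connection under its own rules.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class LeaseValidationSketch {

    /** Hypothetical stand-in for a pool entry; not the library's type. */
    static class Entry {
        final String id;
        final long updated;     // last-updated timestamp in milliseconds
        final boolean healthy;  // stands in for the outcome of validate(entry)

        Entry(String id, long updated, boolean healthy) {
            this.id = id;
            this.updated = updated;
            this.healthy = healthy;
        }
    }

    /** Same condition as in the excerpt: validate only after enough idle time. */
    static boolean needsValidation(long updated, long validateAfterInactivity, long now) {
        return validateAfterInactivity > 0 && updated + validateAfterInactivity <= now;
    }

    /** Keeps leasing until an entry passes (or does not need) validation. */
    static Entry lease(Deque<Entry> idle, long validateAfterInactivity, long now) {
        for (;;) {
            Entry entry = idle.pollFirst();
            if (entry == null) {
                // Assumption: nothing idle left, so hand out a brand-new entry.
                return new Entry("fresh", now, true);
            }
            if (needsValidation(entry.updated, validateAfterInactivity, now) && !entry.healthy) {
                continue; // discard the failed entry and try the next one
            }
            return entry;
        }
    }

    public static void main(String[] args) {
        Deque<Entry> idle = new ArrayDeque<>(Arrays.asList(
                new Entry("a", 0L, false), new Entry("b", 0L, true)));
        System.out.println(lease(idle, 2000L, 5000L).id);
    }
}
```

    Note that with validateAfterInactivity set to 0 or less, the sketch (like the excerpt) skips validation entirely and returns the first entry it gets.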

    Summary

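    httpclient's RequestConfig provides the staleConnectionCheckEnabled property for checking a connection at the moment a request obtains it. The property was deprecated in 4.4 and now defaults to false; the replacement is org.apache.http.impl.conn.PoolingHttpClientConnectionManager.setValidateAfterInactivity(int). With that setting, the get method of the future returned by AbstractConnPool.lease (reached through ConnectionRequest.get) uses leasedEntry.getUpdated() + validateAfterInactivity to decide whether the connection needs validation; if it does and validation fails, the loop continues and leases another connection. staleConnectionCheckEnabled, by contrast, is evaluated in MainClientExec after the connection has been obtained from the ConnectionRequest: if isStaleConnectionCheckEnabled on the RequestConfig is true, the connection is checked and managedConn is closed when it is found to be stale. Finally, if managedConn.isOpen() is false, establishRoute is executed, which establishes a new connection through connManager.connect inside its loop.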


    Article link: https://www.haomeiwen.com/subject/fdowwdtx.html