A Look at AsyncHttpClient's RequestFilter

Author: go4it | Published 2023-12-11 20:24

    This article takes a look at the RequestFilter mechanism in AsyncHttpClient.

    RequestFilter

    org/asynchttpclient/filter/RequestFilter.java

    /**
     * A Filter interface that gets invoked before making an actual request.
     */
    public interface RequestFilter {
    
      /**
       * An {@link org.asynchttpclient.AsyncHttpClient} will invoke {@link RequestFilter#filter} and will use the
       * returned {@link FilterContext#getRequest()} and {@link FilterContext#getAsyncHandler()} to continue the request
       * processing.
       *
       * @param ctx a {@link FilterContext}
       * @param <T> the handler result type
       * @return {@link FilterContext}. The {@link FilterContext} instance may not the same as the original one.
       * @throws FilterException to interrupt the filter processing.
       */
      <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException;
    }
    

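    RequestFilter defines a single method, filter. It receives a FilterContext and returns a FilterContext (not necessarily the same instance), whose request and asyncHandler are then used to continue processing; throwing a FilterException interrupts the filter processing.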

    ThrottleRequestFilter

    org/asynchttpclient/filter/ThrottleRequestFilter.java

    /**
     * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached,
     * waiting for the response to arrives before executing the next request.
     */
    public class ThrottleRequestFilter implements RequestFilter {
      private static final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
      private final Semaphore available;
      private final int maxWait;
    
      public ThrottleRequestFilter(int maxConnections) {
        this(maxConnections, Integer.MAX_VALUE);
      }
    
      public ThrottleRequestFilter(int maxConnections, int maxWait) {
        this(maxConnections, maxWait, false);
      }
    
      public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) {
        this.maxWait = maxWait;
        available = new Semaphore(maxConnections, fair);
      }
    
      /**
       * {@inheritDoc}
       */
      @Override
      public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
        try {
          if (logger.isDebugEnabled()) {
            logger.debug("Current Throttling Status {}", available.availablePermits());
          }
          if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
            throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
                    ctx.getRequest(), ctx.getAsyncHandler()));
          }
        } catch (InterruptedException e) {
          throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s",
                  ctx.getRequest(), ctx.getAsyncHandler()));
        }
    
        return new FilterContext.FilterContextBuilder<>(ctx)
                .asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
                .build();
      }
    }
    

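    ThrottleRequestFilter implements RequestFilter and uses a Semaphore to throttle requests. Its filter method waits up to maxWait milliseconds for a permit (the one-argument constructor passes Integer.MAX_VALUE, roughly 24.8 days) and throws a FilterException if no permit becomes available or the thread is interrupted. On success it wraps the asyncHandler with ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available), so that the permit is released once the request finishes.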
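
    The acquire-with-timeout and release-on-finish pattern described above can be tried without the library. The following is a minimal JDK-only sketch, not the library's code: ThrottleSketch, NoSlotException, acquire and complete are hypothetical names that stand in for ThrottleRequestFilter, FilterException, the filter step and the wrapped handler's completion.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ThrottleSketch {
  // Hypothetical stand-in for the library's FilterException.
  public static class NoSlotException extends Exception {
    public NoSlotException(String message) { super(message); }
  }

  private final Semaphore available;
  private final int maxWaitMillis;

  public ThrottleSketch(int maxConnections, int maxWaitMillis) {
    this.available = new Semaphore(maxConnections);
    this.maxWaitMillis = maxWaitMillis;
  }

  // Mirrors the filter step: wait up to maxWaitMillis for a permit, otherwise reject.
  public void acquire(String requestName) throws NoSlotException {
    try {
      if (!available.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS)) {
        throw new NoSlotException("No slot available for " + requestName);
      }
    } catch (InterruptedException e) {
      // Restoring the interrupt flag is a choice made in this sketch;
      // the excerpt above does not do it.
      Thread.currentThread().interrupt();
      throw new NoSlotException("Interrupted while waiting for " + requestName);
    }
  }

  // Mirrors what the wrapped handler does when the request finishes.
  public void complete() {
    available.release();
  }

  public int availablePermits() {
    return available.availablePermits();
  }

  public static void main(String[] args) throws Exception {
    ThrottleSketch throttle = new ThrottleSketch(1, 50);
    throttle.acquire("request-1");      // takes the only permit
    try {
      throttle.acquire("request-2");    // no permit is free, so this is rejected after the wait
    } catch (NoSlotException e) {
      System.out.println(e.getMessage());
    }
    throttle.complete();                // request-1 finished, permit returned
    throttle.acquire("request-3");      // succeeds now
    System.out.println("permits left: " + throttle.availablePermits());
  }
}
```

    Note that with a very large maxWait the caller simply blocks until another request finishes, which is the behaviour the single-argument constructor opts into.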

    ReleasePermitOnComplete

    org/asynchttpclient/filter/ReleasePermitOnComplete.java

    /**
     * Wrapper for {@link AsyncHandler}s to release a permit on {@link AsyncHandler#onCompleted()}. This is done via a dynamic proxy to preserve all interfaces of the wrapped handler.
     */
    public class ReleasePermitOnComplete {
    
      /**
       * Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}.
       *
       * @param handler   the handler to be wrapped
       * @param available the Semaphore to be released when the wrapped handler is completed
       * @param <T>       the handler result type
       * @return the wrapped handler
       */
      @SuppressWarnings("unchecked")
      public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) {
        Class<?> handlerClass = handler.getClass();
        ClassLoader classLoader = handlerClass.getClassLoader();
        Class<?>[] interfaces = allInterfaces(handlerClass);
    
        return (AsyncHandler<T>) Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            try {
              return method.invoke(handler, args);
            } finally {
              switch (method.getName()) {
                case "onCompleted":
                case "onThrowable":
                  available.release();
                default:
              }
            }
        });
      }
    
      //......
    }  
    

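    ReleasePermitOnComplete.wrap builds a dynamic proxy around the original handler that preserves all of the handler's interfaces. Every call is delegated to the wrapped handler, and the finally block calls available.release() when the invoked method is onCompleted or onThrowable.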
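
    To see the proxy technique in isolation, here is a small JDK-only sketch. It assumes a hypothetical, simplified Handler interface in place of AsyncHandler and proxies only that one interface, whereas the library collects all interfaces of the handler class. Unwrapping InvocationTargetException is an addition made in this sketch.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.Semaphore;

public class ReleaseOnCompleteSketch {
  // Hypothetical, simplified stand-in for AsyncHandler.
  public interface Handler {
    void onBodyPart(String part);
    String onCompleted();
    void onThrowable(Throwable t);
  }

  // Delegates every call to the real handler and releases a permit
  // after the terminal callbacks (onCompleted / onThrowable).
  public static Handler wrap(final Handler handler, final Semaphore available) {
    return (Handler) Proxy.newProxyInstance(
        Handler.class.getClassLoader(),
        new Class<?>[] {Handler.class},
        (proxy, method, args) -> {
          try {
            return method.invoke(handler, args);
          } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the handler's own exception
          } finally {
            String name = method.getName();
            if (name.equals("onCompleted") || name.equals("onThrowable")) {
              available.release();
            }
          }
        });
  }

  public static void main(String[] args) {
    Semaphore available = new Semaphore(1);
    available.tryAcquire(); // simulate the permit taken by the filter
    Handler wrapped = wrap(new Handler() {
      @Override public void onBodyPart(String part) { System.out.println("body: " + part); }
      @Override public String onCompleted() { return "done"; }
      @Override public void onThrowable(Throwable t) { System.out.println("failed: " + t); }
    }, available);

    wrapped.onBodyPart("chunk");
    System.out.println("permits after onBodyPart: " + available.availablePermits());
    System.out.println("result: " + wrapped.onCompleted());
    System.out.println("permits after onCompleted: " + available.availablePermits());
  }
}
```

    Because the release sits in a finally block, the permit is returned even when the wrapped callback itself throws.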

    preProcessRequest

    org/asynchttpclient/DefaultAsyncHttpClient.java

      /**
       * Configure and execute the associated {@link RequestFilter}. This class
       * may decorate the {@link Request} and {@link AsyncHandler}
       *
       * @param fc {@link FilterContext}
       * @return {@link FilterContext}
       */
      private <T> FilterContext<T> preProcessRequest(FilterContext<T> fc) throws FilterException {
        for (RequestFilter asyncFilter : config.getRequestFilters()) {
          fc = asyncFilter.filter(fc);
          assertNotNull(fc, "filterContext");
        }
    
        Request request = fc.getRequest();
        if (fc.getAsyncHandler() instanceof ResumableAsyncHandler) {
          request = ResumableAsyncHandler.class.cast(fc.getAsyncHandler()).adjustRequestRange(request);
        }
    
        if (request.getRangeOffset() != 0) {
          RequestBuilder builder = new RequestBuilder(request);
          builder.setHeader("Range", "bytes=" + request.getRangeOffset() + "-");
          request = builder.build();
        }
        fc = new FilterContext.FilterContextBuilder<>(fc).request(request).build();
        return fc;
      }
    

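    The preProcessRequest method of DefaultAsyncHttpClient iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn, passing every filter the FilterContext returned by the previous one and asserting that the result is not null. Afterwards it lets a ResumableAsyncHandler adjust the request range and sets a Range header when the request's range offset is non-zero.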
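
    The loop is a plain left-to-right chain in which each filter sees the output of the one before it. A minimal JDK-only sketch of that shape follows; Context and Filter are hypothetical stand-ins for FilterContext and RequestFilter, and Objects.requireNonNull stands in for the library's assertNotNull.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class FilterChainSketch {
  // Hypothetical, immutable stand-in for FilterContext: a URL plus header lines.
  public static final class Context {
    public final String url;
    public final List<String> headers;

    public Context(String url, List<String> headers) {
      this.url = url;
      this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
    }

    // Returns a new Context, like building a new FilterContext from an old one.
    public Context withHeader(String header) {
      List<String> copy = new ArrayList<>(headers);
      copy.add(header);
      return new Context(url, copy);
    }
  }

  // Hypothetical stand-in for RequestFilter.
  public interface Filter {
    Context filter(Context ctx) throws Exception;
  }

  // Same shape as the loop in preProcessRequest: each filter receives the previous result.
  public static Context preProcess(Context ctx, List<Filter> filters) throws Exception {
    for (Filter filter : filters) {
      ctx = filter.filter(ctx);
      Objects.requireNonNull(ctx, "filterContext");
    }
    return ctx;
  }

  public static void main(String[] args) throws Exception {
    List<Filter> filters = Arrays.asList(
        ctx -> ctx.withHeader("X-Trace: 1"),
        // The second filter can observe what the first one added.
        ctx -> ctx.withHeader("X-Seen-Headers: " + ctx.headers.size()));

    Context start = new Context("http://example.com", Collections.emptyList());
    Context result = preProcess(start, filters);
    System.out.println(result.headers);
  }
}
```

    Since filters run in registration order, a throttling filter placed first rejects a request before any later filter does work on it.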

    executeRequest

    org/asynchttpclient/DefaultAsyncHttpClient.java

      public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<T> handler) {
        if (config.getCookieStore() != null) {
          try {
            List<Cookie> cookies = config.getCookieStore().get(request.getUri());
            if (!cookies.isEmpty()) {
              RequestBuilder requestBuilder = new RequestBuilder(request);
              for (Cookie cookie : cookies) {
                requestBuilder.addOrReplaceCookie(cookie);
              }
              request = requestBuilder.build();
            }
          } catch (Exception e) {
            handler.onThrowable(e);
            return new ListenableFuture.CompletedFailure<>("Failed to set cookies of request", e);
          }
        }
    
        if (noRequestFilters) {
          return execute(request, handler);
        } else {
          FilterContext<T> fc = new FilterContext.FilterContextBuilder<T>().asyncHandler(handler).request(request).build();
          try {
            fc = preProcessRequest(fc);
          } catch (Exception e) {
            handler.onThrowable(e);
            return new ListenableFuture.CompletedFailure<>("preProcessRequest failed", e);
          }
    
          return execute(fc.getRequest(), fc.getAsyncHandler());
        }
      }
    

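    When noRequestFilters is false, executeRequest builds a FilterContext and runs preProcessRequest before executing the request with the resulting request and handler. If preProcessRequest throws, it calls handler.onThrowable(e) and returns a failed ListenableFuture. When noRequestFilters is true, the request is executed directly.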
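
    The control flow of executeRequest (fast path, filter path, failure path) can be modelled with JDK types only. In the sketch below, ExecuteSketch and Filter are hypothetical names, requests are plain strings, CompletableFuture stands in for ListenableFuture, and computing noRequestFilters from an empty filter list is an assumption, since the excerpt does not show where that field is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ExecuteSketch {
  // Hypothetical stand-in for RequestFilter, operating on a plain string request.
  public interface Filter {
    String filter(String request) throws Exception;
  }

  private final List<Filter> filters;
  private final boolean noRequestFilters; // assumption: true when no filters are configured

  public ExecuteSketch(List<Filter> filters) {
    this.filters = new ArrayList<>(filters);
    this.noRequestFilters = this.filters.isEmpty();
  }

  public CompletableFuture<String> executeRequest(String request, Consumer<Throwable> onThrowable) {
    if (noRequestFilters) {
      return execute(request); // fast path: nothing to pre-process
    }
    try {
      for (Filter filter : filters) {
        request = filter.filter(request);
      }
    } catch (Exception e) {
      // Failure path: notify the handler and hand back an already-failed future.
      onThrowable.accept(e);
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
    return execute(request);
  }

  // Placeholder for the real network call.
  private CompletableFuture<String> execute(String request) {
    return CompletableFuture.completedFuture("sent " + request);
  }

  public static void main(String[] args) {
    ExecuteSketch client = new ExecuteSketch(Arrays.<Filter>asList(
        request -> request + " [filtered]"));
    System.out.println(client.executeRequest("GET /a", Throwable::printStackTrace).join());
  }
}
```

    The point of the failure path is that a rejected request never reaches the network layer, yet the caller still gets both a callback and a future to observe the error.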

    Summary

    AsyncHttpClient's RequestFilter defines a filter method. One implementation is ThrottleRequestFilter, which uses a semaphore to throttle requests. When noRequestFilters is false, DefaultAsyncHttpClient's executeRequest runs preProcessRequest, which iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn.
