Troubleshooting an Apache HttpClient Connection Pool Leak

Author: hdfg159 | Published: 2021-09-05 12:08

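    Background

    • The business system is a data aggregation and management platform. One of its features synchronizes all resources (called the "full sync" below).

    • The sync jobs fetch data through Feign backed by Apache HttpClient (feign-httpclient). I wrote this request wrapper myself to fit the business APIs.

    • Feign version: 10.10.1

    Where the problem showed up

    • In production, the full sync (more than 20 tasks) got stuck after a bit over half of the tasks had run. The problem did not appear in the test environment.

    Sync interface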

    public interface SyncHelper {
    
        Order syncOrder();
    
        void syncAllAccount();
        
        void syncSingleAccount(Long accountId);
        
        default boolean enableSync() {
            return true;
        }
    }
    

    Full sync implementation

    @Slf4j
    @Component
    public class SyncAccountResourceListener {
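        // Field injection cannot target a final field that is never assigned, so the field is not final
        @Autowired
        private List<SyncHelper> helpers;
        // Single-threaded executor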
        private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
                .setDaemon(false)
                .setNameFormat("h3c-sync-resource-%d")
                .build()
        );
        
        public void sync(){
            for (SyncHelper helper : helpers) {
                if (Thread.currentThread().isInterrupted()) {
                    log.error("[{}] sync task interrupted,account:[{}]", className, accountId);
                    continue;
                }
                Future<?> future = EXECUTOR.submit(() -> helper.syncSingleAccount(accountId));
                try {
                    future.get(helper.getTimeOut(), TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException|TimeoutException e) {
                    log.error("[{}] sync error,account:[{}]", className, accountId, e);
                } finally {
                    future.cancel(true);
                }
            }
        }
    }
    

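    Investigation

    I wanted to fix this as quickly as possible. The sync progress list in the system showed every run stuck in the same sync class, so I skimmed that class's code, but found nothing that could cause an infinite loop.

    Attempting to reproduce

    • Running the full sync in the test environment showed no problem (request/response logs and database SQL logs all looked normal), and every sync task completed.
    • I then ran a unit test for the stuck sync class many times in a row, and that did not fail either.

    At this point the problem was even more puzzling: it could not be reproduced in the test environment or in local unit tests, so why did it occur in production?

    Deadlock

    I did not look into deadlocks at first, because most sync tasks do not use multiple threads.

    Possible causes

    • The full sync runs its tasks on a single-threaded executor. If a task exceeds its timeout and does not handle interruption properly, every task queued behind it is blocked.
    • A colleague's sync code uses multiple threads in a questionable way, similar to the following: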
            List<SysDept> deptList = ......
            List<CompletableFuture<CmdbUsageReport>> futureList = new ArrayList<>();
            deptList.forEach(t -> futureList.add(
                    CompletableFuture.supplyAsync(() -> {
                        // long-running task
                        return report;
                    }, ioPool)));
            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
            List<CmdbUsageReport> reportList = futureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(toList());
            // ......
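
To make the first possible cause concrete, here is a minimal sketch using only the JDK. The class and method names (`SingleThreadStall`, `secondTaskStalls`) are made up for illustration and are not part of the project. It shows that `Future.cancel(true)` only requests interruption: if the task swallows the interrupt, the single worker thread stays busy and the next task times out too.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadStall {

    /** Waits for the future; returns true if it did not finish within the timeout. */
    static boolean timedOut(Future<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            // cancel(true) only requests interruption; it cannot force the task to stop.
            future.cancel(true);
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runs a task that ignores interrupts, then a trivial task, on one worker thread. */
    static boolean secondTaskStalls(long stubbornMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> stubborn = executor.submit(() -> {
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(stubbornMillis);
                while (System.nanoTime() < deadline) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException ignored) {
                        // The bug being illustrated: the interrupt is swallowed.
                    }
                }
            });
            timedOut(stubborn, timeoutMillis);
            // The worker is still occupied by the stubborn task, so this one just sits in the queue.
            Future<String> quick = executor.submit(() -> "done");
            return timedOut(quick, timeoutMillis);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task stalled: " + secondTaskStalls(1500, 100));
    }
}
```

A task that restores the interrupt flag and returns promptly would free the worker thread, and the second task would then run normally.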
    

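    Since this was now a threading question, the direct route was to use a diagnostic tool and see what the threads were actually doing.

    Analyzing thread state

    Use Alibaba Arthas or VisualVM to inspect the state of the sync thread.

    Start Arthas and attach it to the target process: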

    java -jar arthas-boot.jar
    

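    `thread --all` lists summary information for all threads

    [Image: Arthas output listing all threads]
    The sync tasks run on a single-threaded executor whose thread has a custom name, `h3c-sync-resource-0` (thread ID 250, state `WAITING`).
    

    `thread 250` shows the details of the sync thread

    [Image: Arthas output for the sync thread]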
    "h3c-sync-resource-0" - Thread t@195
            java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for <66bb3d00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
            at org.apache.http.pool.AbstractConnPool.access$300(AbstractConnPool.java:70)
            at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:253)
            - locked <637b83f5> (a org.apache.http.pool.AbstractConnPool$2)
            at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:198)
            at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
            at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
            at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
            at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
            at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
            at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
            at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
            at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
            at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
            at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
            at feign.httpclient.ApacheHttpClient.execute(ApacheHttpClient.java:83)
            at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:119)
            at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89)
            at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100)
            at com.sun.proxy.$Proxy360.osAggregates(Unknown Source)
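
When attaching Arthas or VisualVM is not possible, roughly the same first-pass information (thread name and state) is available in-process through `Thread.getAllStackTraces()`. The following is only a sketch: `ThreadStateProbe` and the thread name `demo-sync-resource-0` are hypothetical, and the parked demo thread merely imitates a worker stuck in `WAITING`.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.LockSupport;

class ThreadStateProbe {

    /** Returns name -> state for every live thread whose name starts with the prefix. */
    static Map<String, Thread.State> statesByPrefix(String prefix) {
        Map<String, Thread.State> states = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(prefix)) {
                states.put(t.getName(), t.getState());
            }
        }
        return states;
    }

    /** Starts a daemon thread that parks until interrupted, and waits until it reports WAITING. */
    static Thread startParkedThread(String name) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
        }, name);
        t.setDaemon(true);
        t.start();
        // Poll briefly until the thread has actually parked.
        for (int i = 0; i < 2000 && t.getState() != Thread.State.WAITING; i++) {
            LockSupport.parkNanos(1_000_000L);
        }
        return t;
    }

    public static void main(String[] args) {
        Thread demo = startParkedThread("demo-sync-resource-0");
        System.out.println(statesByPrefix("demo-sync-resource-"));
        demo.interrupt();
    }
}
```

The stack frames themselves are the values of the same map (`StackTraceElement[]`), which is what reveals where a `WAITING` thread is parked.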
    

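    The stack trace points at the key code: the thread is stuck inside a Feign request, and at first glance it looks like a deadlock inside Apache HttpClient.

    AbstractConnPool.java:393 is inside org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking

    Reading that code shows the thread is blocked waiting to lease a free connection from the HttpClient connection pool. So this is not a lock cycle between threads but a wait for a connection that is never returned. As far as I can tell, HttpClient 4.x waits indefinitely here when no connection request timeout is configured, and the RequestConfig shown later sets only the connect and socket timeouts.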
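
The mechanism can be imitated with a tiny bounded pool. This is a sketch, not HttpClient's implementation: `TinyPool` and `Lease` are hypothetical names, and a timeout is used on the second lease only so that the demo terminates instead of hanging the way the production thread did.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TinyPoolDemo {

    /** Hypothetical stand-in for a bounded connection pool. */
    static final class TinyPool {
        private final Semaphore permits;

        TinyPool(int maxTotal) {
            permits = new Semaphore(maxTotal);
        }

        /** Leases a slot, or returns null if none becomes free within the timeout. */
        Lease lease(long timeoutMillis) {
            try {
                return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS) ? new Lease(this) : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /** A leased slot; closing it hands the slot back to the pool. */
    static final class Lease implements AutoCloseable {
        private final TinyPool pool;
        private boolean released;

        Lease(TinyPool pool) {
            this.pool = pool;
        }

        @Override
        public void close() {
            if (!released) {
                released = true;
                pool.permits.release();
            }
        }
    }

    /** With a pool of size 1, a second lease succeeds only if the first one was closed. */
    static boolean secondLeaseSucceeds(boolean closeFirst) {
        TinyPool pool = new TinyPool(1);
        Lease first = pool.lease(100);
        if (closeFirst) {
            first.close();
        }
        return pool.lease(100) != null;
    }

    public static void main(String[] args) {
        System.out.println("first lease closed: " + secondLeaseSucceeds(true));
        System.out.println("first lease leaked: " + secondLeaseSucceeds(false));
    }
}
```

One leaked lease is enough to starve a pool of size 1; a larger pool only delays the moment when every slot has leaked.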

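    Tracing the source

    The HttpClient website has a simple demo; see: httpcomponents-client-quickstart

    It shows this basic usage. The snippet is from the HttpClient 5.x quickstart (hence `getCode()`), while the stack trace above is from 4.x, whose quickstart carries the same comment about closing the response: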

    try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
        HttpGet httpGet = new HttpGet("http://httpbin.org/get");
        // The underlying HTTP connection is still held by the response object
        // to allow the response content to be streamed directly from the network socket.
        // In order to ensure correct deallocation of system resources
        // the user MUST call CloseableHttpResponse#close() from a finally clause.
        // Please note that if response content is not fully consumed the underlying
        // connection cannot be safely re-used and will be shut down and discarded
        // by the connection manager.
        try (CloseableHttpResponse response1 = httpclient.execute(httpGet)) {
            System.out.println(response1.getCode() + " " + response1.getReasonPhrase());
            HttpEntity entity1 = response1.getEntity();
            // do something useful with the response body
            // and ensure it is fully consumed
            EntityUtils.consume(entity1);
        }
    }
    
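    • The official example uses Java 7 try-with-resources, so resources are released automatically when the block exits
    • The comments in the example also say that for a connection to be safely reused, the response content must be fully consumed (for example with EntityUtils.consume) and the response closed
    • So the likely cause of the blocked thread is a response whose content was never consumed or closed, which keeps its connection leased

    Tracing the Feign source

    Next, look at how the Apache HttpClient response is converted into a Feign response. The conversion method is feign.httpclient.ApacheHttpClient.toFeignBody: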

      Response.Body toFeignBody(HttpResponse httpResponse) {
        final HttpEntity entity = httpResponse.getEntity();
        if (entity == null) {
          return null;
        }
        return new Response.Body() {
    
          @Override
          public Integer length() {
            // With Transfer-Encoding: chunked the content length is unknown (negative), so length() returns null
            return entity.getContentLength() >= 0 && entity.getContentLength() <= Integer.MAX_VALUE
                ? (int) entity.getContentLength()
                : null;
          }
    
          @Override
          public boolean isRepeatable() {
            return entity.isRepeatable();
          }
    
          @Override
          public InputStream asInputStream() throws IOException {
            // Simply passes the entity's InputStream through
            return entity.getContent();
          }
    
          @SuppressWarnings("deprecation")
          @Override
          public Reader asReader() throws IOException {
            return new InputStreamReader(asInputStream(), UTF_8);
          }
    
          @Override
          public Reader asReader(Charset charset) throws IOException {
            Util.checkNotNull(charset, "charset should not be null");
            return new InputStreamReader(asInputStream(), charset);
          }
    
          @Override
          public void close() throws IOException {
            // Releases the underlying resources
            EntityUtils.consume(entity);
          }
        };
      }
    

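    Because the response content has to be fully consumed before the connection can go back to the pool for reuse, here is roughly what org.apache.http.util.EntityUtils.consume does: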

        public static void consume(final HttpEntity entity) throws IOException {
            if (entity == null) {
                return;
            }
            if (entity.isStreaming()) {
                // The entity is still streaming: obtain the content stream
                final InputStream inStream = entity.getContent();
                // Closing the stream releases the resources
                if (inStream != null) {
                    inStream.close();
                }
            }
        }
    

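    The call chain by which EntityUtils.consume consumes the content and makes the connection safely reusable is listed below. Read the source if you are interested; it is not discussed at length here: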

    org.apache.http.util.EntityUtils.consume
    
    org.apache.http.impl.execchain.ResponseEntityProxy.getContent (wraps the stream in an EofSensorInputStream that releases the connection automatically)
    
    org.apache.http.conn.EofSensorInputStream.close
    
    org.apache.http.conn.EofSensorInputStream.checkClose
    
    org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed
    
    org.apache.http.impl.execchain.ResponseEntityProxy.releaseConnection
    

    feign.AsyncResponseHandler#handleResponse

      void handleResponse(CompletableFuture<Object> resultFuture,
                          String configKey,
                          Response response,
                          Type returnType,
                          long elapsedTime) {
        // copied fairly liberally from SynchronousMethodHandler
        boolean shouldClose = true;
    
        try {
          if (logLevel != Level.NONE) {
            // Log the response unless the log level is NONE
            response = logger.logAndRebufferResponse(configKey, logLevel, response,
                elapsedTime);
          }
          if (Response.class == returnType) {
            if (response.body() == null) {
              resultFuture.complete(response);
            } else if (response.body().length() == null
                || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
              // With Transfer-Encoding: chunked, length() returns null, so the finally block below does not close the body
              shouldClose = false;
              resultFuture.complete(response);
            } else {
              // Ensure the response body is disconnected
              // Read the InputStream into a byte[] (which releases the connection) and store it back in the response
              final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
              resultFuture.complete(response.toBuilder().body(bodyData).build());
            }
          } else if (response.status() >= 200 && response.status() < 300) {
            if (isVoidType(returnType)) {
              resultFuture.complete(null);
            } else {
              final Object result = decode(response, returnType);
              shouldClose = closeAfterDecode;
              resultFuture.complete(result);
            }
          } else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
            final Object result = decode(response, returnType);
            shouldClose = closeAfterDecode;
            resultFuture.complete(result);
          } else {
            resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
          }
        } catch (final IOException e) {
          if (logLevel != Level.NONE) {
            logger.logIOException(configKey, logLevel, e, elapsedTime);
          }
          resultFuture.completeExceptionally(errorReading(response.request(), response, e));
        } catch (final Exception e) {
          resultFuture.completeExceptionally(e);
        } finally {
          if (shouldClose) {
            ensureClosed(response.body());
          }
        }
    
      }
    

    feign.slf4j.Slf4jLogger.logAndRebufferResponse

      protected Response logAndRebufferResponse(String configKey,
                                                Level logLevel,
                                                Response response,
                                                long elapsedTime)
          throws IOException {
        if (logger.isDebugEnabled()) {
          // Only when DEBUG is enabled is the parent method called to log the response
          return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
        }
        return response;
      }
    

    feign.Logger.logAndRebufferResponse

      protected Response logAndRebufferResponse(String configKey,
                                                Level logLevel,
                                                Response response,
                                                long elapsedTime)
          throws IOException {
        String reason =
            response.reason() != null && logLevel.compareTo(Level.NONE) > 0 ? " " + response.reason()
                : "";
        int status = response.status();
        log(configKey, "<--- HTTP/1.1 %s%s (%sms)", status, reason, elapsedTime);
        if (logLevel.ordinal() >= Level.HEADERS.ordinal()) {
          // At HEADERS level or above, log the headers and read the body
          for (String field : response.headers().keySet()) {
            for (String value : valuesOrEmpty(response.headers(), field)) {
              log(configKey, "%s: %s", field, value);
            }
          }
    
          int bodyLength = 0;
          if (response.body() != null && !(status == 204 || status == 205)) {
            // HTTP 204 No Content "...response MUST NOT include a message-body"
            // HTTP 205 Reset Content "...response MUST NOT include an entity"
            if (logLevel.ordinal() >= Level.FULL.ordinal()) {
              log(configKey, ""); // CRLF
            }
            // Key point: the InputStream is read into a byte[] and then closed, which releases the connection
            byte[] bodyData = Util.toByteArray(response.body().asInputStream());
            bodyLength = bodyData.length;
            if (logLevel.ordinal() >= Level.FULL.ordinal() && bodyLength > 0) {
              log(configKey, "%s", decodeOrDefault(bodyData, UTF_8, "Binary data"));
            }
            log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
            // The original response.body() has been consumed, so the bytes are stored back into the response
            return response.toBuilder().body(bodyData).build();
          } else {
            log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
          }
        }
        return response;
      }
    

    feign.Util.toByteArray

      public static byte[] toByteArray(InputStream in) throws IOException {
        checkNotNull(in, "in");
        try {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          copy(in, out);
          return out.toByteArray();
        } finally {
          // Close the stream to release its resources
          ensureClosed(in);
        }
      }
    
    Tracing Feign's Slf4jLogger into `feign/Logger.java:84` shows that `feign.Logger#logAndRebufferResponse` reads the `body` once.
    
    At `HEADERS` level or above (`logLevel.ordinal() >= Level.HEADERS.ordinal()`), the later call `byte[] bodyData = Util.toByteArray(response.body().asInputStream());` copies the data and then closes the original `InputStream`.
    

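    Takeaways from the source

    After reading this far, it turns out that in most cases the response content is released automatically:

    • Logging active (Feign log level >= HEADERS && SLF4J logger level at DEBUG or finer)

      • The original response.body() is copied and its InputStream is released
    • Logging not active (Feign log level < HEADERS || SLF4J logger level coarser than DEBUG)

      • If Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE), the response is not released
      • If closeAfterDecode is false, the response is not released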
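
The cases above can be condensed into one predicate. This is a simplified model rather than Feign code: it assumes a successful response with a non-null body, the class and method names are hypothetical, and the 8192-byte threshold is what I recall `MAX_RESPONSE_BUFFER_SIZE` being in the Feign source, so check it against your version.

```java
class FeignCloseDecision {

    enum Level { NONE, BASIC, HEADERS, FULL }

    // Assumption: mirrors Feign's MAX_RESPONSE_BUFFER_SIZE; verify against your Feign version.
    static final int MAX_RESPONSE_BUFFER_SIZE = 8192;

    /**
     * Returns true if Feign itself ends up releasing the connection.
     * bodyLength is null when the length is unknown (for example a chunked response).
     */
    static boolean connectionReleased(Level feignLevel,
                                      boolean slf4jDebugEnabled,
                                      boolean returnsResponse,
                                      Integer bodyLength,
                                      boolean closeAfterDecode) {
        // Slf4jLogger delegates only when DEBUG is enabled; Logger reads the body only at HEADERS or above.
        boolean rebuffered = slf4jDebugEnabled && feignLevel.ordinal() >= Level.HEADERS.ordinal();
        if (rebuffered) {
            return true; // the body was copied into a byte[] and the stream closed
        }
        if (returnsResponse) {
            // Unknown or large bodies are handed to the caller unclosed.
            return bodyLength != null && bodyLength <= MAX_RESPONSE_BUFFER_SIZE;
        }
        return closeAfterDecode;
    }

    public static void main(String[] args) {
        // Feign level FULL, logger at INFO, Response return type, chunked body
        System.out.println(connectionReleased(Level.FULL, false, true, null, true));
        // Same call with the logger at DEBUG
        System.out.println(connectionReleased(Level.FULL, true, true, null, true));
    }
}
```

Written this way, it is easier to see that whether the connection is released can depend on the logger configuration alone.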

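    Verifying the problem

    In production the Feign log level is FULL (above HEADERS), but the logger level is INFO. I tried to reproduce the problem with the following steps:

    • Build the ApacheHttpClient with the connection pool reduced to its minimum size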
    CLIENT = new ApacheHttpClient(
                    HttpClientBuilder.create()
                            .setSSLSocketFactory(
                                    new SSLConnectionSocketFactory(trustAllSslSocketFactory(),
                                            (hostname, session) -> true)
                            )
                            // Maximum connections per route
                            .setMaxConnPerRoute(1)
                            // Maximum total connections
                            .setMaxConnTotal(1)
                            .setDefaultRequestConfig(
                                    RequestConfig.custom()
                                            .setConnectTimeout(CONNECT_TIMEOUT)
                                            .setSocketTimeout(REQ_TIMEOUT)
                                            .build()
                            )
                            .useSystemProperties()
                            .build()
    );
    
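    • Local sync unit test with logger level INFO && Feign log level >= HEADERS: the sync task hangs after a while
    • Local sync unit test with logger level DEBUG && Feign log level < HEADERS: the sync task hangs after a while
    • Local sync unit test with logger level DEBUG && Feign log level >= HEADERS (the configuration used in the test and development environments): no hang observed

    After the unit tests and the source analysis, the conclusion is clear: a response that is not fully consumed and released prevents its pooled connection from being safely reused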

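    Tracing the root cause

    Looking back at feign.AsyncResponseHandler#handleResponse analyzed above: closeAfterDecode defaults to true, so the response content is left unreleased only in this case:
    Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)

    So I checked the project code for Feign methods that return Response and whose response body has no known length

    Then I remembered that when calling the API, the token is verified and refreshed asynchronously. Token verification does not need the response body, so I had declared the return type as Response and only checked the status code. The key parts of the code are below:

    Feign interface for verifying the token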

        @RequestLine("GET /sys/identity/v2/tokens")
        @Headers({"X-Subject-Token: {token}"})
        Response verifyToken(@Param("token") String token);
    

    Token verification code

        @Override
        public boolean verifyToken(H3CClientConfig config, String token) {
            IdentityApi identityApi = Feigns.h3c(IdentityApi.class, config);
            Response response = identityApi.verifyToken(token);
            return response.status() == HttpStatus.OK.value();
        }
    

    Request/response log

    GET https://19.50.81.200:8100/sys/identity/v2/tokens
    
    HTTP/1.1 200 OK
    Date: Sun, 05 Sep 2021 02:55:09 GMT
    Content-Type: application/json; charset=UTF-8
    Transfer-Encoding: chunked
    Connection: keep-alive
    X-Subject-Token: eyJjdHkiOiJKV1QiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiZGlyIn0..yWaex8RVEvBvw1D-kk_LMQ.IwiMpoRWSPsUZiEpr09tL7WDZ1-vIRZqFsGqQq2CV4wBp6S8mBIhICI3Ce2sE_TLA_A2oX_NnMpAf5D4C_DwunJaiJ3lnD51Sg1bxWao_gXnPS7JdfpyaRXY-rtPMaxs-0FisUuyVlKfQh3Ab8t3WsCLzU9Yz7sQ367CKtW1z32ttafrWRlotLN0y7XX3ZRz7Ttznm2cZ5Ae79MEPQF1-hbKiGoz4B8kR1NRgeL-arlpa8qtgERYEEtr-VtJgydDpylusItc_uOtPqwEh0HAgYQjJovF75pej5WlCgdzYVQMr08OGT0JnBrReWYxl0h2P0xxZQtNcM2d0T54TebvvRhQKRyywvasQ064FS4B4mGN-8E3TZkxSfSfr4OWZ1Nmwpr3xFGBOSVpKf5-AufCoXPW3yGu3vFSpCahoKq01n9_gd4UbKLE82Cwou4uZf4VMZ7A7hOAdWYo_geb1bTzLUyTdDSUgbS8XiiYCOpaA4euv409ELE22U77F940M2DO2y8lbaDuk4iAv3QIp5gCGg.9pzTvRPM-FAMa-17a2J5kQ
    

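    The code above never handles the Response itself; it only checks the status code. The request/response log, however, shows something unusual about this response: it has no Content-Length header and carries the header Transfer-Encoding: chunked

    About HTTP chunked transfer encoding
    A chunked response carries the header Transfer-Encoding: chunked. Its length is not known up front and there is no Content-Length header. The last chunk must have length 0, so (when there are no trailers) the HTTP message body ends with 0\r\n\r\n.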
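
As an illustration of that framing, here is a minimal decoder sketch. It is not what HttpClient uses internally: `ChunkedDemo` is a hypothetical name, chunk extensions are skipped, and trailers after the final chunk are ignored.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class ChunkedDemo {

    /** Decodes a complete chunked body: hex size line, CRLF, data, CRLF, repeated until a 0-size chunk. */
    static byte[] decodeChunked(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int pos = 0;
        while (true) {
            int lineEnd = indexOfCrlf(raw, pos);
            if (lineEnd < 0) {
                throw new IllegalArgumentException("missing chunk-size line");
            }
            String sizeLine = new String(raw, pos, lineEnd - pos, StandardCharsets.US_ASCII);
            int semicolon = sizeLine.indexOf(';'); // anything after ';' is a chunk extension
            String hex = (semicolon >= 0 ? sizeLine.substring(0, semicolon) : sizeLine).trim();
            int size = Integer.parseInt(hex, 16);
            pos = lineEnd + 2;
            if (size == 0) {
                return out.toByteArray(); // last chunk reached; only now is the total length known
            }
            if (pos + size + 2 > raw.length) {
                throw new IllegalArgumentException("truncated chunk");
            }
            out.write(raw, pos, size);
            pos += size + 2; // skip the chunk data and its trailing CRLF
        }
    }

    private static int indexOfCrlf(byte[] raw, int from) {
        for (int i = from; i + 1 < raw.length; i++) {
            if (raw[i] == '\r' && raw[i + 1] == '\n') {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] raw = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(new String(decodeChunked(raw), StandardCharsets.US_ASCII));
    }
}
```

Because the total size is only known once the 0-length chunk arrives, a client cannot report a body length up front, which is why `length()` comes back as null for such responses.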

    This matches the leak condition exactly: Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE). If the Response is not released promptly, its pooled connection cannot be safely reused.

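    Summary

    The root cause turned out to be a leaked InputStream, which kept pooled connections from being released. The Feign method returned a Response that was not wrapped in try-with-resources, so close() was never called

    Keys to troubleshooting this:

    • Being able to debug and analyze the Feign source
    • Knowing the standard techniques for diagnosing application performance problems
    • Knowing how production and test environments differ; ideally keep their configuration as close as possible so that problems surface early

    Solution

    When a Feign method returns the Response type, release the resources promptly with Java 7 try-with-resources or with try-finally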
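
A sketch of the shape of that fix, written against a stand-in type so it runs without Feign on the classpath. `StubResponse` is hypothetical. `feign.Response` implements `Closeable` as well, so the same try-with-resources form applies to the real `verifyToken` call.

```java
import java.io.Closeable;
import java.util.function.Supplier;

class CloseResponseDemo {

    /** Hypothetical stand-in for feign.Response, which also implements Closeable. */
    static final class StubResponse implements Closeable {
        private final int status;
        private boolean closed;

        StubResponse(int status) {
            this.status = status;
        }

        int status() {
            return status;
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            // With the real Feign body, close() is what releases the pooled connection.
            closed = true;
        }
    }

    /** Checks only the status code, but still closes the response on every path. */
    static boolean verifyToken(Supplier<StubResponse> call) {
        try (StubResponse response = call.get()) {
            return response.status() == 200;
        }
    }

    public static void main(String[] args) {
        StubResponse response = new StubResponse(200);
        System.out.println(verifyToken(() -> response) + ", closed=" + response.isClosed());
    }
}
```

If only the status code matters, another option may be to avoid the `Response` return type altogether, for example by declaring the method as `void` and treating error statuses as exceptions, at the cost of changing how failures are reported.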
