Troubleshooting a queue consumer thread that stopped working

Author: 望穿天堂 | Published: 2019-08-21 15:24

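    Symptom: after an exception was thrown, the queue stopped consuming messages, yet the consumer thread was still alive.

    Checking the thread state

    1. Enter the Docker container
    2. Run jps -l to find the pid
    3. Run jstack -l pid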
        "pool-1-thread-1" #37 prio=5 os_prio=0 cpu=100.93ms elapsed=125.98s tid=0x00007fc530ccd800 nid=0x39 waiting on condition  [0x00007fc4dc9b9000]
       java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.2/Native Method)
        - parking to wait for  <0x0000000095cc6b80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.2/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.2/AbstractQueuedSynchronizer.java:2081)
        at org.apache.http.pool.PoolEntryFuture.await(PoolEntryFuture.java:138)
        at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:306)
        at org.apache.http.pool.AbstractConnPool.access$000(AbstractConnPool.java:64)
        at org.apache.http.pool.AbstractConnPool$2.getPoolEntry(AbstractConnPool.java:192)
        at org.apache.http.pool.AbstractConnPool$2.getPoolEntry(AbstractConnPool.java:185)
        at org.apache.http.pool.PoolEntryFuture.get(PoolEntryFuture.java:107)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:276)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
        at com.XXXXXX.taiji.common.api.ExternalAPIClient.lambda$callWithTracer$0(ExternalAPIClient.java:81)
        at com.XXXXXX.taiji.common.api.ExternalAPIClient$$Lambda$374/0x00000001005cd440.run(Unknown Source)
        at com.XXXXXX.taiji.common.tracer.TracerSupport.lambda$new$0(TracerSupport.java:6)
        at com.XXXXXX.taiji.common.tracer.TracerSupport$$Lambda$310/0x000000010045d840.with(Unknown Source)
        at com.XXXXXX.taiji.common.tracer.TracerSupport.lambda$tracer$1(TracerSupport.java:17)
        at com.XXXXXX.taiji.common.tracer.TracerSupport$$Lambda$331/0x00000001004d6440.run(Unknown Source)
        at com.XXXXXX.taiji.common.cat.CatExternalAPITracer.with(CatExternalAPITracer.java:22)
        at com.XXXXXX.taiji.common.cat.CatExternalAPITracer.with(CatExternalAPITracer.java:11)
        at com.XXXXXX.taiji.common.tracer.TracerSupport.lambda$tracer$2(TracerSupport.java:16)
        at com.XXXXXX.taiji.common.tracer.TracerSupport$$Lambda$312/0x000000010045d040.with(Unknown Source)
        at com.XXXXXX.taiji.common.api.ExternalAPIClient.callWithTracer(ExternalAPIClient.java:79)
        at com.XXXXXX.taiji.common.api.ExternalAPIClient.executeWithJSON(ExternalAPIClient.java:72)
        at com.XXXXXX.taiji.qingniao.service.factory.WeiWangSendService.sendMsg(WeiWangSendService.java:76)
        at com.XXXXXX.taiji.qingniao.service.factory.AbstractSendService.sendSMS(AbstractSendService.java:63)
        at com.XXXXXX.taiji.qingniao.service.MessageService.deal(MessageService.java:286)
        at com.XXXXXX.taiji.qingniao.redismq.SendMessageRedisQueue.lambda$start$0(SendMessageRedisQueue.java:74)
        at com.XXXXXX.taiji.qingniao.redismq.SendMessageRedisQueue$$Lambda$315/0x000000010045a440.accept(Unknown Source)
        at com.XXXXXX.taiji.common.redismq.RedisQueueWorker.lambda$kickit$1(RedisQueueWorker.java:85)
        at com.XXXXXX.taiji.common.redismq.RedisQueueWorker$$Lambda$372/0x00000001005ce440.run(Unknown Source)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.2/Executors.java:515)
        at java.util.concurrent.FutureTask.run(java.base@11.0.2/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.2/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.2/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.2/Thread.java:834)
    

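    waiting on condition

    The jstack -l pid output shows that the thread pool-1-thread-1 is the one in trouble.

    Its state is reported as "waiting on condition":

    This means the thread is waiting for some condition to occur and wake it up, or it has simply called sleep(N).

    A thread reported this way is usually in one of the following states:

    java.lang.Thread.State: WAITING (parking): the thread is suspended and waits indefinitely for the condition to occur;

    java.lang.Thread.State: TIMED_WAITING (parking or sleeping): the wait is timed, so the thread wakes itself up when the timeout expires even if the condition never occurs.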
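
    The two states can be reproduced with a few lines of plain Java. The sketch below is only an illustration: the class and variable names are made up, and it awaits a Condition that nobody ever signals, once without a timeout and once with one. Running it should normally print WAITING and then TIMED_WAITING.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ThreadStateDemo {
    // Starts a thread that awaits a Condition nobody signals, then reports
    // the state it settles into. All names here are illustrative.
    static Thread.State observe(boolean timed) {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                if (timed) {
                    neverSignalled.await(30, TimeUnit.SECONDS); // TIMED_WAITING (parking)
                } else {
                    neverSignalled.await();                     // WAITING (parking)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.setDaemon(true);
        waiter.start();

        Thread.State expected = timed ? Thread.State.TIMED_WAITING : Thread.State.WAITING;
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            // Poll until the waiter has parked, or give up after 5 seconds.
            while (waiter.getState() != expected && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State observed = waiter.getState();
        waiter.interrupt(); // let the waiter exit
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observe(false));
        System.out.println(observe(true));
    }
}
```

    The untimed await corresponds to the dump above: nothing but a signal (or an interrupt) will ever wake the thread.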

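    at org.apache.http.pool.PoolEntryFuture.await

    The frame at org.apache.http.pool.PoolEntryFuture.await shows that the thread is blocked waiting to lease a connection, which means every connection in the HTTP connection pool has been taken.

    The likely cause is that HTTP connections are not being released correctly.

    The business log also contains the exception "api xxxx respond status code 503",

    so the connection is probably not closed when an exception is thrown during the HTTP request.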
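
    The suspected mechanism can be sketched without HttpClient at all. In the example below, TinyPool is a hypothetical stand-in for a connection pool, built on a Semaphore; it is not the HttpClient implementation. Each simulated call leases a connection and then fails with a 503-style exception before releasing it, so the pool drains after as many failures as it has connections.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class LeakyPoolDemo {
    // Hypothetical stand-in for a connection pool with a fixed capacity.
    static final class TinyPool {
        private final Semaphore permits;

        TinyPool(int maxConnections) {
            this.permits = new Semaphore(maxConnections);
        }

        // Returns false if no connection became free within the timeout.
        boolean lease(long timeoutMillis) {
            try {
                return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        void release() {
            permits.release();
        }
    }

    // Simulates a request that fails after leasing and never calls release().
    static void callAndLeak(TinyPool pool) {
        if (!pool.lease(100)) {
            throw new IllegalStateException("pool exhausted");
        }
        throw new RuntimeException("respond status code 503");
    }

    // Counts how many failing calls it takes before the pool has nothing left.
    static int failedCallsUntilExhausted(int poolSize) {
        TinyPool pool = new TinyPool(poolSize);
        int failedCalls = 0;
        while (true) {
            try {
                callAndLeak(pool);
            } catch (IllegalStateException e) {
                return failedCalls; // lease timed out: every connection leaked
            } catch (RuntimeException e) {
                failedCalls++;      // the simulated 503; the connection stays leased
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(failedCallsUntilExhausted(2));
    }
}
```

    The sketch uses a lease timeout only so that it terminates. A lease with no timeout would block indefinitely instead, which is consistent with the WAITING (parking) state in the dump.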

    Code investigation

    this.tracer().with(traceKey, req, () -> {
        try {
            var res = this.client.execute(req);
            var code = res.getStatusLine().getStatusCode();
            if (code < 200 || code >= 300) {
                throw GlobalErrors.API_STATUS_ERROR.args(path, code);
            }
            var output = new ByteArrayOutputStream();
            var input = res.getEntity().getContent();
            IOUtils.copy(input, output);
            input.close();
            String content = output.toString();
            if (content.isEmpty()) {
                throw GlobalErrors.API_ACCESS_ERROR.args(path, "http body is empty");
            }
            holder.value(content);
        } catch (IOException e) {
            throw GlobalErrors.API_ACCESS_ERROR.args(path, e.getMessage(), e);
        }
    });
    

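    Three objects are involved: res, output, and input.

    ByteArrayOutputStream.close() is a no-op, so leaving output unclosed is harmless. The default InputStream.close() is also empty, but the content stream returned by res.getEntity().getContent() may be a wrapper tied to the pooled connection, and on the exception path input.close() is never reached anyway.

    this.client.execute(req) returns a CloseableHttpResponse (an interface). The source is fairly involved, but in the end the close method of res turns out to call releaseConnection: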

    public void close() throws IOException {
        this.releaseConnection(false);
    }
    

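    So the unclosed response is most likely the cause.

    AutoCloseable

    Java's input/output streams and the various Connection classes all implement the AutoCloseable interface.

    The AutoCloseable source carries a long Javadoc comment. According to it, the interface exists to manage resources better, or more precisely to release them. When a resource class implements close(), a resource created with the try-with-resources syntax has close() called automatically if the block throws, and close() is also called when the try block exits normally without an exception.

    try-with-resources syntax

    try-with-resources was added in JDK 1.7. Resources are created inside the parentheses of try, and close() is called on each of them automatically when the try block exits.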

    Example:

    public class AutoCloseableDemo {
        public static void main(String[] args) {
            try (AutoCloseableApp app = new AutoCloseableApp(); AutoCloseableApp2 app2 = new AutoCloseableApp2()) {
                System.out.println("--running main--");
                throw new RuntimeException("--exception--");
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                System.out.println("--finally--");
            }
        }
    
        public static class AutoCloseableApp implements AutoCloseable {
            @Override
            public void close() throws Exception {
                System.out.println("--close1--");
            }
        }
    
        public static class AutoCloseableApp2 implements AutoCloseable {
            @Override
            public void close() throws Exception {
                System.out.println("--close2--");
            }
        }
    }
    

    Output:

    --running main--
    --close2--
    --close1--
    --exception--
    --finally--
    
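    1. Resources managed by a try-with-resources statement must be objects of classes that implement AutoCloseable
    2. Resources declared in the try header are implicitly final
    3. Multiple resources can be managed by separating the declarations with semicolons
    4. Execution order: close, then catch, then finally
    5. With multiple resources, they are closed in the reverse of their declaration order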
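
    One detail follows from rule 4: because close runs before catch, an exception thrown by close() while another exception is already propagating does not replace it. It is attached to the original as a suppressed exception. The sketch below, with made-up class names, records the order of events in a list to make this visible.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderDemo {
    // Illustrative resource whose close() always fails.
    static final class Res implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Res(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
            throw new IllegalStateException("close failed: " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Res a = new Res("a", log); Res b = new Res("b", log)) {
            log.add("body");
            throw new RuntimeException("primary");
        } catch (RuntimeException e) {
            // The exception from the body wins; close() failures are attached to it.
            log.add("catch " + e.getMessage());
            for (Throwable s : e.getSuppressed()) {
                log.add("suppressed " + s.getMessage());
            }
        } finally {
            log.add("finally");
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The recorded order is body, close b, close a, catch primary, the two suppressed entries (b first, then a), and finally.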

    Revised code

    this.tracer().with(traceKey, req, () -> {
        try (var res = this.client.execute(req)) {
            var code = res.getStatusLine().getStatusCode();
            if (code < 200 || code >= 300) {
                throw GlobalErrors.API_STATUS_ERROR.args(path, code);
            }
            var output = new ByteArrayOutputStream();
            var input = res.getEntity().getContent();
            IOUtils.copy(input, output);
            String content = output.toString();
            if (content.isEmpty()) {
                throw GlobalErrors.API_ACCESS_ERROR.args(path, "http body is empty");
            }
            holder.value(content);
        } catch (IOException e) {
            throw GlobalErrors.API_ACCESS_ERROR.args(path, e.getMessage(), e);
        }
    });
    

    That raises a question: why is skipping close harmless when no exception is thrown?

    this.tracer().with(traceKey, req, () -> {
        try {
            var res = this.client.execute(req);
            var code = res.getStatusLine().getStatusCode();
            var output = new ByteArrayOutputStream();
            var input = res.getEntity().getContent();
            IOUtils.copy(input, output);
            if (code < 1000) {
                throw GlobalErrors.API_STATUS_ERROR.args(path, code);
            }
            String content = output.toString();
            if (content.isEmpty()) {
                throw GlobalErrors.API_ACCESS_ERROR.args(path, "http body is empty");
            }
            holder.value(content);
        } catch (IOException e) {
            throw GlobalErrors.API_ACCESS_ERROR.args(path, e.getMessage(), e);
        }
    });
    

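    The code above always throws GlobalErrors.API_STATUS_ERROR.args(path, code), and res is never closed, yet the thread does not end up parked.

    When the IOUtils.copy(input, output) line is moved below the throw, the thread does park.

    Nothing unusual stands out in the IOUtils.copy source.

    This still needs further investigation. One lead worth checking: IOUtils.copy reads the response body to end-of-stream, and HttpClient releases the connection back to the pool once the entity content has been fully consumed, which would explain both observations.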
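
    A possible explanation, stated here as an assumption rather than something the investigation above confirmed: in HttpClient 4.x the entity content stream appears to be wrapped (EofSensorInputStream) so that reading it to end-of-stream hands the connection back to the pool, even if nothing is ever closed. The sketch below models that idea with a hypothetical ReleaseOnEofStream; it is not the HttpClient class, and the AtomicBoolean merely stands in for "connection returned to the pool".

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

class EofReleaseDemo {
    // Hypothetical wrapper: flags the connection as released once the
    // underlying stream reports end-of-stream.
    static final class ReleaseOnEofStream extends FilterInputStream {
        private final AtomicBoolean released;

        ReleaseOnEofStream(InputStream in, AtomicBoolean released) {
            super(in);
            this.released = released;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b == -1) {
                released.set(true);
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n == -1) {
                released.set(true);
            }
            return n;
        }
    }

    // Returns whether the "connection" was released although close() is never called.
    static boolean releasedAfter(boolean drainBody) {
        AtomicBoolean released = new AtomicBoolean(false);
        InputStream body = new ReleaseOnEofStream(
                new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8)), released);
        try {
            if (drainBody) {
                byte[] buf = new byte[4096];
                while (body.read(buf) != -1) {
                    // discard, as a copy to an output stream would consume it
                }
            }
            // A status-check exception would be thrown here; body is not closed either way.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(releasedAfter(true));
        System.out.println(releasedAfter(false));
    }
}
```

    Under this assumption, copying the body before the throw drains the stream and frees the connection, while throwing first leaves it leased. Closing the response in try-with-resources covers both paths.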
