美文网首页
消息堆积问题排查

消息堆积问题排查

作者: zxRay | 来源:发表于2018-10-19 17:37 被阅读304次

    背景

    我们有一个类似于消息中间件的产品,会将数据push给下游业务系统消费。业务系统可以通过我们的sdk获取消息,然后消费

    问题

    有一次收到报警,一个业务方消息堆积了几十万条,捞取一条消息观察,发现消息的发送跟收到消费完成ack有几分钟的时间间隔,这消费RT也太长了

    分析

    初步分析后,怀疑可能是以下原因造成:
    1. 服务器load问题,导致发送消息线程未及时发出,造成堆积
    2. 网络原因没有发出去
    3. 客户端原因,消费速度太慢,严重阻塞消息的发送

    排查过程

    1. 服务端排查

    先用top查看,cpu、load都比较低,用dstat查看网络流量也比较正常,初步感觉服务器没啥问题。
    使用netstat查看,引起注意的是Send-Q有40多k,Send-Q的意思是
    Send-Q:在发送缓冲区实际未发送或者没有收到对端Ack的字节数

    说明:应用已经把数据写到Socket缓冲区,因网络原因导致数据没有发送。

    2. 网络原因排查

    使用tcpdump抓取业务方ip的数据包,win被不时的变成0
    win:发送窗口,表示对端能够接受的数据量

    说明:大致可以表明业务方消费能力不足,导致消息IO线程阻塞,没有调用recv接受消息

    3. 客户端排查

    咨询业务方后,业务方表示获取消息后就直接提交到线程池中消费,应该不会阻塞消息IO线程。
    登录业务方服务器,使用netstat发现Recv-Q很大,基本可以确定业务应用消息IO线程没有及时调用recv造成数据堆积。
    使用jstack抓取线程栈,果然消息IO线程全部处于BLOCK状态,并且全部阻塞与一个LinkedBlockingQueue. 到此,基本确定因为消费线程处理能力不足,任务提交到阻塞队列,但阻塞队列满后阻塞了消息IO线程,查看业务代码,基本确定是这个原因。

    总结

    通过上面的排查,该问题的原因:
    业务消费能力不足->线程池忙->阻塞队列满->消息IO线程阻塞->没有调用recv->数据堆积到Recv-Q->发送窗口被置为0->数据堆积到服务端Send-Q->消息堆积

    解决方法: 业务方优化消费能力

    ...

    这是一个典型的生产与消费速度不匹配的问题,这类问题非常常见:

    1. 线程池:消费能力不足,任务会被放到任务队列中,如果队列无界,那么可能会爆,如果队列有界,那么任务入队列失败,会选择一种拒绝策略拒绝该任务,或者直接阻塞生产线程
    2. 连接池:很多db类产品基本还是使用连接池获取连接,当连接池满时,此时获取连接线程会阻塞一会,如果maxWaitTime还不能获取,那就直接抛异常

    在Netty发送数据的时候,也需要考虑一个发送速度过快问题,以一个简单的echo client demo为例(连接创建时,向服务端写1000000000次数据)

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channelActive..");
            for(int i = 0; i<1000000000;i++){
                ctx.write(Unpooled.copiedBuffer("test test test test test!", CharsetUtil.UTF_8));
                System.out.println(i);
            }
            ctx.flush();
        }
    

    jstat -gcutil pid 3000抓下gc情况

    S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT   
      0.00   0.00  24.00   0.00  17.39  19.90      0    0.000     0    0.000    0.000
      0.00  99.96  47.50   7.20  95.82  87.88      1    0.231     0    0.000    0.231
     99.93   0.00  87.49  22.08  95.92  87.92      2    0.530     0    0.000    0.530
     99.96   0.00   5.88  52.14  95.92  87.92      4    1.150     0    0.000    1.150
      0.00  99.93  35.52  67.18  95.92  87.92      5    1.479     0    0.000    1.479
     99.93   0.00   0.00  82.22  95.92  87.92      6    1.833     1    0.000    1.833
      0.00   0.00 100.00  87.62  95.92  87.92      6    1.833     2    2.285    4.118
      0.00   0.00 100.00  94.14  95.92  87.92      6    1.833     3    3.862    5.695
      0.00   0.00  74.11  99.98  95.92  87.92      6    1.833     4    7.324    9.156
      0.00   0.00 100.00  99.98  95.92  87.92      6    1.833     6    9.103   10.935
     
    

    很快系统就不停在fullgc,查看内存中主要的类为:

    jmap -histo 11184
    
     num     #instances         #bytes  class name
    ----------------------------------------------
       1:       2915777      256588376  io.netty.buffer.PooledUnsafeDirectByteBuf
       2:       2915777      186609728  io.netty.channel.ChannelOutboundBuffer$Entry
       3:       5831554      186609728  io.netty.util.Recycler$DefaultHandle
       4:       2915777      116631080  io.netty.channel.DefaultChannelPromise
    

    在使用非阻塞IO的发送数据时,应用都需要配一个发送缓冲区,因为你不知道啥时候IO可写. netty在write数据的时候,会将数据包装成一个Entry,放入到自己的发送缓冲区ChannelOutboundBuffer(每个channel一个), 这个ChannelOutboundBuffer实际上就是一个链表,它是无界的. 代码如下:

    --io.netty.channel.ChannelOutboundBuffer#addMessage
    public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
    
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(entry.pendingSize, false);
        }
    

    即write会一直的创建新节点,直到OOM.

    那Netty中高水位WRITE_BUFFER_WATER_MARK为什么没启作用 ? 因为,这个配置只控制IO是否可写并且触发fireChannelWritabilityChanged事件有用户自行处理。它并不会控制channel.write行为. 代码在上面的addMessage的最后一句incrementPendingOutboundBytes

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
                setUnwritable(invokeLater);
            }
        }
    
    private void setUnwritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue | 1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue == 0 && newValue != 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }
    

    所以,在用Netty发送数据的时候,要注意发送速度,避免OOM,同时也避免发送不及时,引起不必要的FullGC

    相关文章

      网友评论

          本文标题:消息堆积问题排查

          本文链接:https://www.haomeiwen.com/subject/nvpyzftx.html