背景
我们有一个类似于消息中间件的产品,会将数据push给下游业务系统消费。业务系统可以通过我们的sdk获取消息,然后消费
问题
有一次收到报警,一个业务方消息堆积了几十万条,捞取一条消息观察,发现消息的发送跟收到消费完成ack有几分钟的时间间隔,这消费RT也太长了
分析
初步分析后,怀疑可能是以下原因造成:
1. 服务器load问题,导致发送消息线程未及时发出,造成堆积
2. 网络原因没有发出去
3. 客户端原因,消费速度太慢,严重阻塞消息的发送
排查过程
1. 服务端排查
先用top查看,cpu、load都比较低,用dstat查看网络流量也比较正常,初步感觉服务器没啥问题。
使用netstat查看,引起注意的是Send-Q有40多k,Send-Q的意思是
Send-Q:在发送缓冲区实际未发送或者没有收到对端Ack的字节数
说明:应用已经把数据写到Socket缓冲区,因网络原因导致数据没有发送。
2. 网络原因排查
使用tcpdump抓取业务方ip的数据包,win被不时的变成0
win:发送窗口,表示对端能够接受的数据量
说明:大致可以表明业务方消费能力不足,导致消息IO线程阻塞,没有调用recv接受消息
3. 客户端排查
咨询业务方后,业务方表示获取消息后就直接提交到线程池中消费,应该不会阻塞消息IO线程。
登录业务方服务器,使用netstat发现Recv-Q很大,基本可以确定业务应用消息IO线程没有及时调用recv造成数据堆积。
使用jstack抓取线程栈,果然消息IO线程全部处于BLOCK状态,并且全部阻塞与一个LinkedBlockingQueue
. 到此,基本确定因为消费线程处理能力不足,任务提交到阻塞队列,但阻塞队列满后阻塞了消息IO线程,查看业务代码,基本确定是这个原因。
总结
通过上面的排查,该问题的原因:
业务消费能力不足->线程池忙->阻塞队列满->消息IO线程阻塞->没有调用recv->数据堆积到Recv-Q->发送窗口被置为0->数据堆积到服务端Send-Q->消息堆积
解决方法: 业务方优化消费能力
...
这是一个典型的生产与消费速度不匹配的问题,这类问题非常常见:
- 线程池:消费能力不足,任务会被放到任务队列中,如果队列无界,那么可能会爆,如果队列有界,那么任务入队列失败,会选择一种拒绝策略拒绝该任务,或者直接阻塞生产线程
- 连接池:很多db类产品基本还是使用连接池获取连接,当连接池满时,此时获取连接线程会阻塞一会,如果maxWaitTime还不能获取,那就直接抛异常
在Netty发送数据的时候,也需要考虑一个发送速度过快问题,以一个简单的echo client demo为例(连接创建时,向服务端写1000000000次数据)
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channelActive..");
for(int i = 0; i<1000000000;i++){
ctx.write(Unpooled.copiedBuffer("test test test test test!", CharsetUtil.UTF_8));
System.out.println(i);
}
ctx.flush();
}
用jstat -gcutil pid 3000
抓下gc情况
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
0.00 0.00 24.00 0.00 17.39 19.90 0 0.000 0 0.000 0.000
0.00 99.96 47.50 7.20 95.82 87.88 1 0.231 0 0.000 0.231
99.93 0.00 87.49 22.08 95.92 87.92 2 0.530 0 0.000 0.530
99.96 0.00 5.88 52.14 95.92 87.92 4 1.150 0 0.000 1.150
0.00 99.93 35.52 67.18 95.92 87.92 5 1.479 0 0.000 1.479
99.93 0.00 0.00 82.22 95.92 87.92 6 1.833 1 0.000 1.833
0.00 0.00 100.00 87.62 95.92 87.92 6 1.833 2 2.285 4.118
0.00 0.00 100.00 94.14 95.92 87.92 6 1.833 3 3.862 5.695
0.00 0.00 74.11 99.98 95.92 87.92 6 1.833 4 7.324 9.156
0.00 0.00 100.00 99.98 95.92 87.92 6 1.833 6 9.103 10.935
很快系统就不停在fullgc,查看内存中主要的类为:
jmap -histo 11184
num #instances #bytes class name
----------------------------------------------
1: 2915777 256588376 io.netty.buffer.PooledUnsafeDirectByteBuf
2: 2915777 186609728 io.netty.channel.ChannelOutboundBuffer$Entry
3: 5831554 186609728 io.netty.util.Recycler$DefaultHandle
4: 2915777 116631080 io.netty.channel.DefaultChannelPromise
在使用非阻塞IO的发送数据时,应用都需要配一个发送缓冲区,因为你不知道啥时候IO可写. netty在write数据的时候,会将数据包装成一个Entry
,放入到自己的发送缓冲区ChannelOutboundBuffer
(每个channel一个), 这个ChannelOutboundBuffer
实际上就是一个链表,它是无界的. 代码如下:
--io.netty.channel.ChannelOutboundBuffer#addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
即write会一直的创建新节点,直到OOM.
那Netty中高水位WRITE_BUFFER_WATER_MARK
为什么没启作用 ? 因为,这个配置只控制IO是否可写并且触发fireChannelWritabilityChanged
事件有用户自行处理。它并不会控制channel.write
行为. 代码在上面的addMessage
的最后一句incrementPendingOutboundBytes
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
所以,在用Netty发送数据的时候,要注意发送速度,避免OOM,同时也避免发送不及时,引起不必要的FullGC
网友评论