美文网首页Java性能调优netty网络
一次netty"引发的"诡异old gc问题

一次netty"引发的"诡异old gc问题

作者: 简书闪电侠 | 来源:发表于2016-09-11 14:51 被阅读2892次

应用:新美大push服务-长连通道sailfish
日推送消息:180亿
QPS峰值: 35W
最大实时在线用户:2200W

push服务简单结构为

客户端sdk<=>长连通道<=>pushServer

1.客户端sdk: 负责提供客户客户端收发push的api
2.长连通道:负责维持海量客户端连接
3.pushServer:负责给业务方提供收发push的rpc服务,与长连通道通过tcp连接,自定义协议,具体的push服务设计另起文章

首先依据这篇文章把push长连通道应用的jvm参数调到最优,见海量连接服务端jvm参数调优杂记, 剩下的都是这篇文章之后所发生

2016年9月2号6:00 左右陆续收到两台机器的报警,上去看一下cat监控

改造之前gc情况

发现 在凌晨4:11分左右,这台机器cms old区域到达old gc阀值1525M(old区域设置为2048M, -XX:CMSInitiatingOccupancyFraction=70,所以阀值为1433M,前一分钟为1428.7M),于是进行old gc,结果进行一次old gc之后,啥也没回收掉,接下来一次次old gc,old区不减反增,甚是诡异!

gc日志

在4:10:29开始频繁old gc(其实这是第二次old gc了,之前已经有过一次,不过可以忽略,我就拿这次来分析),发现old gc过后,old区域大小基本没变,所以这个时候可以断定old区里面肯定有一直被引用的对象,猜测为缓存之类的对象

使用 jmap -dump:live,format=b,file=xxx [pid] 先触发一次gc再dump
重点关注这台10.32.145.237

dump 的时候,花了long long的时间,为了不影响线上引用,遂放弃。。。

9月3号早上又发现old gc,于是连忙起床去dump内存,总内存为1.8G,MAT载入分析

堆内存

光这两个家伙就占据了71.24%,其他的可以忽略不计
然后看到NioSocketChannel这个家伙,对应着某条TCP连接,于是追根溯源,找到这条连接对应的机器

NioSocketChannel 堆内存

然后去cmdb里面一查

cmdb

发现是pushServer的机器。长连通道服务器是用netty实现,自带缓冲区,对外连接着海量的客户端,将海量用户的请求转发给pushServer,而pushServer是BIO实现,无IO缓冲区,当pushServer的TCP缓冲区满了之后,TCP滑动窗口为0,那么长连服务器发送给这台机器的消息netty就一直会保存在自带的缓冲区ChannelOutBoundBuffer里,撑大old区。接下来需要进一步验证

9月5号早上,来公司验证,跑到10.12.22.253这台机器看一下tcp底层缓冲区的情况

tcp -antp | grep 9000

发现tcp发送队列积压了这么多数据没发出去,这种情况发生的原因是接收方来不及处理,接收方的接收队列里面数据积压,于是导致发送方发送不出去,接下来就跑到接收方机器上看下tcp的接收队列

10.32.177.127$ tcp -antp | grep 9000 10.4.210.192$ tcp -antp | grep 9000 10.4.210.193$ tcp -antp | grep 9000

果不其然,三台机器接受队列都撑得很大,到这里,问题基本排查出来了,结论是接收方处理速度过慢导致发送方积压消息过多,netty会把要发送的消息保存在ChannelOutboundBuffer里面,随着积压的消息越来越多,导致old区域逐渐扩大,最终old gc,然而这些消息对象都是live的,因此回收不掉,所以频繁old gc

9月5号下午
考虑到pushServer改造nio需要一段时间,长连通道这边又无法忍受频繁old gc而不得不重启应用,于是在通道端做了一点更改,在选择pushServer写的时候,只选择可写的Channel

 public ChannelGroupFuture writeAndFlushRandom(Object message) {
        final int size = super.size();
        if (size <= 0) {
            return super.writeAndFlush(message);
        }

        return super.writeAndFlush(message, new ChannelMatcher() {
            private int index = 0;
            private int matchedIndex = random.nextInt(size());

            @Override
            public boolean matches(Channel channel) {
                return matchedIndex == index++ && channel.isWritable();
            }
        });
    }

以上&& channel.isWritable()为新添加代码,追踪一下isWritable方法的实现,最终是调用到ChannelOutboundBuffer的isWritable方法

AbstractChannel.java

@Override
    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable();
    }

ChannelOutboundBuffer.java

  public boolean isWritable() {
        return unwritable == 0;
    }

而unwritable这个field是在这里被确定

ChannelOutboundBuffer.java

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

其中channel.config().getWriteBufferHighWaterMark()返回的field是ChannelConfig里面对应的writeBufferHighWaterMark,可以看到,默认值为64K, 表示如果你在写之前调用调用isWriteable方法,netty最多给你缓存64K的数据, 否则,缓存就一直膨胀

DefaultChannelConfig.java

private volatile int writeBufferHighWaterMark = 64 * 1024;

由此可见,Channel可写至少是TCP缓冲区+netty缓冲区(默认64K)都没有写满, 我这边的做法就是当某个Channel写满之后,就放弃这条Channel,随机选择其他的Channel。

改造完之后,观察了一个多礼拜,old区域已缓慢稳定增长,达到预期效果

改造之后gc情况

可以发现,每次old区域都是1M左右的增长

另外一个问题:每次olg gc的时候重启机器,瞬间异常井喷

TransferToPushServerException 异常井喷

重启三次,三次异常,结合前面的ChannelOutboundBuffer,不难分析,这些写失败的都是之前被堵塞的buffer,重启之后,关闭与pushServer的连接,进入到如下方法

AbstractChannel.java

  public final void close(final ChannelPromise promise) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (outboundBuffer == null) {
                // Only needed if no VoidChannelPromise.
                if (!(promise instanceof VoidChannelPromise)) {
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
                return;
            }

            final boolean wasActive = isActive();
            final ChannelOutboundBuffer buffer = outboundBuffer;
            outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = closeExecutor();
            if (closeExecutor != null) {
                closeExecutor.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new OneTimeTask() {
                                @Override
                                public void run() {
                                    // Fail all the queued messages
                                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    // Fail all the queued messages.
                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                }
                if (inFlush0) {
                    invokeLater(new OneTimeTask() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

程序进入到outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
然后进入到ChannelOutboundBufferfailFlushed方法

void failFlushed(Throwable cause, boolean notify) {
        // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
        // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
        // indirectly (usually by closing the channel.)
        //
        // See https://github.com/netty/netty/issues/1501
        if (inFail) {
            return;
        }

        try {
            inFail = true;
            for (;;) {
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }

这里的for循环导致remove0 会遍历Entry缓存对象链表

private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, fail and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);

            safeFail(promise, cause);
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // recycle the entry
        e.recycle();

        return true;
    }
private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 指针指向下一个待删除的缓存
            flushedEntry = e.next;
        }
    }

直到所有的缓存对象都被remove掉,remove0 每调用一次都会调用一次safeFail(promise, cause)方法,

 private static void safeFail(ChannelPromise promise, Throwable cause) {
        if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
            logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
        }
    }

然后进入到

DefaultPromise.java

 @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

DefaultPromise.java

static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }

最终一个future回调,调回到用户方法

@Override
    protected void channelRead0(ChannelHandlerContext ctx, TransferFromSdkDataPacket msg) throws Exception {
        TransferToPushServerDataPacket dataPacket = new TransferToPushServerDataPacket();

        dataPacket.setVersion(Constants.PUSH_SERVER_VERSION);
        dataPacket.setData(msg.getData());
        dataPacket.setConnectionId(ctx.channel().attr(AttributeKeys.CONNECTION_ID).get());

        final long startTime = System.nanoTime();

        try {
            ChannelGroupFuture channelFutures = pushServerChannels.writeAndFlushRandom(dataPacket);
            channelFutures.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        CatUtil.logTransaction(startTime, null, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    } else {
                        Channel channel = (Channel) ((ChannelGroupFuture) future).group().toArray()[0];
                        final String pushServer = channel.remoteAddress().toString();
                        TransferToPushServerException e = new TransferToPushServerException(String.format("pushServer: %s", pushServer), future.cause());

                        CatUtil.logTransaction(new CatUtil.CatTransactionCallBack() {
                            @Override
                            protected void beforeComplete() {
                                Cat.logEvent(CatEvents.WriteToPushServerError, pushServer);
                            }
                        }, startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    }
                }

            });
        } catch (Exception e) {
            CatUtil.logTransaction(startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
        }
    }

而在用户方法里面,我们包装了一下自定义异常,喷到cat,导致瞬间TransferToPushServerException飙高

如果你觉得看的不过瘾,想系统学习Netty原理,那么你一定不要错过我的Netty源码分析系列视频:https://coding.imooc.com/class/230.html

相关文章

网友评论

  • 9dd25e6200bc:“TCP滑动窗口为0” 是指滑动窗口里都是已发送但未被确认的数据 导致可用的窗口为0么?
    简书闪电侠:@远山_b260 对,可以这么理解
  • wggfxn:您好,想请教一个问题。
    我在循环给客户端推送消息的时候,不同的客户端推送不同的对象,出现问题:所有的客户端都收到同样的对象,如果在ctx.writeAndFlush(request); 加上Thread.sleep(1000); 则正常。
    tmp.name 就是客户端的IP,作为id标记ChannelHandlerContext
    // 遍历发送消息
    for (User tmp : resultList) {
    String result = tmp.getValue();

    if (result != null && result.length() > 0 && )) {
    request.setRecord(result);
    request.setDesc("id&name&pwd");
    request.setOpr("init");

    NettyChannelMap.get(tmp.getName()).writeAndFlush(request);
    Thread.sleep(1000);

    }
    wggfxn:@简书闪电侠

    package com.tiptop.netty.server;

    /**
    * 服务端启动类
    *
    * @Author wgg
    *
    */
    @component
    public class ServerBootstrapStarter {
    public void start(int serverPort) {

    ServerBootstrap serverBootstrap = new ServerBootstrap();

    serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
    .option(ChannelOption.TCP_NODELAY, true)
    //保持长连接状态
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    //.handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(new ChannelInitializer<Channel>() {

    @Override
    protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();

    pipeline.addLast("logging",new LoggingHandler(LogLevel.DEBUG));
    pipeline.addLast("encoder", new MsgEncoder());
    pipeline.addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(65 *1024, 2, 2,0,2));
    //pipeline.addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(65 * 1024, 0, 2));
    pipeline.addLast("decoder", new MsgDecoder());
    //pipeline.addLast(new IdleStateHandler(120, 100, 60, TimeUnit.SECONDS));
    //pipeline.addLast(new HeartBeatReqHandler());
    pipeline.addLast("ping", new IdleStateHandler(120, 110, 100,TimeUnit.SECONDS));
    pipeline.addLast("handler", new ServerDispatcherHandler());
    }

    });

    ChannelFuture bindFuture = serverBootstrap.bind(serverPort);
    bindFuture.addListener(new ChannelFutureListener() {

    public void operationComplete(ChannelFuture channelFuture) throws Exception {
    if (channelFuture.isSuccess()) {
    System.out.println("Server bound");
    } else {
    System.err.println("Bound attempt failed");
    channelFuture.cause().printStackTrace();
    }
    }

    });
    }
    }
    wggfxn:@简书闪电侠
    下面是几个主要实现类代码,推送给客户端的时候,出问题的点在: 遍历客户端,根据不同客户端修改msg,但是所有客户端总是收到同样的msg,如果增加 Thread.sleep(1000); 服务端就推送正常了,不同的客户端就推送出不同的msg, 百思不得其解。

    package com.tiptop.netty.server;

    import java.util.Iterator;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentHashMap;

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.socket.SocketChannel;

    /**
    * 记录客户端连接
    *
    * @Author wgg
    *
    */
    public class NettyChannelMap {
    public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<String, ChannelHandlerContext>();

    public static int count() {
    return map.size();
    }

    public static void add(String clientId, ChannelHandlerContext ctx) {

    map.put(clientId, ctx);
    }

    public static ChannelHandlerContext get(String clientId) {
    return map.get(clientId);
    }

    public static void remove(ChannelHandlerContext ctx) {

    for (Iterator<Entry<String, ChannelHandlerContext>> iterator = map.entrySet().iterator(); iterator.hasNext();) {
    Entry<String, ChannelHandlerContext> entry = iterator.next();
    if (entry.getValue() == ctx) {

    map.remove(entry.getKey());
    }
    }
    }
    }

    }
    简书闪电侠:你好,问题解决了吗?这个代码貌似不完整,可以多贴一些代码吗?
  • e4783408d583:你好,qps峰值是指单节点发送消息的峰值吗?
  • 鋒Nic:长连接出现Old GC不减反增的异常场景是不是都跟ChannelOutboundBuffer把失败的也做了缓存没放弃掉导致的?
    简书闪电侠:目前遇到的是这样,也要看具体场景,比如最场景的,一个map保存每条连接的所有的数据,连接关掉之后没有去释放,也会出现old gc
  • zhaiydong:你好,我在安卓上用tcp向服务器发送消息,每秒大约3M的数据,内存持续增长导致系统奔溃,发现bytebuf占用大量内存, 能请教下如何解决吗?
    简书闪电侠:@zhaiydong 是自己创建的bytebuf吗?用完需要释放
  • justge:您好 , 请教下有没有遇到过客户端channelread死循环接收到服务器返回的最后一条消息,关掉服务端,channelread也无限接收到那条消息
    简书闪电侠:@justge 还是上一段代码看看吧
    justge:@the_flash 我是用的长连接 都使用的ChannelInboundHandlerAdapter share的,压测的时候,客户端重复接收到同一个调用返回,停掉服务端也能接收到,最后CPU满了 内存也撑满了,而且客户端的接收队列感觉一直不减少都维持在一个数目,服务端收发队列均为0,感觉channelread在重复消费接收队列的数据,未确认
    简书闪电侠:死循环接受消息指的是?能上一段代码看看吗
  • 15ed69f426a3:您好,想请教一个问题。
    据我了解ChannelOutboundBuffer中的message buffer集合是由DirectBuffer组成的(buffer集合是一个ChannelOutboundBuffer.Entry组成的链表,Entry的msg字段也是DirectBuffer实现),
    可是在您的截图中显示出ChannelOutboundBuffer占用了33%的heap 内存,这些heap内存是什么对象占用的?
    15ed69f426a3:@the_flash 好的,谢谢
    简书闪电侠:@杨硕_7868 其实就是和这些堆外内存相关的java对象,比如,引用一段堆外内存可能就需要一个java引用,然后这个引用上面还有其他对象的引用,这些就是分配在java heap上的

本文标题:一次netty"引发的"诡异old gc问题

本文链接:https://www.haomeiwen.com/subject/jeuuettx.html