美文网首页
grpc使用自定义业务线程池

grpc使用自定义业务线程池

作者: 江江的大猪 | 来源:发表于2021-05-21 10:24 被阅读0次
    • grpc默认的业务线程池是无限线程,大流量下容易线程爆炸
    @Slf4j
    @Component
    public class GrpcServer {
        @Resource
        private Config config;
        @Resource
        private GrpcMetadataHandler metadataHandler;
    
        private ThreadPoolExecutor executorService;
        private Server server;
    
        // executorService拒绝任务时服务端会有错误日志,返回给客户端 http2.0 internal error,客户端会抛异常
        @PostConstruct
        private void init() {
            log.info("GrpcServer init");
            executorService = new ThreadPoolExecutor(
                    config.getGrpcThreadCount(),
                    config.getGrpcThreadCount(),
                    0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(config.getGrpcThreadQueue()),
                    new CustomizableThreadFactory("GrpcServer"));
            server = NettyServerBuilder.forPort(config.getGrpcPort())
    //                .addService(metadataHandler)
                    .executor(executorService)
                    .maxConnectionIdle(10, TimeUnit.MINUTES)
                    .build();
            GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(() -> log.info("GrpcServer queue count:{}", executorService.getQueue().size()), 0, 1, TimeUnit.SECONDS);
        }
    
        @PreDestroy
        private void destroy() throws InterruptedException {
            log.info("GrpcServer shutdown ...");
            // 会阻塞到所有任务执行完,除非和客户端的连接断了,断了的话不会执行线程池中未完成的任务
            server.shutdown().awaitTermination();
            executorService.shutdown();
            while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                log.error("GrpcServer shutdown wait queue:{}", executorService.getQueue().size());
            }
            log.info("GrpcServer shutdown finished");
        }
    
        public void start() throws IOException {
            server.start();
        }
    }
    
    • 异常信息
    2021-05-19 22:24:01.868  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Exception in onHeadersRead()
    
    java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
        at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
        at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
        at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    
    2021-05-19 22:24:01.873  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error
    
    io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:167)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.newStreamException(NettyServerHandler.java:774)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:465)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
        at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
        at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
        at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
        at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
        ... 31 common frames omitted
    
    2021-05-19 22:24:01.874 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] OUTBOUND RST_STREAM: streamId=13 errorCode=2
    2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xe5f50b5a, L:/127.0.0.1:54061 - R:/127.0.0.1:10051] INBOUND RST_STREAM: streamId=13 errorCode=2
    2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] INBOUND DATA: streamId=13 padding=0 endStream=true length=50 bytes=000000002d0a0868615f6461696c7912214d515f494e53545f313937333238313236393636313136305f4742746439557878
    2021-05-19 22:24:01.876  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error
    
    io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an unknown stream 13
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    
    

    相关文章

      网友评论

          本文标题:grpc使用自定义业务线程池

          本文链接:https://www.haomeiwen.com/subject/cipmjltx.html