美文网首页JAVA进阶
6. Disruptor与Netty实现百万级长连接接入

6. Disruptor与Netty实现百万级长连接接入

作者: 香沙小熊 | 来源:发表于2021-06-24 10:14 被阅读0次

    1. Disruptor与Netty 架构

    image.png
    image.png
    与Netty网络通信框架整合提升性能:
    • 在使用Netty进行接收处理数据的时候,我们尽量都不要在工作线程(Handler)上编写自己的代码逻辑
    • 我们需要利用异步的机制,比如使用线程池异步处理,如果使用线程池就意味着使用阻塞队列,这里可以替换为Disruptor提高性能

    2.核心代码

    Commonlib
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.65.Final</version>
            </dependency>  
            <!-- 序列化框架marshalling -->
            <dependency> 
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling</artifactId>
                <version>2.0.12.Final</version>
            </dependency>           
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>2.0.12.Final</version>
            </dependency> 
            <!-- 序列化框架protobuf -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>2.5.0</version>
            </dependency>       
            <!-- netty end -->
          <dependency>
              <groupId>com.lmax</groupId>
              <artifactId>disruptor</artifactId>
              <version>3.4.4</version>
          </dependency>
    
    public class MessageProducer {
    
        private String producerId;
        
        private RingBuffer<TranslatorDataWapper> ringBuffer;
        
        public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
            this.producerId = producerId;
            this.ringBuffer = ringBuffer;
        }
        
        public void onData(TranslatorData data, ChannelHandlerContext ctx) {
            long sequence = ringBuffer.next();
            try {
                TranslatorDataWapper wapper = ringBuffer.get(sequence);
                wapper.setData(data);
                wapper.setCtx(ctx);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    
    }
    
    public class RingBufferWorkerPoolFactory {
    
        private static final Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>();
        private static final Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>();
        private RingBuffer<TranslatorDataWapper> ringBuffer;
        private SequenceBarrier sequenceBarrier;
        private WorkerPool<TranslatorDataWapper> workerPool;
    
        private RingBufferWorkerPoolFactory() {
    
        }
    
        public static RingBufferWorkerPoolFactory getInstance() {
            return SingletonHolder.instance;
        }
    
        public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
            //1. 构建ringBuffer对象
            this.ringBuffer = RingBuffer.create(type,
                    new EventFactory<TranslatorDataWapper>() {
                        public TranslatorDataWapper newInstance() {
                            return new TranslatorDataWapper();
                        }
                    },
                    bufferSize,
                    waitStrategy);
            //2.设置序号栅栏
            this.sequenceBarrier = this.ringBuffer.newBarrier();
            //3.设置工作池
            this.workerPool = new WorkerPool<TranslatorDataWapper>(this.ringBuffer,
                    this.sequenceBarrier,
                    new EventExceptionHandler(), messageConsumers);
            //4 把所构建的消费者置入池中
            for (MessageConsumer mc : messageConsumers) {
                consumers.put(mc.getConsumerId(), mc);
            }
            //5 添加我们的sequences
            this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
            //6 启动我们的工作池
            this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2));
        }
    
        public MessageProducer getMessageProducer(String producerId) {
            MessageProducer messageProducer = producers.get(producerId);
            if (null == messageProducer) {
                messageProducer = new MessageProducer(producerId, this.ringBuffer);
                producers.put(producerId, messageProducer);
            }
            return messageProducer;
        }
    
        private static class SingletonHolder {
            static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
        }
    
        /**
         * 异常静态类
         *
         * @author Alienware
         */
        static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
            public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
            }
    
            public void handleOnStartException(Throwable ex) {
            }
    
            public void handleOnShutdownException(Throwable ex) {
            }
        }
        
    }
    
    Client端
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            
            /**
            try {
                TranslatorData response = (TranslatorData)msg;
                System.err.println("Client端: id= " + response.getId() 
                        + ", name= " + response.getName()
                        + ", message= " + response.getMessage());
            } finally {
                //一定要注意 用完了缓存 要进行释放
                ReferenceCountUtil.release(msg);
            }
            */
            TranslatorData response = (TranslatorData)msg;
            String producerId = "code:seesionId:002";
            MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
            messageProducer.onData(response, ctx);
            
            
        }
    }
    
    public class MessageConsumerImpl4Client extends MessageConsumer {
    
        public MessageConsumerImpl4Client(String consumerId) {
            super(consumerId);
        }
    
        public void onEvent(TranslatorDataWapper event) throws Exception {
            TranslatorData response = event.getData();
            ChannelHandlerContext ctx = event.getCtx();
            //业务逻辑处理:
            try {
                System.err.println("Client端: id= " + response.getId() 
                + ", name= " + response.getName()
                + ", message= " + response.getMessage());
            } finally {
                ReferenceCountUtil.release(response);
            }
        }
    
    }
    
    public class NettyClient {
    
        public static final String HOST = "127.0.0.1";
        public static final int PORT = 8765;
    
        //扩展 完善 池化: ConcurrentHashMap<KEY -> String, Value -> Channel> 
        private Channel channel;
    
        //1. 创建工作线程组: 用于实际处理业务的线程组
        private final EventLoopGroup workGroup = new NioEventLoopGroup();
    
        private ChannelFuture cf;
    
        public NettyClient() {
            this.connect(HOST, PORT);
        }
    
        private void connect(String host, int port) {
            //2 辅助类(注意Client 和 Server 不一样)
            Bootstrap bootstrap = new Bootstrap();
            try {
    
                bootstrap.group(workGroup)
                        .channel(NioSocketChannel.class)
                        //表示缓存区动态调配(自适应)
                        .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                        //缓存区 池化操作
                        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                sc.pipeline().addLast(new ClientHandler());
                            }
                        });
                //绑定端口,同步等等请求连接
                this.cf = bootstrap.connect(host, port).sync();
                System.err.println("Client connected...");
    
                //接下来就进行数据的发送, 但是首先我们要获取channel:
                this.channel = cf.channel();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void sendData() {
    
            for (int i = 0; i < 10; i++) {
                TranslatorData request = new TranslatorData();
                request.setId("" + i);
                request.setName("请求消息名称 " + i);
                request.setMessage("请求消息内容 " + i);
                this.channel.writeAndFlush(request);
            }
        }
    
        public void close() throws Exception {
            cf.channel().closeFuture().sync();
            //优雅停机
            workGroup.shutdownGracefully();
            System.err.println("Sever ShutDown...");
        }
        
    
    }
    
    @SpringBootApplication
    public class NettyClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyClientApplication.class, args);
            
            MessageConsumer[] conusmers = new MessageConsumer[4];
            for(int i =0; i < conusmers.length; i++) {
                MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i);
                conusmers[i] = messageConsumer;
            }
            RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
                    1024*1024,
                    //new YieldingWaitStrategy(),
                    new BlockingWaitStrategy(),
                    conusmers);
            
            //建立连接 并发送消息
            new NettyClient().sendData();
        }
    }
    
    Server端
    public class MessageConsumerImpl4Server extends MessageConsumer {
    
        public MessageConsumerImpl4Server(String consumerId) {
            super(consumerId);
        }
    
        public void onEvent(TranslatorDataWapper event) throws Exception {
            TranslatorData request = event.getData();
            ChannelHandlerContext ctx = event.getCtx();
            //1.业务处理逻辑:
            System.err.println("Sever端: id= " + request.getId() 
            + ", name= " + request.getName() 
            + ", message= " + request.getMessage());
            
            //2.回送响应信息:
            TranslatorData response = new TranslatorData();
            response.setId("resp: " + request.getId());
            response.setName("resp: " + request.getName());
            response.setMessage("resp: " + request.getMessage());
            //写出response响应信息:
            ctx.writeAndFlush(response);
        }
    
    }
    
    public class NettyServer {
    
        public NettyServer() {
            //1. 创建两个工作线程组: 一个用于接受网络请求的线程组. 另一个用于实际处理业务的线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            //2 辅助类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            try {
                
                serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                //表示缓存区动态调配(自适应)
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                //缓存区 池化操作
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });
                //绑定端口,同步等等请求连接
                ChannelFuture cf = serverBootstrap.bind(8765).sync();
                System.err.println("Server Startup...");
                cf.channel().closeFuture().sync();
            
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //优雅停机
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
                System.err.println("Sever ShutDown...");
            }
        }
        
    }
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            /**
            TranslatorData request = (TranslatorData)msg;
            System.err.println("Sever端: id= " + request.getId() 
                            + ", name= " + request.getName() 
                            + ", message= " + request.getMessage());
            //数据库持久化操作 IO读写 ---> 交给一个线程池 去异步的调用执行
            TranslatorData response = new TranslatorData();
            response.setId("resp: " + request.getId());
            response.setName("resp: " + request.getName());
            response.setMessage("resp: " + request.getMessage());
            //写出response响应信息:
            ctx.writeAndFlush(response);
            */
            TranslatorData request = (TranslatorData)msg;
            //自已的应用服务应该有一个ID生成规则
            String producerId = "code:sessionId:001";
            MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
            messageProducer.onData(request, ctx);
            
            
        }
        
    }
    
    @SpringBootApplication
    public class NettyServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyServerApplication.class, args);
            
            MessageConsumer[] conusmers = new MessageConsumer[4];
            for(int i =0; i < conusmers.length; i++) {
                MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i);
                conusmers[i] = messageConsumer;
            }
            RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
                    1024*1024,
                    //new YieldingWaitStrategy(),
                    new BlockingWaitStrategy(),
                    conusmers);
            
            new NettyServer();
        }
    }
    
    运行
    1. 先启动server
    2. 再启动client
    Client connected...
    Client端: id= resp: 3, name= resp: 请求消息名称 3, message= resp: 请求消息内容 3
    Client端: id= resp: 1, name= resp: 请求消息名称 1, message= resp: 请求消息内容 1
    Client端: id= resp: 2, name= resp: 请求消息名称 2, message= resp: 请求消息内容 2
    Client端: id= resp: 0, name= resp: 请求消息名称 0, message= resp: 请求消息内容 0
    Client端: id= resp: 6, name= resp: 请求消息名称 6, message= resp: 请求消息内容 6
    Client端: id= resp: 7, name= resp: 请求消息名称 7, message= resp: 请求消息内容 7
    Client端: id= resp: 4, name= resp: 请求消息名称 4, message= resp: 请求消息内容 4
    Client端: id= resp: 5, name= resp: 请求消息名称 5, message= resp: 请求消息内容 5
    Client端: id= resp: 8, name= resp: 请求消息名称 8, message= resp: 请求消息内容 8
    Client端: id= resp: 9, name= resp: 请求消息名称 9, message= resp: 请求消息内容 9
    
    特别感谢:

    阿神

    相关文章

      网友评论

        本文标题:6. Disruptor与Netty实现百万级长连接接入

        本文链接:https://www.haomeiwen.com/subject/wremyltx.html