美文网首页spring boot 相关
springboot+netty,webSocket与modbu

springboot+netty,webSocket与modbus

作者: 帷幕丶归心 | 来源:发表于2021-06-08 08:50 被阅读0次

    准备工作

      springboot 2.4.2
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.59.Final</version>
     </dependency>
    

    1.创建NettyServer.java,关键代码如下

    public class NettyServer {
        private static final Logger logger = LoggerFactory.getLogger("-----NettyServer-----");
        private RedisUtil redisUtil;
        private HandlerService handlerService;
    
        private static ChannelGroup deviceChannelGroup;
        private static Map<String, ChannelId> deviceMap = new ConcurrentHashMap<>();
        /**
         * WEB-SOCKET
         */
        private static ChannelGroup socketChannelGroup;
        private static Map<String, ChannelId> socketMap = new ConcurrentHashMap<>();
    
        /**
         * bossGroup就是parentGroup,是负责处理TCP/IP连接的
         */
        private EventLoopGroup bossGroup = null;
        /**
         * workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件
         */
    
        private EventLoopGroup workerGroup = null;
    
        public NettyServer(RedisUtil redisUtil, HandlerService handlerService) {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();
            this.redisUtil = redisUtil;
            this.handlerService = handlerService;
            deviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            socketChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    
        public void bind(int tcp,int socket) throws Exception {
            ServerBootstrap device = new ServerBootstrap();
            device.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //初始化服务端可连接队列,指定了队列的大小128
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    // 绑定客户端连接时候触发操作
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sh) throws Exception {
                            InetSocketAddress address = sh.remoteAddress();
    
                            logger.debug("TCP 客户端IP:" + address.getAddress() + ":" + address.getPort());
                            sh.pipeline()
                                    //项目需要,定长消息,可以替换为其他的
                                    .addLast(new FixedLengthFrameDecoder(10))
                                    //消息处理
                                    .addLast("HeartBeat", new HeartBeatHandler(redisUtil, handlerService));
                        }
                    });
            //绑定监听端口,调用sync同步阻塞方法等待绑定操作完成,完成后返回ChannelFuture类似于JDK中Future
    
            ServerBootstrap webSocket = new ServerBootstrap();
            webSocket.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //初始化服务端可连接队列,指定了队列的大小128
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    // 绑定客户端连接时候触发操作
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sh) throws Exception {
                            InetSocketAddress address = sh.remoteAddress();
    
                            logger.debug("WEB SOCKET客户端IP:" + address.getAddress() + ":" + address.getPort());
                            sh.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(new HttpObjectAggregator(65535))
                                    .addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65535))
                                    .addLast(new WebSocketHandler());
                        }
                    });
            //绑定监听端口,调用sync同步阻塞方法等待绑定操作完成,完成后返回ChannelFuture类似于JDK中Future
            ChannelFuture futureDevice = device.bind(tcp).sync();
            ChannelFuture futureWebSocket = webSocket.bind(socket).sync();
            if (futureDevice.isSuccess()) {
                logger.debug("TCP 服务端启动成功");
            } else {
                logger.debug("TCP 服务端启动失败");
                futureDevice.cause().printStackTrace();
                bossGroup.shutdownGracefully(); //关闭线程组
                workerGroup.shutdownGracefully();
            }
            if (futureWebSocket.isSuccess()) {
                logger.debug("WEB-SOCKET服务端启动成功");
            } else {
                logger.debug("WEB-SOCKET服务端启动失败");
                futureWebSocket.cause().printStackTrace();
                bossGroup.shutdownGracefully(); //关闭线程组
                workerGroup.shutdownGracefully();
            }
            //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
            futureDevice.channel().closeFuture().sync();
            futureWebSocket.channel().closeFuture().sync();
    
        }
    
        public void unbind() {
            if (null != bossGroup && !bossGroup.isShutdown()) {
                bossGroup.shutdownGracefully();
                bossGroup = null;
            }
            if (null != workerGroup && !workerGroup.isShutdown()) {
                workerGroup.shutdownGracefully();
                workerGroup = null;
            }
        }
    
        /**
         * WEB-SOCKET 操作 开始
         */
        public static void socketAdd(Channel channel) {
            socketChannelGroup.add(channel);
        }
    
        public static void socketRemove(Channel channel) {
            socketChannelGroup.remove(channel);
            removeSocketChannelId(channel.id());
        }
    
        public static ChannelGroup socketChannelGroup() {
            return socketChannelGroup;
        }
    
        public static void putSocketChannelId(String code, ChannelId channelId) {
            socketMap.put(code, channelId);
        }
    
        public static void removeSocketChannelId(ChannelId channelId) {
            socketMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
        }
        public static ChannelId socketChannelId(String code) {
            return socketMap.getOrDefault(code, null);
        }
    
        public static Channel socketChannel(ChannelId channelId){
            return socketChannelGroup.find(channelId);
        }
        public static Map<String,ChannelId> socketMap(){
            return socketMap;
        }
    
        /**
         * WEB-SOCKET 操作结束
         * DEVICE 操作 开始
         */
        public static void deviceAdd(Channel channel) {
            deviceChannelGroup.add(channel);
        }
    
        public static void deviceRemove(Channel channel) {
            deviceChannelGroup.remove(channel);
            removeDeviceChannelId(channel.id());
        }
    
        public static ChannelGroup deviceChannelGroup() {
            return deviceChannelGroup;
        }
    
        public static void putDeviceChannelId(String code, ChannelId channelId) {
            deviceMap.put(code, channelId);
        }
    
        public static void removeDeviceChannelId(ChannelId channelId) {
            deviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
        }
    
        public static ChannelId deviceChannelId(String code) {
            return deviceMap.getOrDefault(code, null);
        }
    
        public static Channel deviceChannel(ChannelId channelId){
            return deviceChannelGroup.find(channelId);
        }
        public static Map<String,ChannelId> deviceMap(){
            return deviceMap;
        }
        /**
         * DEVICE 操作 结束
         */
    }
    

    2.创建对应消息处理类

    2.1(ModBus)消息处理类 HeartBeatHandler.java
    @ChannelHandler.Sharable
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
        private static final Logger logger = LoggerFactory.getLogger("-----HeartBeatHandler-----");
        private RedisUtil redisUtil;
        private HandlerService handlerService;
    
        public HeartBeatHandler(RedisUtil redisUtil, HandlerService handlerService) {
            this.redisUtil = redisUtil;
            this.handlerService = handlerService;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //以下为示例代码,具体按实际功能需求来;
            String code = "具体获取code操作";
            sendMessageToWebSocket(code,"发送消息");
        }
    
        public void sendMessageToWebSocket(String code, String message) {
            ChannelId channelId = NettyServer.socketChannelId(code);
            if (channelId != null) {
                Channel socketChannel = NettyServer.socketChannel(channelId);
                if (socketChannel != null) {
                    socketChannel.writeAndFlush(new TextWebSocketFrame(message)).addListener((ChannelFutureListener) future -> {
                        logger.info("WEB SOCKET {},{}", code, message);
                        logger.info("WEB SOCKET DONE:{}", future.isDone());
                        logger.info("WEB SOCKET SUCCESS:{}", future.isSuccess());
                    });
                } else {
                    logger.info("channels is null");
                }
            } else {
                logger.info("channelsId is null");
            }
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            logger.info("接收到客户端信息完成");
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof Exception) {
                logger.info("异常捕获");
                cause.printStackTrace();
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("CLIENT" + getRemoteAddress(ctx) + " 接入连接");
            NettyServer.deviceAdd(ctx.channel());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            logger.info("CLIENT" + getRemoteAddress(ctx) + " 断开连接");
            NettyServer.deviceRemove(ctx.channel());
            ctx.close();
        }
    
        public static String getRemoteAddress(ChannelHandlerContext ctx) {
            return ctx.channel().remoteAddress().toString();
        }
    
    2.2(WebSocket)消息处理类 WebSocketHandler.java
    public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channel.id();
            logger.info("与客户端建立连接,通道开启!channelId:{}",channel.id());
            // 添加到channelGroup通道组
            NettyServer.socketAdd(ctx.channel());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            logger.info("与客户端建立连接,通道关闭!");
            NettyServer.socketRemove(ctx.channel());
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            logger.info("服务器收到的数据:" + msg.text());
            NettyServer.putSocketChannelId(msg.text(),ctx.channel().id());
            //简易的保持心跳
            sendMessage(ctx);
        }
    
        private void sendMessage(ChannelHandlerContext ctx) {
            logger.info("服务器回复:0");
            ctx.channel().writeAndFlush(new TextWebSocketFrame("0")).addListener((ChannelFutureListener) future -> {
                logger.info("WEB-SOCKET 心跳回复:0");
                logger.info("WEB SOCKET DONE:{}",future.isDone());
                logger.info("WEB SOCKET SUCCESS:{}",future.isSuccess());
            });;
        }
    
        private void sendAllMessage() {
            String message = "发送群消息";
            NettyServer.socketChannelGroup().writeAndFlush(new TextWebSocketFrame(message));
        }
    

    3.使用方法

    在对应的SpringBoot 启动类中使用
    /**
     * Starts the Netty server once the Spring Boot application has started and
     * shuts it down when the bean is destroyed.
     *
     * <p>{@code logger}, {@code port1} and {@code port2} are not declared here;
     * presumably they are static members of the enclosing boot class (confirm).
     */
    @Component
    public static class StartApplication implements ApplicationRunner {
        // Written by the runner thread, read by the thread that runs destroy().
        private volatile NettyServer nettyServer;
        @Resource
        private HandlerService handlerService;
        @Resource
        private RedisUtil redis;

        /**
         * Creates the server and binds both ports.
         * NOTE: {@code bind} blocks until the server channels are closed, so this
         * method does not return while the server is running.
         */
        @Override
        public void run(ApplicationArguments args) throws Exception {
            logger.info("进程开启!");
            nettyServer = new NettyServer(redis, handlerService);
            nettyServer.bind(port1, port2);
        }

        /** Shuts the server down; does nothing if the server was never created. */
        @PreDestroy
        public void destroy() throws Exception {
            logger.info("进程关闭!");
            NettyServer server = nettyServer;
            // run() may never have been invoked (e.g. startup failed earlier), so guard against null.
            if (server != null) {
                server.unbind();
            }
        }
    }
    

    4.至此,功能完成

    感谢您的关注,有使用不当的地方,请指正,愿共勉··· ···
    不接收辱骂,如有不适,请移步··· ···
    

    相关文章

      网友评论

        本文标题:springboot+netty,webSocket与modbu

        本文链接:https://www.haomeiwen.com/subject/ncrosltx.html