美文网首页
SpringBoot整合Netty简单Demo之网页聊天室

SpringBoot整合Netty简单Demo之网页聊天室

作者: 徐森威 | 来源:发表于2017-10-13 13:55 被阅读0次

    利用WebSocket实现

    说到网页聊天室一般都是使用WebSocket长连接进行数据交互和双端数据发送,本人也已经整合了一整套依赖于springboot-websocket包的网络交互Demo,具体功能如下:

    1. 多用户群聊
    2. 点对点私聊
    3. 实时消息通知
    4. 在线用户显示
    5. 上线、断线等实时监听
    6. 其他在线通讯
    WebSocket依赖包
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    

    SpringBoot简单整合Netty

    在Netty中可以集成WebSocket,以下Demo只实现了用户群聊,其他功能可加逻辑处理自行扩展

    • NettyApplication(启动类)
      @PropertySource(value= "classpath:/nettyserver.properties")
      @SpringBootApplication
      public class NettyApplication {
    
        @Value("${tcp.port}")
        private int tcpPort;
    
        @Value("${boss.thread.count}")
        private int bossCount;
    
        @Value("${worker.thread.count}")
        private int workerCount;
    
        @Value("${so.keepalive}")
        private boolean keepAlive;
    
        @Value("${so.backlog}")
        private int backlog;
    
        @Bean(name = "serverBootstrap")
        public ServerBootstrap bootstrap() {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup(), workerGroup())
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(nettyWebSocketChannelInitializer);
            Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
            Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
            for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
                b.option(option, tcpChannelOptions.get(option));
            }
            return b;
        }
    
        @Autowired
        @Qualifier("somethingChannelInitializer")
        private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;
    
        @Bean(name = "tcpChannelOptions")
        public Map<ChannelOption<?>, Object> tcpChannelOptions() {
            Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
            options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
            options.put(ChannelOption.SO_BACKLOG, backlog);
            return options;
        }
    
        @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
        public NioEventLoopGroup bossGroup() {
            return new NioEventLoopGroup(bossCount);
        }
    
        @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
        public NioEventLoopGroup workerGroup() {
            return new NioEventLoopGroup(workerCount);
        }
    
        @Bean(name = "tcpSocketAddress")
        public InetSocketAddress tcpPort() {
            return new InetSocketAddress(tcpPort);
        }
    
        public static void main(String[] args) throws Exception{
            ConfigurableApplicationContext context = SpringApplication.run(NettyApplication.class, args);
            TCPServer tcpServer = context.getBean(TCPServer.class);
            tcpServer.start();
        }
    }
    
    • TCPServer(启动Netty服务)
    /**
     * Binds the injected {@link ServerBootstrap} to the injected address and owns the resulting
     * server channel for the lifetime of the application.
     */
    @Component
    public class TCPServer {

        @Autowired
        @Qualifier("serverBootstrap")
        private ServerBootstrap serverBootstrap;

        @Autowired
        @Qualifier("tcpSocketAddress")
        private InetSocketAddress tcpPort;

        /**
         * The bound server channel, or {@code null} until {@link #start()} has bound it.
         * Volatile because {@link #stop()} may run on a different thread than {@link #start()}.
         */
        private volatile Channel serverChannel;

        /**
         * Binds the server and then blocks the calling thread until the server channel is closed.
         *
         * <p>The channel is stored <em>before</em> waiting on its close future; otherwise the field
         * would only be assigned after the server had already shut down and {@link #stop()} could
         * never reach the live channel.
         *
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public void start() throws Exception {
            Channel channel = serverBootstrap.bind(tcpPort).sync().channel();
            serverChannel = channel;
            channel.closeFuture().sync();
        }

        /**
         * Closes the server channel if it was ever bound; a no-op otherwise.
         *
         * <p>A server channel has no parent channel, so only the channel itself is closed.
         *
         * @throws Exception declared for interface compatibility
         */
        @PreDestroy
        public void stop() throws Exception {
            Channel channel = serverChannel;
            if (channel != null) {
                channel.close();
            }
        }

        public ServerBootstrap getServerBootstrap() {
            return serverBootstrap;
        }

        public void setServerBootstrap(ServerBootstrap serverBootstrap) {
            this.serverBootstrap = serverBootstrap;
        }

        public InetSocketAddress getTcpPort() {
            return tcpPort;
        }

        public void setTcpPort(InetSocketAddress tcpPort) {
            this.tcpPort = tcpPort;
        }
    }
    
    • NettyWebSocketChannelInitializer(添加自定义handler)
    /**
     * Populates the pipeline of every accepted connection with the HTTP/WebSocket handlers
     * followed by the shared chat handler.
     */
    @Component
    @Qualifier("somethingChannelInitializer")
    public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

        /** Spring-managed chat handler; the same instance is added to every pipeline. */
        @Autowired
        private TextWebSocketFrameHandler textWebSocketFrameHandler;

        /**
         * Installs the handlers, in order: HTTP codec, HTTP aggregator (65536), chunked writer,
         * WebSocket protocol handler on {@code /ws}, and finally the chat handler.
         *
         * @param ch the newly accepted channel
         * @throws Exception propagated from handler installation
         */
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            // The chat handler must be the injected bean rather than a fresh instance,
            // otherwise its own dependencies would not be injected.
            ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new HttpObjectAggregator(65536))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(new WebSocketServerProtocolHandler("/ws"))
                    .addLast(textWebSocketFrameHandler);
        }
    }
    
    • TextWebSocketFrameHandler(自定义操作类)
    /**
     * Shared chat handler: relays every text frame to all connected channels and announces
     * users joining and leaving. User names are stored in Redis keyed by the channel id.
     */
    @Component
    @Qualifier("textWebSocketFrameHandler")
    @ChannelHandler.Sharable
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

        /** All channels currently registered with this handler; public so other beans can send through it. */
        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        @Autowired
        private RedisDao redisDao;

        /**
         * Relays an incoming message: other users see it prefixed with the sender's name,
         * the sender sees it prefixed with {@code [you]}.
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx,
                                    TextWebSocketFrame msg) throws Exception {
            Channel incoming = ctx.channel();
            String uName = redisDao.getString(channelKey(incoming));
            for (Channel channel : channels) {
                if (channel != incoming){
                    channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text()));
                } else {
                    channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
                }
            }
        }

        /**
         * Assigns a random name to the new connection, announces it to the channels already in
         * the group, stores the name in Redis and adds the channel to the group.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println(incoming.remoteAddress());
            // Obtains a random user name; any other naming scheme could be substituted here.
            String uName = new RandomName().getRandomName();

            broadcast("[新用户] - " + uName + " 加入");
            redisDao.saveString(channelKey(incoming), uName);   // remember the user
            channels.add(incoming);
        }

        /**
         * Announces the departure, deletes the channel's name from Redis, strips the name from
         * the {@code "cacheName"} entry and removes the channel from the group.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            String key = channelKey(incoming);
            String uName = redisDao.getString(key);

            broadcast("[用户] - " + uName + " 离开");
            redisDao.deleteString(key);   // forget the user

            // Presumably "cacheName" tracks the names already handed out (see RandomName) — confirm.
            // Guard against missing values, and use a literal replace so that regex
            // metacharacters in a name cannot corrupt the entry or throw.
            String cachedNames = redisDao.getString("cacheName");
            if (cachedNames != null && uName != null && !uName.isEmpty()) {
                redisDao.saveString("cacheName", cachedNames.replace(uName, ""));
            }
            channels.remove(incoming);
        }

        /** Logs that the channel's user is online. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("用户:" + redisDao.getString(channelKey(ctx.channel())) + "在线");
        }


        /** Logs that the channel's user went offline. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("用户:" + redisDao.getString(channelKey(ctx.channel())) + "掉线");
        }

        /** Logs the failure with its stack trace and closes the connection. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            System.out.println("用户:" + redisDao.getString(channelKey(ctx.channel())) + "异常");
            cause.printStackTrace();
            ctx.close();
        }

        /** Sends {@code text} as a new text frame to every channel currently in the group. */
        private static void broadcast(String text) {
            for (Channel channel : channels) {
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
        }

        /** Redis key under which a channel's user name is stored: the channel id as a string. */
        private static String channelKey(Channel channel) {
            return String.valueOf(channel.id());
        }

    }
    

    这边使用Redis保存用户名和ChannelId,用来区分不同浏览器登录的用户

    • channelRead0:定义接收到消息的操作
    • handlerAdded:定义新用户连接的操作
    • handlerRemoved:定义用户离开的操作
    • channelActive:定义用户在线的操作
    • channelInactive:定义用户离线的操作
    • exceptionCaught:定义用户异常的操作

    如果要在Controller中使用Channel向客户端发送数据,只要注入TextWebSocketFrameHandler,取得其中的ChannelGroup,再通过自己逻辑处理后存储的ChannelId来取得对应的Channel,即可向客户端发送消息

    Netty依赖包
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.16.Final</version>
    </dependency>
    
    • 前端代码
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>WebSocket Chat</title>
    </head>
    <body>
    <!-- Chat client: opens a WebSocket to the Netty server and mirrors traffic into the textarea. -->
    <script type="text/javascript">
        // Connection shared by the event handlers below and by send().
        var socket;

        // Fall back to the prefixed implementation when the standard one is absent.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }

        if (!window.WebSocket) {
            alert("你的浏览器不支持 WebSocket!");
        } else {
            socket = new WebSocket("ws://localhost:8090/ws");

            // Append every received message on its own line.
            socket.onmessage = function(evt) {
                var output = document.getElementById('responseText');
                output.value += '\n' + evt.data;
            };

            // Replace the textarea content with the "connected" notice.
            socket.onopen = function(evt) {
                document.getElementById('responseText').value = "连接开启!";
            };

            // Append the "closed" notice to whatever is already shown.
            socket.onclose = function(evt) {
                var output = document.getElementById('responseText');
                output.value += "连接被关闭";
            };
        }

        // Sends one message if the connection is open; alerts the user otherwise.
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState != WebSocket.OPEN) {
                alert("连接没有开启.");
                return;
            }
            socket.send(message);
        }

        // Ask for confirmation before the page is refreshed or closed.
        window.onbeforeunload = function(evt) {
            evt.returnValue = "刷新提醒";
        };
    </script>
    <!-- The form never submits; the button hands the input text to send(). -->
    <form onsubmit="return false;">
        <h3>netty 聊天室:</h3>
        <textarea id="responseText" style="width: 400px; height: 300px;"></textarea>
        <br>
        <input type="text" name="message"  style="width: 300px" value="测试数据">
        <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    </form>
    <br>
    <br>
    </body>
    </html>
    
    • nettyserver.properties
    tcp.port=8090
    boss.thread.count=2
    worker.thread.count=2
    so.keepalive=true
    so.backlog=100
    

    效果截图

    群聊效果截图

    Git地址
    https://github.com/zyf970617/springboot-netty-demo

    相关文章

      网友评论

          本文标题:SpringBoot整合Netty简单Demo之网页聊天室

          本文链接:https://www.haomeiwen.com/subject/dspnyxtx.html