美文网首页Springboot
Netty + websocket聊天室

Netty + websocket聊天室

作者: Cool_Pomelo | 来源:发表于2020-05-06 15:52 被阅读0次

    Netty + websocket聊天室

    程序处理逻辑

    图1.png

    启用websocket

    从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;它可能会发生在启动时,也可能会发生在请求了某个特定的URL之后

    约定:

    • 如果被请求的 URL 以/ws 结尾,那么我们将会把该协议升级为 WebSocket;

    • 否则,服务器将使用基本的 HTTP/S

    服务器逻辑:

    图2.png

    处理http请求以及websocket

    图3.png
    
    //扩展 SimpleChannel-InboundHandler 以处理FullHttpRequest 消息
    public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
        private final String wsUri;
        private static final File INDEX;
    
        static {
            URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
            try {
                String path = location.toURI() + "index.html";
                path = !path.contains("file:") ? path : path.substring(5);
                INDEX = new File(path);
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
            }
        }
    
        public HttpRequestHandler(String wsUri) {
            this.wsUri = wsUri;
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
    
            if (wsUri.equalsIgnoreCase(request.uri())) {//如果请求了 WebSocket协议升级,则增加引用 计数(调用retain()方法),并将它传递给下一个ChannelInboundHandler
                ctx.fireChannelRead(request.retain()); //
            } else {
                if (HttpUtil.is100ContinueExpected(request)) {//处理 100 Continue请求以符合 HTTP 1.1 规范
                    send100Continue(ctx); //
                }
    
                RandomAccessFile file = new RandomAccessFile(INDEX, "r");//
    
                HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
    
                boolean keepAlive = HttpUtil.isKeepAlive(request);
    
                if (keepAlive) { //如果请求了keep-alive,则添加所需要的HTTP 头信息
                    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                }
                ctx.write(response); //将 HttpResponse写到客户端
    
                if (ctx.pipeline().get(SslHandler.class) == null) { //将 index.html写到客户端
                    ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
                } else {
                    ctx.write(new ChunkedNioFile(file.getChannel()));
                }
                ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//写LastHttpContent并冲刷至客户端
                if (!keepAlive) {//如果没有请求keep-alive,则在写操作完成后关闭 Channel
                    future.addListener(ChannelFutureListener.CLOSE); //
                }
    
                file.close();
            }
        }
    
        private static void send100Continue(ChannelHandlerContext ctx) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("Client:" + incoming.remoteAddress() + "异常");
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    
    
    
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
    
        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 
            Channel incoming = ctx.channel();
            for (Channel channel : channels) {
                if (channel != incoming) {
                    channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
                } else {
                    channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
                }
            }
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 
            Channel incoming = ctx.channel();
    
            // 广播
            channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
    
            channels.add(incoming);
            System.out.println("Client:" + incoming.remoteAddress() + "加入");
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 
            Channel incoming = ctx.channel();
    
            // 广播
            channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
    
            System.out.println("Client:" + incoming.remoteAddress() + "离开");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception { // 
            Channel incoming = ctx.channel();
            System.out.println("Client:" + incoming.remoteAddress() + "在线");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 
            Channel incoming = ctx.channel();
            System.out.println("Client:" + incoming.remoteAddress() + "掉线");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) // 
                throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("Client:" + incoming.remoteAddress() + "异常");
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    
    
    }
    
    
    
    public class WebSocketChatServerInitializer extends ChannelInitializer<SocketChannel> {
    
    
        @Override
        public void initChannel(SocketChannel ch) throws Exception {//(2)
            ChannelPipeline pipeline = ch.pipeline();
    
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(new HttpObjectAggregator(64*1024));
            pipeline.addLast(new ChunkedWriteHandler());
            pipeline.addLast(new HttpRequestHandler("/ws"));
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            pipeline.addLast(new TextWebSocketFrameHandler());
    
        }
    }
    
    
    

    Netty文档

    ChannelGroup

    线程安全的Set,包含开放的Channel,并在其上提供各种批量操作。 使用ChannelGroup,您可以将Channels划分为有意义的组(例如,基于每个服务或每个状态)。一个封闭的Channels会自动从集合中删除,因此您不必担心它的生命周期。 添加频道。 一个Channel可以属于多个ChannelGroup。

    将消息广播到多个频道

    如果需要将消息广播到多个频道,则可以添加与收件人关联的频道并调用write(Object):

     ChannelGroup recipients =
             new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     recipients.add(channelA);
     recipients.add(channelB);
     ..
     recipients.write(Unpooled.copiedBuffer(
             "Service will shut down for maintenance in 5 minutes.",
             CharsetUtil.UTF_8));
    
    

    使用ChannelGroup简化关机过程

    如果ServerChannels和非ServerChannels都存在于同一ChannelGroup中,则首先对ServerChannels执行此组上所有请求的I / O操作,然后对其他Channels执行。

    一次关闭服务器时,此规则非常有用:

     ChannelGroup allChannels =
             new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
     public static void main(String[] args) throws Exception {
         ServerBootstrap b = new ServerBootstrap(..);
         ...
         b.childHandler(new MyHandler());
    
         // Start the server
         b.getPipeline().addLast("handler", new MyHandler());
         Channel serverChannel = b.bind(..).sync();
         allChannels.add(serverChannel);
    
         ... Wait until the shutdown signal reception ...
    
         // Close the serverChannel and then all accepted connections.
         allChannels.close().awaitUninterruptibly();
     }
    
     public class MyHandler extends ChannelInboundHandlerAdapter {
          @Override
         public void channelActive(ChannelHandlerContext ctx) {
             // closed on shutdown.
             allChannels.add(ctx.channel());
             super.channelActive(ctx);
         }
     }
    
    

    参考

    << Netty实战 >>

    相关文章

      网友评论

        本文标题:Netty + websocket聊天室

        本文链接:https://www.haomeiwen.com/subject/qgewghtx.html