美文网首页
2021-08-15_NettyWebsocketServer协

2021-08-15_NettyWebsocketServer协

作者: kikop | 来源:发表于2021-08-15 23:25 被阅读0次

    20210815_NettyWebsocketServer协议学习笔记2

    1概述

    WebSocket是为了解决HTTP协议中通信只能由客户端发起这个弊端而出现的,WebSocket基于HTTP5协议,借用HTTP进行握手、升级,能够做到轻量的、高效的、双向的在客户端和服务端之间传输文本数据。

    1.1功能模块

    实现核心的聊天功能,包括单发、群发、文件发送。

    系统只包括两个模块:登录模块和聊天管理模块。

    • 登录模块:既然作为一个系统,那么登录的角色认证是必不可少的,这里使用简单、传统的Session方式维持登录状态
    • 注销模块:当然也有对应的注销功能,但这里的注销除了清空Session对象,还要释放WebSocket连接,否则造成内存泄露。
    • 聊天管理模块:系统的核心模块,这部分主要使用Netty框架实现,功能包括信息、文件的单条和多条发送,也支持表情发送。
    • 其他模块:如好友管理模块、聊天记录管理、注册模块等,我并没有实现,有兴趣的话可以自行实现,与传统的开发方式类似。

    1.2http协议升级原理

    socket = new WebSocket("ws://localhost:8088");
    
    Request URL: ws://localhost:8088/
    Request Method: GET
    Status Code: 101 Switching Protocols
    
    // 协议请求头
    Accept-Encoding: gzip, deflate, br
    Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
    Cache-Control: no-cache
    Connection: Upgrade
    Host: localhost:8088
    Origin: http://localhost:8085
    Pragma: no-cache
    Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
    // Sec-WebSocket-Key
    Sec-WebSocket-Key: TR/ZKx61zqyqIVN7QbeJPQ==
    Sec-WebSocket-Version: 13
    // 协议升级标识:Upgrade
    Upgrade: websocket
    User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36
    
    // 协议响应
    connection: upgrade
    sec-websocket-accept: aRgINM6rA896S0mXaVCp+6yj99A=
    upgrade: websocket
    

    1.2.1源码分析

    1.2.1.2首次http请求

    DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
    GET / HTTP/1.1
    Host: localhost:8088
    Connection: Upgrade
    Pragma: no-cache
    Cache-Control: no-cache
    User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36
    Upgrade: websocket
    Origin: http://localhost:8085
    Sec-WebSocket-Version: 13
    Accept-Encoding: gzip, deflate, br
    Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
    Sec-WebSocket-Key: tWdJLe/IFtQ+RcA2MGbnXw==
    Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
    content-length: 0
    
    // 旧的channelPipeline:
    head:
    DefaultChannelPipeline$HeadContext
    -->第一个入站处理器:HttpServerCodec
    ---->HttpObjectAggregator
    ------>ChunkedWriteHandler
    -------->MyHttpRequestHandler
    ---------->MyWebSocketHandler
    ------------>DefaultChannelPipeline$TailContext
    

    image-20210815225454695.png

    1.2.1.3协议升级算法

    // 真正的协议升级
    webSocketServerHandshaker.handshake(channelHandlerContext.channel(), fullHttpRequest);
    
    // WebSocketServerHandshaker
    public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                            HttpHeaders responseHeaders, final ChannelPromise promise) {
    
        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, version());
        }
        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
        ChannelPipeline p = channel.pipeline();
        // 1.移除HttpObjectAggregator
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);
        }
        // 2.移除HttpContentCompressor
        if (p.get(HttpContentCompressor.class) != null) {
            p.remove(HttpContentCompressor.class);
        }
        // 3.ctx==null
        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
        final String encoderName;
        if (ctx == null) {
            // this means the user use a HttpServerCodec
            ctx = p.context(HttpServerCodec.class);
            if (ctx == null) { // 前提要有:HttpServerCodec
                promise.setFailure(
                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }
            // 4.增加websocket的编解码器,在 http-codec之前
            // newWebsocketDecoder
            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
    
            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
        }
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    // 5.移除 http-codec
                    p.remove(encoderName);
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }
    
    // 新的channelPipeline:
    head:
    DefaultChannelPipeline$HeadContext
    -->WebSocketFrame13Decoder
    ---->WebSocketFrame13EnDecoder
    ------>ChunkedWriteHandler
    -------->MyHttpRequestHandler
    ---------->MyWebSocketHandler
    ------------>DefaultChannelPipeline$TailContext
    

    1.2.1.4handlerWebSocketFrame

    /**
         * 描述:处理WebSocketFrame
         * @param ctx
         * @param frame
         * @throws Exception
         */
        private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
            // 关闭请求
            if (frame instanceof CloseWebSocketFrame) {
                WebSocketServerHandshaker handshaker =
                        Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
                if (handshaker == null) {
                    sendErrorMessage(ctx, "不存在的客户端连接!");
                } else {
                    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                }
                return;
            }
            // ping请求
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 只支持文本格式,不支持二进制消息
            if (!(frame instanceof TextWebSocketFrame)) {
                sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息");
            }
    
            // 客服端发送过来的消息
            // {"fromUserId":"002","toUserId":"001","content":"11","type":"SINGLE_SENDING"}
            String request = ((TextWebSocketFrame)frame).text();
            LOGGER.info("服务端收到新信息:" + request);
            JSONObject param = null;
            try {
                param = JSONObject.parseObject(request);
            } catch (Exception e) {
                sendErrorMessage(ctx, "JSON字符串转换出错!");
                e.printStackTrace();
            }
            if (param == null) {
                sendErrorMessage(ctx, "参数为空!");
                return;
            }
    
            String type = (String) param.get("type");
            switch (type) {
                case "REGISTER":
                    chatService.register(param, ctx);
                    break;
                case "SINGLE_SENDING":
                    chatService.singleSend(param, ctx);
                    break;
                case "GROUP_SENDING":
                    chatService.groupSend(param, ctx);
                    break;
                case "FILE_MSG_SINGLE_SENDING":
                    chatService.FileMsgSingleSend(param, ctx);
                    break;
                case "FILE_MSG_GROUP_SENDING":
                    chatService.FileMsgGroupSend(param, ctx);
                    break;
                default:
                    chatService.typeError(ctx);
                    break;
            }
        }
    

    2代码实战

    2.1maven依赖

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <artifactId>technicaltools</artifactId>
            <groupId>com.kikop</groupId>
            <version>1.0-SNAPSHOT</version>
            <!--定义依赖的父pom文件-->
            <relativePath>../pom.xml</relativePath>
        </parent>
    
        <groupId>com.kikop</groupId>
        <artifactId>mynettywebsocketserverdemo</artifactId>
    
        <properties>
    
        </properties>
    
        <dependencies>
    
            <!--1.共用模块-->
            <dependency>
                <groupId>com.kikop</groupId>
                <artifactId>mytechcommon</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
    
    
            <!--2.spring-webmvc-->
            <!--2.1.spring对web的支持,依赖 spring-webmvc,此时jar包会自动下载(spring-context、spring-web、spring-webmvc)-->
            <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${springframework.version}</version>
            </dependency>
    
            <!--2.2.spring dao层依赖(jdbc),内置tx-->
            <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-jdbc</artifactId>-->
            <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
    
    
            <!--&lt;!&ndash; Spring &ndash;&gt;-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-core</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-context</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-web</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-webmvc</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-orm</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-tx</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-test</artifactId>-->
                <!--<version>${springframework.version}</version>-->
                <!--<scope>test</scope>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-jdbc</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-expression</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-aop</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
            <!--<dependency>-->
                <!--<groupId>org.springframework</groupId>-->
                <!--<artifactId>spring-context-support</artifactId>-->
                <!--<version>${springframework.version}</version>-->
            <!--</dependency>-->
    
    
            <!--<dependency>-->
                <!--<groupId>javax.servlet</groupId>-->
                <!--<artifactId>javax.servlet-api</artifactId>-->
                <!--<version>3.0.1</version>-->
                <!--<scope>provided</scope>-->
            <!--</dependency>-->
    
    
            <!-- 通用工具包:commons-lang3 -->
            <!--<dependency>-->
                <!--<groupId>org.apache.commons</groupId>-->
                <!--<artifactId>commons-lang3</artifactId>-->
                <!--<version>3.4</version>-->
                <!--&lt;!&ndash;对于scope=compile的情况(默认scope),也就是说这个项目在编译,测试,运行阶段都需要这个artifact对应的jar包在classpath中。&ndash;&gt;-->
                <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
            <!--</dependency>-->
    
    
            <!-- 8.文件上传:commons-fileupload -->
            <!-- used by <bean id="multipartResolver"-->
            <dependency>
                <groupId>commons-fileupload</groupId>
                <artifactId>commons-fileupload</artifactId>
                <version>1.3.3</version>
            </dependency>
    
    
            <!--5.netty-->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <!--<version>${netty.version}</version>-->
                <version>4.1.10.Final</version>
                <!--<version>4.1.2.Final</version>-->
            </dependency>
    
            <!-- 6.jackson包 -->
            <!--配合Spring @ResponseBody注解,以及json工具包-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.9.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.9.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.9.0</version>
            </dependency>
    
            <!-- 7.log4j日志包 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.6.6</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.6.6</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
    
        </dependencies>
    
    
        <!--<build>-->
        <!--<finalName>WebSocket</finalName>-->
        <!--<plugins>-->
        <!--<plugin>-->
        <!--<groupId>org.apache.maven.plugins</groupId>-->
        <!--<artifactId>maven-compiler-plugin</artifactId>-->
        <!--<version>3.6.2</version>-->
        <!--<configuration>-->
        <!--<source>1.8</source>-->
        <!--<target>1.8</target>-->
        <!--<optimize>true</optimize>-->
        <!--</configuration>-->
        <!--</plugin>-->
        <!--</plugins>-->
        <!--</build>-->
    
    
    </project>
    

    2.2http协议升级处理器

    package com.kikop.websocket.handler;
    
    import com.kikop.utils.Constant;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    import org.springframework.stereotype.Component;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettywebsocketserverdemo
     * @file Name: MyHttpRequestHandler
     * @desc http协议升级处理器
     * @date 2021/6/28
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    
    @Component("myHttpRequestHandler")
    
    // 标有@Sharable的Handler,代表了他是一个可以被分享的handler,
    // 这就是说服务器注册了这个handler后,
    // 可以分享给多个客户端使用,如果没有使用该注解,
    // 则每次客户端请求时,都必须重新创建一个handler。
    // https://blog.csdn.net/qq_34354257/article/details/90901850
    @ChannelHandler.Sharable
    public class MyHttpRequestHandler extends SimpleChannelInboundHandler<Object> {
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
                channelHandlerContext.fireChannelRead(((WebSocketFrame) msg).retain());
            }
        }
    
    
        /**
         * http 到 websocket协议的升级
         *
         * @param channelHandlerContext
         * @param fullHttpRequest
         */
        private void handleHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    
            if (!fullHttpRequest.getDecoderResult().isSuccess()) {
                sendHttpResponse(channelHandlerContext, fullHttpRequest,
                        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }
    
            // 握手实例管理
            WebSocketServerHandshakerFactory webSocketServerHandshakerFactory = new WebSocketServerHandshakerFactory("ws:/" + channelHandlerContext.channel() + "/websocket",
                    null, false);
    
    
            WebSocketServerHandshaker webSocketServerHandshaker = webSocketServerHandshakerFactory.newHandshaker(fullHttpRequest);
            Constant.webSocketHandshakerMap.put(channelHandlerContext.channel().id().asLongText(), webSocketServerHandshaker);
    
            if (webSocketServerHandshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channelHandlerContext.channel());
            } else {
                webSocketServerHandshaker.handshake(channelHandlerContext.channel(), fullHttpRequest);
    
            }
        }
    
        private void sendHttpResponse(ChannelHandlerContext channelHandlerContext,
                                      FullHttpRequest fullHttpRequest,
                                      DefaultFullHttpResponse defaultFullHttpResponse) {
    
    
            if (defaultFullHttpResponse.status().code() != 200) {
                ByteBuf byteBuf = Unpooled.copiedBuffer(defaultFullHttpResponse.status().toString(), CharsetUtil.UTF_8);
                defaultFullHttpResponse.content().writeBytes(byteBuf);
                byteBuf.release();
            }
    
    //        非 keeplive
    
            boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
            ChannelFuture channelFuture = channelHandlerContext.channel().writeAndFlush(defaultFullHttpResponse);
            if (!keepAlive) {
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            }
    
        }
    }
    

    2.3websocket协议数据处理器

    package com.kikop.websocket.handler;
    
    import com.alibaba.fastjson.JSONObject;
    import com.kikop.model.vo.MyResponseJson;
    import com.kikop.service.IChatService;
    import com.kikop.service.impl.ChatServiceImpl;
    import com.kikop.utils.Constant;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettywebsocketserverdemo
     * @file Name: MyWebSocketTask
     * @desc websocket协议数据处理器
     * @date 2021/6/28
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    @Component("myWebSocketHandler")
    @ChannelHandler.Sharable
    public class MyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyWebSocketHandler.class);
    
        @Autowired
        private IChatService chatService;
    
        /**
         * 描述:读取完连接的消息后,对消息进行处理。
         *      这里主要是处理WebSocket请求
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
            handlerWebSocketFrame(ctx, msg);
        }
    
        /**
         * 描述:处理WebSocketFrame
         * @param ctx
         * @param frame
         * @throws Exception
         */
        private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
            // 关闭请求
            if (frame instanceof CloseWebSocketFrame) {
                WebSocketServerHandshaker handshaker =
                        Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
                if (handshaker == null) {
                    sendErrorMessage(ctx, "不存在的客户端连接!");
                } else {
                    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                }
                return;
            }
            // ping请求
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 只支持文本格式,不支持二进制消息
            if (!(frame instanceof TextWebSocketFrame)) {
                sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息");
            }
    
            // 客服端发送过来的消息
            String request = ((TextWebSocketFrame)frame).text();
            LOGGER.info("服务端收到新信息:" + request);
            JSONObject param = null;
            try {
                param = JSONObject.parseObject(request);
            } catch (Exception e) {
                sendErrorMessage(ctx, "JSON字符串转换出错!");
                e.printStackTrace();
            }
            if (param == null) {
                sendErrorMessage(ctx, "参数为空!");
                return;
            }
    
            String type = (String) param.get("type");
            switch (type) {
                case "REGISTER":
                    chatService.register(param, ctx);
                    break;
                case "SINGLE_SENDING":
                    chatService.singleSend(param, ctx);
                    break;
                case "GROUP_SENDING":
                    chatService.groupSend(param, ctx);
                    break;
                case "FILE_MSG_SINGLE_SENDING":
                    chatService.FileMsgSingleSend(param, ctx);
                    break;
                case "FILE_MSG_GROUP_SENDING":
                    chatService.FileMsgGroupSend(param, ctx);
                    break;
                default:
                    chatService.typeError(ctx);
                    break;
            }
        }
    
        /**
         * 描述:客户端断开连接
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            chatService.remove(ctx);
        }
    
        /**
         * 异常处理:关闭channel
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    
        private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) {
            String responseJson = new MyResponseJson()
                    .error(errorMsg)
                    .toString();
            ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson));
        }
    }
    

    2.4websocket通道处理器初始化

    package com.kikop.websocket.init;
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.util.CharsetUtil;
    import org.springframework.stereotype.Component;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettywebsocketserverdemo
     * @file Name: MyWebSocketChannelInitializer
     * @desc websocket通道处理器初始化
     * @date 2021/6/28
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    @Component("myWebSocketChannelInitializer")
    public class MyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    
    
        @Resource(name = "myHttpRequestHandler")
        private ChannelHandler myHttpRequestHandler;
    
        @Resource(name = "myWebSocketHandler")
        private ChannelHandler myWebSocketHandler;
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
    
            // 当有客户端到来时,创建对应的 socketChannel,设定通道对应的 pipeline
    
            ChannelPipeline pipeline = ch.pipeline();
    
    
            // HttpServerCodec只能获取 uri中参数,所以需要加上 HttpObjectAggregator
            ch.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP编码解码器
    
    
            // HttpObjectAggregator:主要是为了解析post请求中的message body
            // https://www.cnblogs.com/bihanghang/p/10218738.html
            // 把 HTTP头、HTTP体拼成完整的HTTP请求
    
            // Get请求包括两个部分:
            // request line(包括method,request uri,protocol version))、header
    
            // POST请求包括三个部分
            // request line(包括method,request uri,protocol version))
            // header
            // message body
            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
    
    
            // 原理是我们先将我们的file变成ChunkedFile,指定chunkSize和file的startoffset以及endoffset
            // 那么我们每次最多生成chunkSize大小的message,每次分配堆内bytebuf,然后将该bytebuf填充满
            // 最后发出去,然后只要channel一直可写就一直发送,
            // 所以这边的有点就算每次只发送指定大小,且可以利用pool 来避免内存无限大。
    
            // 可以掌控文件发送的细节过程
    
            // 方便大文件传输,不过实质上都是短的文本数据
            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
    
            // 主要是完成HTTP协议到 Websocket协议的升级
            // 处理的数据结构:Object
            ch.pipeline().addLast("http-handler", myHttpRequestHandler);
    
            // 处理的数据结构:WebSocketFrame
            ch.pipeline().addLast("websocket-handler",myWebSocketHandler);
    
        }
    }
    

    2.5MyWebSocketServer服务线程

    package com.kikop.websocket.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettywebsocketserverdemo
     * @file Name: MyWebSocketServer服务线程
     * @desc netty服务端具体初始化逻辑, 独立的线程启动
     * @date 2021/6/28
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    @Service
    public class MyWebSocketServer implements Runnable {
    
        private final Logger logger = LoggerFactory.getLogger(MyWebSocketServer.class);
    
        @Autowired
        private EventLoopGroup bossGroup;
    
        //  <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"/>
        @Autowired
        private EventLoopGroup workerGroup;
    
        @Autowired
        private ServerBootstrap serverBootstrap;
    
    
        //    通过配置文件属性注入
        private int port = 3333;
        private ChannelHandler myWebSocketChannelInitializer;
    
    
        private ChannelFuture serverChannelFuture;
    
    
        public MyWebSocketServer() {
    
        }
    
        /**
         * 启动 Netty Websocket服务器
         */
        @Override
        public void run() {
            build();
        }
    
        /**
         * 启动 netty websocket 服务端
         */
        private void build() {
    
    
            try {
    
                long begin = System.currentTimeMillis();
                // 1.netty服务端初始化
    
    //            // 1.1.创建boss线程,默认线程数:2*cpu
    //            boss = new NioEventLoopGroup(2);
    //            // 1.2.创建worker线程,默认线程数:2*cpu
    //            worker = new NioEventLoopGroup();
    
                // 1.3.创建 serverBootstrap(prototype)
                serverBootstrap = new ServerBootstrap();
    
                // 1.3.1设置 serverBootstrap 参数
                serverBootstrap.group(bossGroup, workerGroup) // boss辅助客户端的tcp连接请求  worker负责与客户端之前的读写操作
                        .channel(NioServerSocketChannel.class) // 配置连接客户端的channel类型
                        .option(ChannelOption.SO_BACKLOG, 1024) // 配置TCP参数,握手字符串长度设置
                        .option(ChannelOption.TCP_NODELAY, true) // TCP_NODELAY算法,尽可能发送大块数据,减少充斥的小块数据
                        .childOption(ChannelOption.SO_KEEPALIVE, true)// 开启心跳包活机制,就是客户端、服务端建立连接处于 ESTABLISHED状态,超过2小时没有交流,机制会被启动
                        .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))//配置固定长度接收缓存区分配器
                        .childHandler(myWebSocketChannelInitializer); // 绑定I/O事件的处理类,WebSocketChildChannelHandler中定义
    
                long end = System.currentTimeMillis();
                logger.info("Netty Websocket服务器启动完成,耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接");
    
                serverChannelFuture = serverBootstrap.bind(port).sync();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
                logger.info(e.getMessage());
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                e.printStackTrace();
            } finally {
            }
        }
    
    
        /**
         * 描述:关闭 Netty Websocket服务器,主要是释放连接
         * 连接包括:服务器连接 serverChannel,
         * 客户端TCP处理连接 bossGroup,
         * 客户端I/O操作连接 workerGroup
         * <p>
         * 若只使用
         * bossGroupFuture = bossGroup.shutdownGracefully();
         * workerGroupFuture = workerGroup.shutdownGracefully();
         * 会造成内存泄漏。
         */
        public void close() {
            serverChannelFuture.channel().close();
            io.netty.util.concurrent.Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
            io.netty.util.concurrent.Future<?> workerGroupFuture = workerGroup.shutdownGracefully();
    
            try {
                bossGroupFuture.await();
                workerGroupFuture.await();
            } catch (InterruptedException ignore) {
                ignore.printStackTrace();
            }
        }
    
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public ChannelHandler getMyWebSocketChannelInitializer() {
            return myWebSocketChannelInitializer;
        }
    
        public void setMyWebSocketChannelInitializer(ChannelHandler myWebSocketChannelInitializer) {
            this.myWebSocketChannelInitializer = myWebSocketChannelInitializer;
        }
    }
    

    2.10测试

    登录url:

    http://localhost:8085/WebSocket/

    http://localhost:8085/WebSocket/login

    聊天url:

    http://localhost:8085/WebSocket/chatroom

    总结

    3.1 java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener

    其实是你的jar文件没有同步发布到自己项目的WEB-INF\lib目录中,手动建:lib

    参考

    1.在SpringBoot中整合使用Netty框架

    https://www.cnblogs.com/kevinblandy/p/13177944.html

    2【SpringBoot框架篇】18.使用Netty加websocket实现在线聊天功能

    https://blog.csdn.net/ming19951224/article/details/108555917

    3B站【Netty项目实战基于Java分布式实现IM聊天室服务器推送技术-哔哩哔哩】

    https://b23.tv/LHLZ5a

    4高逼格开源聊天系统

    相关文章

      网友评论

          本文标题:2021-08-15_NettyWebsocketServer协

          本文链接:https://www.haomeiwen.com/subject/idvpbltx.html