美文网首页
2021-08-15_NettyWebsocketServer协

2021-08-15_NettyWebsocketServer协

作者: kikop | 来源:发表于2021-08-15 23:25 被阅读0次

20210815_NettyWebsocketServer协议学习笔记2

1概述

WebSocket是为了解决HTTP协议中通信只能由客户端发起这个弊端而出现的,WebSocket基于HTTP5协议,借用HTTP进行握手、升级,能够做到轻量的、高效的、双向的在客户端和服务端之间传输文本数据。

1.1功能模块

实现核心的聊天功能,包括单发、群发、文件发送。

系统只包括两个模块:登录模块和聊天管理模块。

  • 登录模块:既然作为一个系统,那么登录的角色认证是必不可少的,这里使用简单、传统的Session方式维持登录状态
  • 注销模块:当然也有对应的注销功能,但这里的注销除了清空Session对象,还要释放WebSocket连接,否则造成内存泄露。
  • 聊天管理模块:系统的核心模块,这部分主要使用Netty框架实现,功能包括信息、文件的单条和多条发送,也支持表情发送。
  • 其他模块:如好友管理模块、聊天记录管理、注册模块等,我并没有实现,有兴趣的话可以自行实现,与传统的开发方式类似。

1.2http协议升级原理

socket = new WebSocket("ws://localhost:8088");

Request URL: ws://localhost:8088/
Request Method: GET
Status Code: 101 Switching Protocols
// 协议请求头
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cache-Control: no-cache
Connection: Upgrade
Host: localhost:8088
Origin: http://localhost:8085
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
// Sec-WebSocket-Key
Sec-WebSocket-Key: TR/ZKx61zqyqIVN7QbeJPQ==
Sec-WebSocket-Version: 13
// 协议升级标识:Upgrade
Upgrade: websocket
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36
// 协议响应
connection: upgrade
sec-websocket-accept: aRgINM6rA896S0mXaVCp+6yj99A=
upgrade: websocket

1.2.1源码分析

1.2.1.2首次http请求

DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
GET / HTTP/1.1
Host: localhost:8088
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36
Upgrade: websocket
Origin: http://localhost:8085
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Sec-WebSocket-Key: tWdJLe/IFtQ+RcA2MGbnXw==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
content-length: 0
// 旧的channelPipeline:
head:
DefaultChannelPipeline$HeadContext
-->第一个入站处理器:HttpServerCodec
---->HttpObjectAggregator
------>ChunkedWriteHandler
-------->MyHttpRequestHandler
---------->MyWebSocketHandler
------------>DefaultChannelPipeline$TailContext

image-20210815225454695.png

1.2.1.3协议升级算法

// 真正的协议升级
webSocketServerHandshaker.handshake(channelHandlerContext.channel(), fullHttpRequest);
// WebSocketServerHandshaker
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                        HttpHeaders responseHeaders, final ChannelPromise promise) {

    if (logger.isDebugEnabled()) {
        logger.debug("{} WebSocket version {} server handshake", channel, version());
    }
    FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
    ChannelPipeline p = channel.pipeline();
    // 1.移除HttpObjectAggregator
    if (p.get(HttpObjectAggregator.class) != null) {
        p.remove(HttpObjectAggregator.class);
    }
    // 2.移除HttpContentCompressor
    if (p.get(HttpContentCompressor.class) != null) {
        p.remove(HttpContentCompressor.class);
    }
    // 3.ctx==null
    ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
    final String encoderName;
    if (ctx == null) {
        // this means the user use a HttpServerCodec
        ctx = p.context(HttpServerCodec.class);
        if (ctx == null) { // 前提要有:HttpServerCodec
            promise.setFailure(
                    new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
            return promise;
        }
        // 4.增加websocket的编解码器,在 http-codec之前
        // newWebsocketDecoder
        p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
        p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
        encoderName = ctx.name();
    } else {
        p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

        encoderName = p.context(HttpResponseEncoder.class).name();
        p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
    }
    channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ChannelPipeline p = future.channel().pipeline();
                // 5.移除 http-codec
                p.remove(encoderName);
                promise.setSuccess();
            } else {
                promise.setFailure(future.cause());
            }
        }
    });
    return promise;
}
// 新的channelPipeline:
head:
DefaultChannelPipeline$HeadContext
-->WebSocketFrame13Decoder
---->WebSocketFrame13EnDecoder
------>ChunkedWriteHandler
-------->MyHttpRequestHandler
---------->MyWebSocketHandler
------------>DefaultChannelPipeline$TailContext

1.2.1.4handlerWebSocketFrame

/**
     * 描述:处理WebSocketFrame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 关闭请求
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker =
                    Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
            if (handshaker == null) {
                sendErrorMessage(ctx, "不存在的客户端连接!");
            } else {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        // ping请求
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 只支持文本格式,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息");
        }

        // 客服端发送过来的消息
        // {"fromUserId":"002","toUserId":"001","content":"11","type":"SINGLE_SENDING"}
        String request = ((TextWebSocketFrame)frame).text();
        LOGGER.info("服务端收到新信息:" + request);
        JSONObject param = null;
        try {
            param = JSONObject.parseObject(request);
        } catch (Exception e) {
            sendErrorMessage(ctx, "JSON字符串转换出错!");
            e.printStackTrace();
        }
        if (param == null) {
            sendErrorMessage(ctx, "参数为空!");
            return;
        }

        String type = (String) param.get("type");
        switch (type) {
            case "REGISTER":
                chatService.register(param, ctx);
                break;
            case "SINGLE_SENDING":
                chatService.singleSend(param, ctx);
                break;
            case "GROUP_SENDING":
                chatService.groupSend(param, ctx);
                break;
            case "FILE_MSG_SINGLE_SENDING":
                chatService.FileMsgSingleSend(param, ctx);
                break;
            case "FILE_MSG_GROUP_SENDING":
                chatService.FileMsgGroupSend(param, ctx);
                break;
            default:
                chatService.typeError(ctx);
                break;
        }
    }

2代码实战

2.1maven依赖

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>technicaltools</artifactId>
        <groupId>com.kikop</groupId>
        <version>1.0-SNAPSHOT</version>
        <!--定义依赖的父pom文件-->
        <relativePath>../pom.xml</relativePath>
    </parent>

    <groupId>com.kikop</groupId>
    <artifactId>mynettywebsocketserverdemo</artifactId>

    <properties>

    </properties>

    <dependencies>

        <!--1.共用模块-->
        <dependency>
            <groupId>com.kikop</groupId>
            <artifactId>mytechcommon</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>


        <!--2.spring-webmvc-->
        <!--2.1.spring对web的支持,依赖 spring-webmvc,此时jar包会自动下载(spring-context、spring-web、spring-webmvc)-->
        <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>${springframework.version}</version>
        </dependency>

        <!--2.2.spring dao层依赖(jdbc),内置tx-->
        <!--<dependency>-->
        <!--<groupId>org.springframework</groupId>-->
        <!--<artifactId>spring-jdbc</artifactId>-->
        <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->


        <!--&lt;!&ndash; Spring &ndash;&gt;-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-core</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-context</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-web</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-webmvc</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-orm</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-tx</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-test</artifactId>-->
            <!--<version>${springframework.version}</version>-->
            <!--<scope>test</scope>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-jdbc</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-expression</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-aop</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.springframework</groupId>-->
            <!--<artifactId>spring-context-support</artifactId>-->
            <!--<version>${springframework.version}</version>-->
        <!--</dependency>-->


        <!--<dependency>-->
            <!--<groupId>javax.servlet</groupId>-->
            <!--<artifactId>javax.servlet-api</artifactId>-->
            <!--<version>3.0.1</version>-->
            <!--<scope>provided</scope>-->
        <!--</dependency>-->


        <!-- 通用工具包:commons-lang3 -->
        <!--<dependency>-->
            <!--<groupId>org.apache.commons</groupId>-->
            <!--<artifactId>commons-lang3</artifactId>-->
            <!--<version>3.4</version>-->
            <!--&lt;!&ndash;对于scope=compile的情况(默认scope),也就是说这个项目在编译,测试,运行阶段都需要这个artifact对应的jar包在classpath中。&ndash;&gt;-->
            <!--&lt;!&ndash;<scope>compile</scope>&ndash;&gt;-->
        <!--</dependency>-->


        <!-- 8.文件上传:commons-fileupload -->
        <!-- used by <bean id="multipartResolver"-->
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
            <version>1.3.3</version>
        </dependency>


        <!--5.netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <!--<version>${netty.version}</version>-->
            <version>4.1.10.Final</version>
            <!--<version>4.1.2.Final</version>-->
        </dependency>

        <!-- 6.jackson包 -->
        <!--配合Spring @ResponseBody注解,以及json工具包-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- 7.log4j日志包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.6</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

    </dependencies>


    <!--<build>-->
    <!--<finalName>WebSocket</finalName>-->
    <!--<plugins>-->
    <!--<plugin>-->
    <!--<groupId>org.apache.maven.plugins</groupId>-->
    <!--<artifactId>maven-compiler-plugin</artifactId>-->
    <!--<version>3.6.2</version>-->
    <!--<configuration>-->
    <!--<source>1.8</source>-->
    <!--<target>1.8</target>-->
    <!--<optimize>true</optimize>-->
    <!--</configuration>-->
    <!--</plugin>-->
    <!--</plugins>-->
    <!--</build>-->


</project>

2.2http协议升级处理器

package com.kikop.websocket.handler;

import com.kikop.utils.Constant;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import org.springframework.stereotype.Component;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettywebsocketserverdemo
 * @file Name: MyHttpRequestHandler
 * @desc http协议升级处理器
 * @date 2021/6/28
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */

@Component("myHttpRequestHandler")

// 标有@Sharable的Handler,代表了他是一个可以被分享的handler,
// 这就是说服务器注册了这个handler后,
// 可以分享给多个客户端使用,如果没有使用该注解,
// 则每次客户端请求时,都必须重新创建一个handler。
// https://blog.csdn.net/qq_34354257/article/details/90901850
@ChannelHandler.Sharable
public class MyHttpRequestHandler extends SimpleChannelInboundHandler<Object> {


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            channelHandlerContext.fireChannelRead(((WebSocketFrame) msg).retain());
        }
    }


    /**
     * http 到 websocket协议的升级
     *
     * @param channelHandlerContext
     * @param fullHttpRequest
     */
    private void handleHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {

        if (!fullHttpRequest.getDecoderResult().isSuccess()) {
            sendHttpResponse(channelHandlerContext, fullHttpRequest,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // 握手实例管理
        WebSocketServerHandshakerFactory webSocketServerHandshakerFactory = new WebSocketServerHandshakerFactory("ws:/" + channelHandlerContext.channel() + "/websocket",
                null, false);


        WebSocketServerHandshaker webSocketServerHandshaker = webSocketServerHandshakerFactory.newHandshaker(fullHttpRequest);
        Constant.webSocketHandshakerMap.put(channelHandlerContext.channel().id().asLongText(), webSocketServerHandshaker);

        if (webSocketServerHandshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channelHandlerContext.channel());
        } else {
            webSocketServerHandshaker.handshake(channelHandlerContext.channel(), fullHttpRequest);

        }
    }

    private void sendHttpResponse(ChannelHandlerContext channelHandlerContext,
                                  FullHttpRequest fullHttpRequest,
                                  DefaultFullHttpResponse defaultFullHttpResponse) {


        if (defaultFullHttpResponse.status().code() != 200) {
            ByteBuf byteBuf = Unpooled.copiedBuffer(defaultFullHttpResponse.status().toString(), CharsetUtil.UTF_8);
            defaultFullHttpResponse.content().writeBytes(byteBuf);
            byteBuf.release();
        }

//        非 keeplive

        boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
        ChannelFuture channelFuture = channelHandlerContext.channel().writeAndFlush(defaultFullHttpResponse);
        if (!keepAlive) {
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        }

    }
}

2.3websocket协议数据处理器

package com.kikop.websocket.handler;

import com.alibaba.fastjson.JSONObject;
import com.kikop.model.vo.MyResponseJson;
import com.kikop.service.IChatService;
import com.kikop.service.impl.ChatServiceImpl;
import com.kikop.utils.Constant;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettywebsocketserverdemo
 * @file Name: MyWebSocketTask
 * @desc websocket协议数据处理器
 * @date 2021/6/28
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */
@Component("myWebSocketHandler")
@ChannelHandler.Sharable
public class MyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {


    private static final Logger LOGGER = LoggerFactory.getLogger(MyWebSocketHandler.class);

    @Autowired
    private IChatService chatService;

    /**
     * 描述:读取完连接的消息后,对消息进行处理。
     *      这里主要是处理WebSocket请求
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        handlerWebSocketFrame(ctx, msg);
    }

    /**
     * 描述:处理WebSocketFrame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 关闭请求
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker =
                    Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
            if (handshaker == null) {
                sendErrorMessage(ctx, "不存在的客户端连接!");
            } else {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        // ping请求
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 只支持文本格式,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息");
        }

        // 客服端发送过来的消息
        String request = ((TextWebSocketFrame)frame).text();
        LOGGER.info("服务端收到新信息:" + request);
        JSONObject param = null;
        try {
            param = JSONObject.parseObject(request);
        } catch (Exception e) {
            sendErrorMessage(ctx, "JSON字符串转换出错!");
            e.printStackTrace();
        }
        if (param == null) {
            sendErrorMessage(ctx, "参数为空!");
            return;
        }

        String type = (String) param.get("type");
        switch (type) {
            case "REGISTER":
                chatService.register(param, ctx);
                break;
            case "SINGLE_SENDING":
                chatService.singleSend(param, ctx);
                break;
            case "GROUP_SENDING":
                chatService.groupSend(param, ctx);
                break;
            case "FILE_MSG_SINGLE_SENDING":
                chatService.FileMsgSingleSend(param, ctx);
                break;
            case "FILE_MSG_GROUP_SENDING":
                chatService.FileMsgGroupSend(param, ctx);
                break;
            default:
                chatService.typeError(ctx);
                break;
        }
    }

    /**
     * 描述:客户端断开连接
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        chatService.remove(ctx);
    }

    /**
     * 异常处理:关闭channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


    private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) {
        String responseJson = new MyResponseJson()
                .error(errorMsg)
                .toString();
        ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson));
    }
}

2.4websocket通道处理器初始化

package com.kikop.websocket.init;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettywebsocketserverdemo
 * @file Name: MyWebSocketChannelInitializer
 * @desc websocket通道处理器初始化
 * @date 2021/6/28
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */
@Component("myWebSocketChannelInitializer")
public class MyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Resource(name = "myHttpRequestHandler")
    private ChannelHandler myHttpRequestHandler;

    @Resource(name = "myWebSocketHandler")
    private ChannelHandler myWebSocketHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        // 当有客户端到来时,创建对应的 socketChannel,设定通道对应的 pipeline

        ChannelPipeline pipeline = ch.pipeline();


        // HttpServerCodec只能获取 uri中参数,所以需要加上 HttpObjectAggregator
        ch.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP编码解码器


        // HttpObjectAggregator:主要是为了解析post请求中的message body
        // https://www.cnblogs.com/bihanghang/p/10218738.html
        // 把 HTTP头、HTTP体拼成完整的HTTP请求

        // Get请求包括两个部分:
        // request line(包括method,request uri,protocol version))、header

        // POST请求包括三个部分
        // request line(包括method,request uri,protocol version))
        // header
        // message body
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));


        // 原理是我们先将我们的file变成ChunkedFile,指定chunkSize和file的startoffset以及endoffset
        // 那么我们每次最多生成chunkSize大小的message,每次分配堆内bytebuf,然后将该bytebuf填充满
        // 最后发出去,然后只要channel一直可写就一直发送,
        // 所以这边的有点就算每次只发送指定大小,且可以利用pool 来避免内存无限大。

        // 可以掌控文件发送的细节过程

        // 方便大文件传输,不过实质上都是短的文本数据
        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());

        // 主要是完成HTTP协议到 Websocket协议的升级
        // 处理的数据结构:Object
        ch.pipeline().addLast("http-handler", myHttpRequestHandler);

        // 处理的数据结构:WebSocketFrame
        ch.pipeline().addLast("websocket-handler",myWebSocketHandler);

    }
}

2.5MyWebSocketServer服务线程

package com.kikop.websocket.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettywebsocketserverdemo
 * @file Name: MyWebSocketServer服务线程
 * @desc netty服务端具体初始化逻辑, 独立的线程启动
 * @date 2021/6/28
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */
@Service
public class MyWebSocketServer implements Runnable {

    private final Logger logger = LoggerFactory.getLogger(MyWebSocketServer.class);

    @Autowired
    private EventLoopGroup bossGroup;

    //  <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"/>
    @Autowired
    private EventLoopGroup workerGroup;

    @Autowired
    private ServerBootstrap serverBootstrap;


    //    通过配置文件属性注入
    private int port = 3333;
    private ChannelHandler myWebSocketChannelInitializer;


    private ChannelFuture serverChannelFuture;


    public MyWebSocketServer() {

    }

    /**
     * 启动 Netty Websocket服务器
     */
    @Override
    public void run() {
        build();
    }

    /**
     * 启动 netty websocket 服务端
     */
    private void build() {


        try {

            long begin = System.currentTimeMillis();
            // 1.netty服务端初始化

//            // 1.1.创建boss线程,默认线程数:2*cpu
//            boss = new NioEventLoopGroup(2);
//            // 1.2.创建worker线程,默认线程数:2*cpu
//            worker = new NioEventLoopGroup();

            // 1.3.创建 serverBootstrap(prototype)
            serverBootstrap = new ServerBootstrap();

            // 1.3.1设置 serverBootstrap 参数
            serverBootstrap.group(bossGroup, workerGroup) // boss辅助客户端的tcp连接请求  worker负责与客户端之前的读写操作
                    .channel(NioServerSocketChannel.class) // 配置连接客户端的channel类型
                    .option(ChannelOption.SO_BACKLOG, 1024) // 配置TCP参数,握手字符串长度设置
                    .option(ChannelOption.TCP_NODELAY, true) // TCP_NODELAY算法,尽可能发送大块数据,减少充斥的小块数据
                    .childOption(ChannelOption.SO_KEEPALIVE, true)// 开启心跳包活机制,就是客户端、服务端建立连接处于 ESTABLISHED状态,超过2小时没有交流,机制会被启动
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))//配置固定长度接收缓存区分配器
                    .childHandler(myWebSocketChannelInitializer); // 绑定I/O事件的处理类,WebSocketChildChannelHandler中定义

            long end = System.currentTimeMillis();
            logger.info("Netty Websocket服务器启动完成,耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接");

            serverChannelFuture = serverBootstrap.bind(port).sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.info(e.getMessage());
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        } finally {
        }
    }


    /**
     * 描述:关闭 Netty Websocket服务器,主要是释放连接
     * 连接包括:服务器连接 serverChannel,
     * 客户端TCP处理连接 bossGroup,
     * 客户端I/O操作连接 workerGroup
     * <p>
     * 若只使用
     * bossGroupFuture = bossGroup.shutdownGracefully();
     * workerGroupFuture = workerGroup.shutdownGracefully();
     * 会造成内存泄漏。
     */
    public void close() {
        serverChannelFuture.channel().close();
        io.netty.util.concurrent.Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        io.netty.util.concurrent.Future<?> workerGroupFuture = workerGroup.shutdownGracefully();

        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }


    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public ChannelHandler getMyWebSocketChannelInitializer() {
        return myWebSocketChannelInitializer;
    }

    public void setMyWebSocketChannelInitializer(ChannelHandler myWebSocketChannelInitializer) {
        this.myWebSocketChannelInitializer = myWebSocketChannelInitializer;
    }
}

2.10测试

登录url:

http://localhost:8085/WebSocket/

http://localhost:8085/WebSocket/login

聊天url:

http://localhost:8085/WebSocket/chatroom

总结

3.1 java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener

其实是你的jar文件没有同步发布到自己项目的WEB-INF\lib目录中,手动建:lib

参考

1.在SpringBoot中整合使用Netty框架

https://www.cnblogs.com/kevinblandy/p/13177944.html

2【SpringBoot框架篇】18.使用Netty加websocket实现在线聊天功能

https://blog.csdn.net/ming19951224/article/details/108555917

3B站【Netty项目实战基于Java分布式实现IM聊天室服务器推送技术-哔哩哔哩】

https://b23.tv/LHLZ5a

4高逼格开源聊天系统

相关文章

网友评论

      本文标题:2021-08-15_NettyWebsocketServer协

      本文链接:https://www.haomeiwen.com/subject/idvpbltx.html