美文网首页IT@程序员猿媛SpringBoot精选
SpringBoot整合Netty与websocket客户端进行

SpringBoot整合Netty与websocket客户端进行

作者: 程就人生 | 来源:发表于2019-10-09 22:09 被阅读0次

都说Netty高性能,别人说再怎么说也只是别人的经历,和自己并没有半毛钱关系,可不是吗?怎么才能证明Netty是高性能的框架呢,据说Netty可以结合Websocket一起使用,那就先整合Websocket做个聊天后台服务器试一试,感觉一下吧。

这个整合还需要分几步走,第一步是SpringBoot和Netty的整合,第二步才是Netty和Websocket整合,最后再实现前端HTML5对聊天服务器信息的收发。

环境技术说明:
SpringBoot2.1.4
Thmeleaf
Netty
Websocket

首先,在pom文件中引入必须的架包;

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!-- netty架包依赖 -->
        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

第二步,Netty、Websocket整合成HTML5 Websocket可以接收的服务端;

import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * Netty整合websocket 服务端
 * 运行流程:
 * 1.创建一个ServerBootstrap的实例引导和绑定服务器。
 * 2.创建并分配一个NioEventLoopGroup实例以进行事件的处理,比如接受连接以及读写数据。
 * 3.指定服务器绑定的本地的InetSocketAddress。
 * 4.使用一个NettyServerHandler的实例初始化每一个新的Channel。
 * 5.调用ServerBootstrap.bind()方法以绑定服务器。
 * 
 * Netty 服务端
 * @author 程就人生
 * @date 2019年10月8日
 * @Description 
 *
 */
@Component
public class NettyWebsocketServer {
    
        
    /**
     * EventLoop接口
     * NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
     * I/O任务
     * 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
     * 非IO任务
     * 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
     * 两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。
     */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    
    private final EventLoopGroup workGroup = new NioEventLoopGroup();
    /**
     * Channel
     * Channel类似Socket,它代表一个实体(如一个硬件设备、一个网络套接字)的开放连接,如读写操作。通俗地讲,Channel字面意思就是通道,每一个客户端与服务端之间进行通信的一个双向通道。
     * Channel主要工作:
     * 1.当前网络连接的通道的状态(例如是否打开?是否已连接?)
     * 2.网络连接的配置参数 (例如接收缓冲区大小)
     * 3.提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
     *  调用立即返回一个 ChannelFuture 实例,通过注册监听器到ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
     * 4.支持关联 I/O 操作与对应的处理程序。
     * 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型
     * NioSocketChannel,异步的客户端 TCP Socket 连接
     * NioServerSocketChannel,异步的服务器端 TCP Socket 连接
     * NioDatagramChannel,异步的 UDP 连接
     * NioSctpChannel,异步的客户端 Sctp 连接
     * NioSctpServerChannel,异步的 Sctp 服务器端连接
     * 这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.
     */
    private Channel channel;
    
    /**
     * 启动服务
     * @param port
     */
    public void start(int port){
        
        /**
         * Future
         * Future提供了另外一种在操作完成是通知应用程序的方式。这个对象可以看作一个异步操作的结果占位符。
         * 通俗地讲,它相当于一位指挥官,发送了一个请求建立完连接,通信完毕了,你通知一声它回来关闭各项IO通道,整个过程,它是不阻塞的,异步的。
         * 在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,
         * 他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
         */
        try {
            /**
             * Bootstrap
             * Bootstrap是引导的意思,一个Netty应用通常由一个Bootstrap开始,
             * 主要作用是配置整个Netty程序,串联各个组件,
             * Netty中Bootstrap类是客户端程序的启动引导类,
             * ServerBootstrap是服务端启动引导类。
             */
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup,workGroup)
                  //非阻塞
                  .channel(NioServerSocketChannel.class)
                  //设置为前端websocket可以连接
                  .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    // HttpServerCodec:将请求和应答消息解码为HTTP消息
                    pipeline.addLast("http-codec",new HttpServerCodec());
                    //将HTTP消息的多个部分合成一条完整的HTTP消息
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                    //向客户端发送HTML5文件
                    socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    // 进行设置心跳检测
                    socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));
                    //配置通道处理  来进行业务处理
                    pipeline.addLast("handler", new WebSocketServerHandler());
                }
                
            });
            channel = server.bind(port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 停止服务
     */
    public void destroy(){
        if(channel != null) { 
            channel.close();
        }
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }

    public static void main(String[] args) {
        NettyWebsocketServer server = new NettyWebsocketServer();
        server.start(7788);
    }
}

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 信息接收的处理
 * @author 程就人生
 * @date 2019年10月9日
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{
    
    //在线人存储
    private static final Map<String, NioSocketChannel> channelMap = new ConcurrentHashMap<>(16);
    
    //保存全局的,连接上服务器的客户
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    private static Logger log = LoggerFactory.getLogger(WebSocketServerHandler.class);

    private WebSocketServerHandshaker handshaker;
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //http请求和tcp请求分开处理
        if(msg instanceof HttpRequest){
            handlerHttpRequest(ctx,(HttpRequest) msg);
        }else if(msg instanceof WebSocketFrame){
            handlerWebSocketFrame(ctx,(WebSocketFrame) msg);
        }
    }
    
    
    /**
     * 
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    
    /**
     * websocket消息处理
     * @param ctx
     * @param msg
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        //判断是否是关闭链路的指令
        if(frame instanceof CloseWebSocketFrame){
            log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
            //移除channel
            removeCannel((NioSocketChannel)ctx.channel());
            return;
        }
        //判断是否是ping消息
        if(frame instanceof PingWebSocketFrame){
            log.info("【ping】");
            //PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
            //ctx.channel().writeAndFlush(pong);
            return ;
        }
        //判断实时是pong消息
        if(frame instanceof PongWebSocketFrame){
            log.info("【pong】");
            return ;
        }
        //本例子只支持文本,不支持二进制
        if(!(frame instanceof TextWebSocketFrame)){
            log.info("【不支持二进制】");
            throw new UnsupportedOperationException("不支持二进制");
        }
        //返回信息应答
        JSONObject object = JSONObject.parseObject(((TextWebSocketFrame) frame).text().toString());
        
        //接收信息的人是否在线
        if(channelMap.containsKey(object.getString("toUser"))){
            //在线时直接发送,已送达
            //只支持文本形式,信息必须以文本形式发送
            channelMap.get(object.getString("toUser")).writeAndFlush(new TextWebSocketFrame(object.toString()));
        }
            
        
    }
    
     /**
     * wetsocket第一次连接握手
     * @param ctx
     * @param msg
     */
    @SuppressWarnings("deprecation")
    private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
        String userUid = null;
        if (req.getMethod().toString().equals("GET")) {
            userUid = req.getUri().substring(req.getUri().indexOf("/", 2)+1);
            //对用户信息进行存储
            channelMap.put(userUid, (NioSocketChannel)ctx.channel());           
        }
        
        // http 解码失败
        if(!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
            sendHttpResponse(ctx, (FullHttpRequest) req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
        }
        //可以通过url获取其他参数
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                "ws://"+req.headers().get("Host")+"/"+req.getUri()+"",null,false
        );
        handshaker = factory.newHandshaker(req);
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        }else{
            //进行连接
            handshaker.handshake(ctx.channel(), (FullHttpRequest) req);    
            //拉取未发送的数据
            //TODO
        }
    }

    @SuppressWarnings("deprecation")
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回应答给客户端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
    
     /**
     * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            //PingWebSocketFrame ping = new PingWebSocketFrame();
            switch (stateEvent.state()){
                //读空闲(服务器端)
                case READER_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                    //ctx.writeAndFlush(ping);
                    break;
                    //写空闲(客户端)
                case WRITER_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                    //ctx.writeAndFlush(ping);
                    break;
                case ALL_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
                    break;
            }
        }
    }

    /**
     * 出现异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        //移除channel
        removeCannel((NioSocketChannel)ctx.channel());
        ctx.close();
        log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
    }
    
    /**
     * 从缓存中移除已关闭的channel
     * @param nioSocketChannel
     */
    private void removeCannel(NioSocketChannel nioSocketChannel){
        //从当前在线中移除
        if(channelMap.containsValue(nioSocketChannel)){
            for(Map.Entry<String, NioSocketChannel> entry : channelMap.entrySet()){
                if(entry.getValue() == nioSocketChannel){
                    channelMap.remove(entry.getKey());
                    break;
                }
            }
        }
    }

}

第三步,在Springboot启动类里,增加Netty服务端的启动;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.example.demo.netty.websocket.NettyWebsocketServer;

/**
 * 实现CommandLineRunner接口,启动Netty服务端
 * @author 程就人生
 * @date 2019年10月9日
 */
@SpringBootApplication
public class NettyDemoApplication implements CommandLineRunner{

    //需要在配置文件里配置
    @Value("${im.server.port}")
    private int imServerPort ;
    
    @Autowired
    private NettyWebsocketServer nettyServer;
    
    public static void main(String[] args) {
        SpringApplication.run(NettyDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        //netty服务启动的端口号不可和SpringBoot启动类的端口号重复
        nettyServer.start(imServerPort);
        //服务停止时关闭nettyServer
        Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
    }

}

第四步,整个Controller,渲染到页面;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

/**
 * 简易的测试页面
 * @author 程就人生
 * @date 2019年10月9日
 */
@RestController
public class IndexController {
    
    //需要在配置文件里进行配置
    @Value("${im.server.port}")
    private int imServerPort ;
    
    /**
     * 最简易的
     * @return
     */
    @GetMapping("/index/{userUid}")
    public ModelAndView index(@PathVariable("userUid") String userUid){
        //指定模板路径
        ModelAndView modelAndView = new ModelAndView("/index");
        modelAndView.addObject("userUid", userUid);
        modelAndView.addObject("imServerPort", imServerPort);
        return modelAndView;
    }
}

第五步,前端页面简易制作;

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org" >
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
</head>
<body>
<p>发送信息:<input type="text" id="txt" ></input></p>
<p>接收人:<input type="text" id="toUser" ></input></p>
<p><button id="button" >发送消息</button></p>
<p id="recvContent">

</p>
<script src="http://www.jq22.com/jquery/jquery-1.10.2.js"></script>
<script th:inline="javascript" >
    var userUid=[[${userUid}]];
    var imServerPort=[[${imServerPort}]];
    <!-- ws客户端   -->
    var socket;  
    var wsUrl = "ws://localhost:"+imServerPort+"/websocket/"+userUid;
    //避免重复连接
    var lockReconnect = false;
    var tt;
    //创建websocket
    createWebSocket();  
    //发送信息回车键
    $("#txt").keydown(function(event){ 
        if(event.keyCode==13){ 
            $("#button").click();
        } 
    }); 
    //创建连接
    function createWebSocket() {
        try {
            if(typeof(WebSocket) == "undefined") {  
                console.log("您的浏览器不支持WebSocket");  
            }else{
                console.log("您的浏览器支持WebSocket");  
            }
            socket = new WebSocket(wsUrl);
            //初始化
            init();
        } catch(e) {
            console.log('catch');
            //异常后重新连接
            reconnect();
        }
    }
    //初始化
    function init() {
        socket.onclose = function () {
            console.log('链接关闭');
            //关闭后重新连接
            reconnect();
        };
        socket.onerror = function() {
            console.log('发生异常了');
            //出错后重新连接
            reconnect();
        };
        socket.onopen = function () {
            //心跳检测重置
            heartCheck.start();
        };
        socket.onmessage = function (event) {
            // 将json字符串转换为对象
            var resData = JSON.parse(event.data);
            console.log(resData);  
            //好友列表初始化
            if(resData!=undefined) {
                $("#recvContent").append('<div style="width:300px;text-align:left;"><span >'+resData.fromUser + '发送:' + resData.content + '</span></div><br/>');
            } 
            heartCheck.start();
        }
    }
    //重新连接
    function reconnect() {
      if(lockReconnect) {
        return;
      };
      lockReconnect = true;
      //没连接上会一直重连,设置延迟避免请求过多
      tt && clearTimeout(tt);
      tt = setTimeout(function () {
        createWebSocket();
        lockReconnect = false;
      }, 4000);
    }
    //心跳检测
    var heartCheck = {
        timeout: 210000,
        timeoutObj: null,
        serverTimeoutObj: null,
        start: function(){
              console.log(getNowTime() +" Socket 心跳检测");  
            var self = this;
            this.timeoutObj && clearTimeout(this.timeoutObj);
            this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
            this.timeoutObj = setTimeout(function(){
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                console.log(getNowTime() +' Socket 连接重试');
                //socket.send("连接成功");
                self.serverTimeoutObj = setTimeout(function() {
                    console.log(socket);
                    socket.close();
                }, self.timeout);
            }, this.timeout)
        }
    }
    //按钮点击事件
    $("#button").click(function(){
        var object={}
        object.content = $("#txt").val();
        object.toUser = $("#toUser").val();
        object.fromUser= userUid;
        $("#txt").val("");
        $("#recvContent").append('<div style="width:300px;text-align:right;"><span >发送给'+object.toUser + ':' + object.content + '</span></div><br/>');
        socket.send(JSON.stringify(object));
    });
    /**
     * 获取系统当前时间
     * @returns
     */
    function p(s) {
        return s < 10 ? '0' + s : s;
    }
    function getNowTime() {
        var myDate = new Date();
        //获取当前年
        var year = myDate.getFullYear();
        //获取当前月
        var month = myDate.getMonth() + 1;
        //获取当前日
        var date = myDate.getDate();
        var h = myDate.getHours();       //获取当前小时数(0-23)
        var m = myDate.getMinutes();     //获取当前分钟数(0-59)
        var s = myDate.getSeconds();
        return year + '-' + p(month) + "-" + p(date) + " " + p(h) + ':' + p(m) + ":" + p(s);
    }
</script>

</body>
</html>

最后,启动项目,通过网址打开两个简易版聊天对话框,然后进行信息的发送和接收;

测试窗口1 测试窗口2

总结
通过这次整合,可以看出来,原来用Websocket写的服务端和通过Netty写的服务端,在逻辑上是不变的,唯独变化的是技术处理的细节上。也就是说,不管使用的是什么技术,要解决的问题没变,关键就是在技术处理上如何让问题不再是问题。

相关文章

网友评论

    本文标题:SpringBoot整合Netty与websocket客户端进行

    本文链接:https://www.haomeiwen.com/subject/cvknpctx.html