都说Netty高性能,别人说再怎么说也只是别人的经历,和自己并没有半毛钱关系,可不是吗?怎么才能证明Netty是高性能的框架呢,据说Netty可以结合Websocket一起使用,那就先整合Websocket做个聊天后台服务器试一试,感觉一下吧。
这个整合还需要分几步走,第一步是SpringBoot和Netty的整合,第二步才是Netty和Websocket整合,最后再实现前端HTML5对聊天服务器信息的收发。
环境技术说明:
SpringBoot2.1.4
Thmeleaf
Netty
Websocket
首先,在pom文件中引入必须的架包;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- netty架包依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
第二步,Netty、Websocket整合成HTML5 Websocket可以接收的服务端;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
/**
* Netty整合websocket 服务端
* 运行流程:
* 1.创建一个ServerBootstrap的实例引导和绑定服务器。
* 2.创建并分配一个NioEventLoopGroup实例以进行事件的处理,比如接受连接以及读写数据。
* 3.指定服务器绑定的本地的InetSocketAddress。
* 4.使用一个NettyServerHandler的实例初始化每一个新的Channel。
* 5.调用ServerBootstrap.bind()方法以绑定服务器。
*
* Netty 服务端
* @author 程就人生
* @date 2019年10月8日
* @Description
*
*/
@Component
public class NettyWebsocketServer {
/**
* EventLoop接口
* NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
* I/O任务
* 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
* 非IO任务
* 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
* 两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。
*/
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workGroup = new NioEventLoopGroup();
/**
* Channel
* Channel类似Socket,它代表一个实体(如一个硬件设备、一个网络套接字)的开放连接,如读写操作。通俗地讲,Channel字面意思就是通道,每一个客户端与服务端之间进行通信的一个双向通道。
* Channel主要工作:
* 1.当前网络连接的通道的状态(例如是否打开?是否已连接?)
* 2.网络连接的配置参数 (例如接收缓冲区大小)
* 3.提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
* 调用立即返回一个 ChannelFuture 实例,通过注册监听器到ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
* 4.支持关联 I/O 操作与对应的处理程序。
* 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型
* NioSocketChannel,异步的客户端 TCP Socket 连接
* NioServerSocketChannel,异步的服务器端 TCP Socket 连接
* NioDatagramChannel,异步的 UDP 连接
* NioSctpChannel,异步的客户端 Sctp 连接
* NioSctpServerChannel,异步的 Sctp 服务器端连接
* 这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.
*/
private Channel channel;
/**
* 启动服务
* @param port
*/
public void start(int port){
/**
* Future
* Future提供了另外一种在操作完成是通知应用程序的方式。这个对象可以看作一个异步操作的结果占位符。
* 通俗地讲,它相当于一位指挥官,发送了一个请求建立完连接,通信完毕了,你通知一声它回来关闭各项IO通道,整个过程,它是不阻塞的,异步的。
* 在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,
* 他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
*/
try {
/**
* Bootstrap
* Bootstrap是引导的意思,一个Netty应用通常由一个Bootstrap开始,
* 主要作用是配置整个Netty程序,串联各个组件,
* Netty中Bootstrap类是客户端程序的启动引导类,
* ServerBootstrap是服务端启动引导类。
*/
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup,workGroup)
//非阻塞
.channel(NioServerSocketChannel.class)
//设置为前端websocket可以连接
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// HttpServerCodec:将请求和应答消息解码为HTTP消息
pipeline.addLast("http-codec",new HttpServerCodec());
//将HTTP消息的多个部分合成一条完整的HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
//向客户端发送HTML5文件
socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 进行设置心跳检测
socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));
//配置通道处理 来进行业务处理
pipeline.addLast("handler", new WebSocketServerHandler());
}
});
channel = server.bind(port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 停止服务
*/
public void destroy(){
if(channel != null) {
channel.close();
}
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
public static void main(String[] args) {
NettyWebsocketServer server = new NettyWebsocketServer();
server.start(7788);
}
}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* 信息接收的处理
* @author 程就人生
* @date 2019年10月9日
*/
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{
//在线人存储
private static final Map<String, NioSocketChannel> channelMap = new ConcurrentHashMap<>(16);
//保存全局的,连接上服务器的客户
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static Logger log = LoggerFactory.getLogger(WebSocketServerHandler.class);
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//http请求和tcp请求分开处理
if(msg instanceof HttpRequest){
handlerHttpRequest(ctx,(HttpRequest) msg);
}else if(msg instanceof WebSocketFrame){
handlerWebSocketFrame(ctx,(WebSocketFrame) msg);
}
}
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* websocket消息处理
* @param ctx
* @param msg
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
//判断是否是关闭链路的指令
if(frame instanceof CloseWebSocketFrame){
log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
//移除channel
removeCannel((NioSocketChannel)ctx.channel());
return;
}
//判断是否是ping消息
if(frame instanceof PingWebSocketFrame){
log.info("【ping】");
//PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
//ctx.channel().writeAndFlush(pong);
return ;
}
//判断实时是pong消息
if(frame instanceof PongWebSocketFrame){
log.info("【pong】");
return ;
}
//本例子只支持文本,不支持二进制
if(!(frame instanceof TextWebSocketFrame)){
log.info("【不支持二进制】");
throw new UnsupportedOperationException("不支持二进制");
}
//返回信息应答
JSONObject object = JSONObject.parseObject(((TextWebSocketFrame) frame).text().toString());
//接收信息的人是否在线
if(channelMap.containsKey(object.getString("toUser"))){
//在线时直接发送,已送达
//只支持文本形式,信息必须以文本形式发送
channelMap.get(object.getString("toUser")).writeAndFlush(new TextWebSocketFrame(object.toString()));
}
}
/**
* wetsocket第一次连接握手
* @param ctx
* @param msg
*/
@SuppressWarnings("deprecation")
private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
String userUid = null;
if (req.getMethod().toString().equals("GET")) {
userUid = req.getUri().substring(req.getUri().indexOf("/", 2)+1);
//对用户信息进行存储
channelMap.put(userUid, (NioSocketChannel)ctx.channel());
}
// http 解码失败
if(!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx, (FullHttpRequest) req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
}
//可以通过url获取其他参数
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
"ws://"+req.headers().get("Host")+"/"+req.getUri()+"",null,false
);
handshaker = factory.newHandshaker(req);
if(handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else{
//进行连接
handshaker.handshake(ctx.channel(), (FullHttpRequest) req);
//拉取未发送的数据
//TODO
}
}
@SuppressWarnings("deprecation")
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 这里是保持服务器与客户端长连接 进行心跳检测 避免连接断开
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent stateEvent = (IdleStateEvent) evt;
//PingWebSocketFrame ping = new PingWebSocketFrame();
switch (stateEvent.state()){
//读空闲(服务器端)
case READER_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
//ctx.writeAndFlush(ping);
break;
//写空闲(客户端)
case WRITER_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
//ctx.writeAndFlush(ping);
break;
case ALL_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
break;
}
}
}
/**
* 出现异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
//移除channel
removeCannel((NioSocketChannel)ctx.channel());
ctx.close();
log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
}
/**
* 从缓存中移除已关闭的channel
* @param nioSocketChannel
*/
private void removeCannel(NioSocketChannel nioSocketChannel){
//从当前在线中移除
if(channelMap.containsValue(nioSocketChannel)){
for(Map.Entry<String, NioSocketChannel> entry : channelMap.entrySet()){
if(entry.getValue() == nioSocketChannel){
channelMap.remove(entry.getKey());
break;
}
}
}
}
}
第三步,在Springboot启动类里,增加Netty服务端的启动;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.example.demo.netty.websocket.NettyWebsocketServer;
/**
* 实现CommandLineRunner接口,启动Netty服务端
* @author 程就人生
* @date 2019年10月9日
*/
@SpringBootApplication
public class NettyDemoApplication implements CommandLineRunner{
//需要在配置文件里配置
@Value("${im.server.port}")
private int imServerPort ;
@Autowired
private NettyWebsocketServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(NettyDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
//netty服务启动的端口号不可和SpringBoot启动类的端口号重复
nettyServer.start(imServerPort);
//服务停止时关闭nettyServer
Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
}
}
第四步,整个Controller,渲染到页面;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
/**
* 简易的测试页面
* @author 程就人生
* @date 2019年10月9日
*/
@RestController
public class IndexController {
//需要在配置文件里进行配置
@Value("${im.server.port}")
private int imServerPort ;
/**
* 最简易的
* @return
*/
@GetMapping("/index/{userUid}")
public ModelAndView index(@PathVariable("userUid") String userUid){
//指定模板路径
ModelAndView modelAndView = new ModelAndView("/index");
modelAndView.addObject("userUid", userUid);
modelAndView.addObject("imServerPort", imServerPort);
return modelAndView;
}
}
第五步,前端页面简易制作;
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org" >
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
</head>
<body>
<p>发送信息:<input type="text" id="txt" ></input></p>
<p>接收人:<input type="text" id="toUser" ></input></p>
<p><button id="button" >发送消息</button></p>
<p id="recvContent">
</p>
<script src="http://www.jq22.com/jquery/jquery-1.10.2.js"></script>
<script th:inline="javascript" >
var userUid=[[${userUid}]];
var imServerPort=[[${imServerPort}]];
<!-- ws客户端 -->
var socket;
var wsUrl = "ws://localhost:"+imServerPort+"/websocket/"+userUid;
//避免重复连接
var lockReconnect = false;
var tt;
//创建websocket
createWebSocket();
//发送信息回车键
$("#txt").keydown(function(event){
if(event.keyCode==13){
$("#button").click();
}
});
//创建连接
function createWebSocket() {
try {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else{
console.log("您的浏览器支持WebSocket");
}
socket = new WebSocket(wsUrl);
//初始化
init();
} catch(e) {
console.log('catch');
//异常后重新连接
reconnect();
}
}
//初始化
function init() {
socket.onclose = function () {
console.log('链接关闭');
//关闭后重新连接
reconnect();
};
socket.onerror = function() {
console.log('发生异常了');
//出错后重新连接
reconnect();
};
socket.onopen = function () {
//心跳检测重置
heartCheck.start();
};
socket.onmessage = function (event) {
// 将json字符串转换为对象
var resData = JSON.parse(event.data);
console.log(resData);
//好友列表初始化
if(resData!=undefined) {
$("#recvContent").append('<div style="width:300px;text-align:left;"><span >'+resData.fromUser + '发送:' + resData.content + '</span></div><br/>');
}
heartCheck.start();
}
}
//重新连接
function reconnect() {
if(lockReconnect) {
return;
};
lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
tt = setTimeout(function () {
createWebSocket();
lockReconnect = false;
}, 4000);
}
//心跳检测
var heartCheck = {
timeout: 210000,
timeoutObj: null,
serverTimeoutObj: null,
start: function(){
console.log(getNowTime() +" Socket 心跳检测");
var self = this;
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log(getNowTime() +' Socket 连接重试');
//socket.send("连接成功");
self.serverTimeoutObj = setTimeout(function() {
console.log(socket);
socket.close();
}, self.timeout);
}, this.timeout)
}
}
//按钮点击事件
$("#button").click(function(){
var object={}
object.content = $("#txt").val();
object.toUser = $("#toUser").val();
object.fromUser= userUid;
$("#txt").val("");
$("#recvContent").append('<div style="width:300px;text-align:right;"><span >发送给'+object.toUser + ':' + object.content + '</span></div><br/>');
socket.send(JSON.stringify(object));
});
/**
* 获取系统当前时间
* @returns
*/
function p(s) {
return s < 10 ? '0' + s : s;
}
function getNowTime() {
var myDate = new Date();
//获取当前年
var year = myDate.getFullYear();
//获取当前月
var month = myDate.getMonth() + 1;
//获取当前日
var date = myDate.getDate();
var h = myDate.getHours(); //获取当前小时数(0-23)
var m = myDate.getMinutes(); //获取当前分钟数(0-59)
var s = myDate.getSeconds();
return year + '-' + p(month) + "-" + p(date) + " " + p(h) + ':' + p(m) + ":" + p(s);
}
</script>
</body>
</html>
最后,启动项目,通过网址打开两个简易版聊天对话框,然后进行信息的发送和接收;
总结
通过这次整合,可以看出来,原来用Websocket写的服务端和通过Netty写的服务端,在逻辑上是不变的,唯独变化的是技术处理的细节上。也就是说,不管使用的是什么技术,要解决的问题没变,关键就是在技术处理上如何让问题不再是问题。
网友评论