前段时间,做项目的时候,让我搭建一个websocket服务,实现微信小程序聊天。但之前没接触过,然后网上各种找资料,看别人的代码。最终,根据别人提供的代码总算可以运行了。
这里,要感谢白马湖小龙王,后面的代码都是他提供的。转载自:
https://blog.csdn.net/weixin_39168678/article/details/79453585
接下来,我就把主要的代码上传上去。如果有需要的自己复制粘贴。
1.首先,我因为用的公司的核格平台(核格平台是一个可视化的快速开发平台,有兴趣的可以去这个论坛下载,http://bbs.hearker.com/,有各种版本的平台),利用平台里的一个jboss服务器启动时候调用的注解类,如下:
package com.sunsheen.websocket.start;
import javax.servlet.ServletContext;
import com.sunsheen.jfids.system.servlet.Listener;
import com.sunsheen.jfids.system.servlet.SystemStartupListener;
/**
* 服务启动的时候就执行监听
* @author heWanLi
* 2018-01-03
*
*/
@Listener
public class TaskStartupListener implements SystemStartupListener {
@Override
public void init(ServletContext param) {
new Task().startTask();
}
}
Task类,具体实现开启websocket的逻辑。这样,就能满足在一个jboss服务器下搭建websocket,而不用单独再去弄个服务器。
package com.sunsheen.websocket.start;
import com.sunsheen.websocket.netty.ImServer;
import com.sunsheen.websocket.netty.NettyUtil;
/**
* 利用启动的时候启动websocket
* @author HeWanLi
*
*/
public class Task {
public void timingTask(){
}
/**
* 定时执行任务
*/
public void startTask(){
/**
* 开启一个线程,让websocket服务也启动起来
*/
/*检测端口是否被占用*/
int port = new ImServer().getPort();
if(NettyUtil.isPortAvailable(port)){
System.out.println("启动了websocket");
Runnable run=new Runnable() {
@SuppressWarnings("static-access")
public void run(){
try {
new ImServer(port).run();
System.out.println("启动了websocket");
} catch (Exception e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
};
new Thread(run).start();
}
}
}
接下来是netty实现的主要代码:
2.有一个处理消息是否为空的类:
public class BlankUtil {
public static boolean isBlank(String messges){
if (null == messges || "".equals(messges)) {
return true;
}
return false;
}
}
3.一个netty的工具类:
public class NettyUtil {
/**
* @param channel
* @return
*/
public static String parseChannelRemoteAddr(Channel channel) {
// TODO 自动生成的方法存根
return channel.remoteAddress().toString().substring(1);
}
/**
* 将字符串转成Map
* @param text {"room_id":"1","uid":"1","sender":"2","type":"1","message":"1"} 这样的格式
* @return
*/
public static Map<String, Object> changeStringToMap(String text){
Gson gson = new Gson();
Map<String, Object> paramMap = gson.fromJson(text, Map.class);
return paramMap;
}
private static void bindPort(String host, int port) throws Exception {
Socket s = new Socket();
s.bind(new InetSocketAddress(host, port));
s.close();
}
public static boolean isPortAvailable(int port) {
Socket s = new Socket();
try {
bindPort("0.0.0.0", port);
bindPort(InetAddress.getLocalHost().getHostAddress(), port);
return true;
} catch (Exception e) {
return false;
}
}
}
4.用户的一个实体类:
public class UserInfo {
private String userId; // UID
private String addr; // 地址
private Channel channel;// 通道
private String sender; // 消息接收人
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
/**
* @return sender
*/
public String getSender() {
return sender;
}
/**
* @param sender 要设置的 sender
*/
public void setSender(String sender) {
this.sender = sender;
}
}
5.用户处理类,用来添加用户,删除聊天用户,以及处理用户消息:
public class UserInfoManager {
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
private static ConcurrentHashMap<Channel, UserInfo> userInfos = new ConcurrentHashMap<>();
/**
* 登录注册 channel
*
*
*/
public static void addChannel(Channel channel,String uid) {
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(uid);
userInfo.setAddr(remoteAddr);
userInfo.setChannel(channel);
userInfos.put(channel, userInfo);
}
/**
* 普通消息
*
* @param message
*/
public static void broadcastMess(String uid,String message,String sender) {
if (!BlankUtil.isBlank(message)) {
try {
rwLock.readLock().lock();
Set<Channel> keySet = userInfos.keySet();
for (Channel ch : keySet) {
UserInfo userInfo = userInfos.get(ch);
if (!userInfo.getUserId().equals(uid) ) continue;
String backmessage=sender+","+message;
ch.writeAndFlush(new TextWebSocketFrame(backmessage));
/* responseToClient(ch,message);*/
}
} finally {
rwLock.readLock().unlock();
}
}
}
/**
* @param channel
*/
public static void removeChannel(Channel channel) {
// TODO 自动生成的方法存根
userInfos.remove(channel);
}
/**
* @param channel
* @return
*/
public static UserInfo getUserInfo(Channel channel) {
// TODO 自动生成的方法存根
return userInfos.get(channel);
}
/**
* 添加房间号信息,注册信息(这个可以不要,主要是为了满足我的业务,而重载的一个方法)
* @param channel
* @param changeStringToMap
*/
public static void addChannel(Channel channel,
Map<String, Object> paramMap) {
removeChannel(channel);
/*if(!rooms.containsKey(paramMap.get("room_id"))){
// 并将聊天室添加到数据库 判断没有的情况下,才添加到数据库
//new WebsocketService().save(paramMap);
}*/
// 并将聊天室添加到数据库 判断没有的情况下,才添加到数据库
//new WebsocketService().save(paramMap);
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(paramMap.get("uid").toString());
userInfo.setAddr(remoteAddr);
userInfo.setSender(paramMap.get("sender").toString());
userInfo.setChannel(channel);
userInfos.put(channel, userInfo);
// rooms.put(paramMap.get("room_id").toString(), userInfos); // 添加一个聊天室
System.out.print("connect user:"+paramMap.get("uid").toString());
// 将消息添加到数据库
//new WebsocketService().saveMessage(paramMap);
}
}
6.一个接收前端数据的处理类,包括页面信息等:
/**
* @author oj
* 消息处理类
*/
public class SocketHandler extends ChannelInboundHandlerAdapter {
//ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private WebSocketServerHandshaker handshaker;
private final String wsUri = "/ws";
//websocket握手升级绑定页面
String wsFactoryUri = "";
/*
* 握手建立
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.add(incoming);
}
/*
* 握手取消
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.remove(incoming);
}
/*
* channelAction
*
* channel 通道 action 活跃的
*
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。
*
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel());
System.out.println(ctx.channel().localAddress().toString() + " 通道已激活!");
}
/*
* channelInactive
*
* channel 通道 Inactive 不活跃的
*
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。
*
*/
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
}
/*
* 功能:读取 h5页面发送过来的信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
/*
* 功能:读空闲时移除Channel
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent) evt;
// 判断Channel是否读空闲, 读空闲时移除Channel
if (evnet.state().equals(IdleState.READER_IDLE)) {
UserInfoManager.removeChannel(ctx.channel());
}
}
ctx.fireUserEventTriggered(evt);
}
/*
* 功能:处理HTTP的代码
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException {
// 如果HTTP解码失败,返回HHTP异常
if (req instanceof HttpRequest) {
HttpMethod method = req.getMethod();
// 如果是websocket请求就握手升级
if (wsUri.equalsIgnoreCase(req.getUri())) {
System.out.println(" req instanceof HttpRequest");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
wsFactoryUri, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
}
}
/*
* 处理Websocket的代码
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
//System.out.println("websocket get");
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本消息,不支持二进制消息
if (frame instanceof TextWebSocketFrame) {
// 返回应答消息
String requestmsg = ((TextWebSocketFrame) frame).text();
System.out.println("websocket消息======"+requestmsg);
String[] array= requestmsg.split(",");
// 将通道加入通道管理器
UserInfoManager.addChannel(ctx.channel(),array[0]);
UserInfo userInfo = UserInfoManager.getUserInfo(ctx.channel());
if (array.length== 3) {
// 将信息返回给h5
String sendid=array[0];String friendid=array[1];String messageid=array[2];
UserInfoManager.broadcastMess(friendid,messageid,sendid);
}
}
}
/**
* 功能:服务端发生异常的操作
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
7.一个用于启动服务的类:
public class ImServer {
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup = new NioEventLoopGroup();
private static int port=8080;
/**
* @return port
*/
public static int getPort() {
return port;
}
public ImServer() {
}
//线程池设计的定时任务类
public ImServer(int port) {
}
public void run() throws Exception {
try {
//创建ServerBootstrap实例
ServerBootstrap b = new ServerBootstrap();
//设置并绑定Reactor线程池
b.group(bossGroup, workerGroup)
//设置并绑定服务端Channel
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // Http消息组装
pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket通信支持
pipeline.addLast(new SocketHandler());//自定义处理类
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// System.out.println("WebsocketChatServer Start:" + port);
try {
ChannelFuture f = b.bind(port).sync();//// 服务器异步创建绑定
channel = f.channel();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
}
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
System.out.println("WebsocketChatServer Stop:" + port);
}
}
public static void main(String[] args) throws Exception {
new ImServer(port).run();
}
}
需要的jar包分别是:gson-2.2.4.jar,netty-all-4.1.22.Final.jar
需要一个html页面chat.html,如下:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>简单聊天室</title>
</head>
<script type="text/javascript" src="jquery.min.js"></script>
<body>
<div id="content" class="row-center">
<div id="chat-box" class="row-center">
</div>
<div id="input-box">
<input class="chat-input" id="chat-input" placeholder="message"></input>
<input id="myid" placeholder="myid">
<button id="login-button" onclick="login()">登录</button>
<input id="friendid" placeholder="friendid">
<button class="chat-button" id="send" onclick="send()">发送</button>
</div>
</div>
</body>
</html>
<script type="text/javascript">
var ipaddress="127.0.0.1";
//新建socket对象
window.socket = new WebSocket("ws://"+ipaddress+":8080/ws");
//监听netty服务器消息并打印到页面上
socket.onmessage = function(event) {
var datas=event.data.split(",");
console.log("服务器消息===="+datas);
$("#chat-box").text(datas);
}
//将发送人接收人的id和要发生的消息发送出去
function send(){
console.log($("#chat-input").val())
var data=$("#myid").val()+","+$("#friendid").val()+","+$("#chat-input").val()
socket.send(data)
}
//登录事件
function login(){
var data=$("#myid").val();socket.send(data);
}
</script>
最后部署到jboss服务器,启动服务器,然后去websocket测试网站去输入:
ws://localhost:8080/ws 点击连接,看看是否测试成功。如果显示如下,证明可以使用了:
以上就是全部代码了。但是这个代码还能有 更多优化,比如发图片,发视频,发语音,这些我都还没实现,还没有去尝试。
如果需要源代码的,可以到这个去下载:
https://github.com/wishyoukew/netty-websocket-demo
如果有不好的地方,望见谅,只是从别人那里搬运过来补充完整的。
网友评论