自定义报文协议
上行命令
指服务器向客户端发送的消息内容
SYSTEM 系统命令:[命令][命令发送时间][接收人] - 系统提示内容,例如:[SYSTEM][124343423123][Edward] - Student加入聊天室
下行命令
指客户端向服务器发送的命令
LOGIN登录动作:[命令][命令发送时间][命令发送人][终端类型]例如:[LOGIN][124343423123][Edward][WebSocket]
LOGOUT 退出登录动作:[命令][命令发送时间][命令发送人]例如:[LOGOUT][124343423123][Edward]
CHAT 聊天:[命令][命令发送时间][命令发送人][命令接收人] - 聊天内容,例如:[CHAT][124343423123][Edward][ALL] - 大家好,我是Edward!
FLOWER 发送鲜花特效:[命令][命令发送时间][命令发送人][终端类型][命令接收人],例如:[FLOWER][124343423123][you][WebSocket][ALL]
pom配置文件
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
核心代码
基础类
/**
 * Commands of the custom IM (Instant Messaging) protocol.
 *
 * <p>Each constant carries the literal token that appears inside the first
 * pair of square brackets of a protocol message, e.g. {@code [LOGIN]}.
 */
public enum IMP {
    /** System message. */
    SYSTEM("SYSTEM"),
    /** Login command. */
    LOGIN("LOGIN"),
    /** Logout command. */
    LOGOUT("LOGOUT"),
    /** Chat message. */
    CHAT("CHAT"),
    /** Send-flowers effect. */
    FLOWER("FLOWER");

    private final String name;

    IMP(String name) {
        this.name = name;
    }

    /**
     * Tells whether the given text begins with one of the protocol commands,
     * i.e. starts with {@code [SYSTEM]}, {@code [LOGIN]}, {@code [LOGOUT]},
     * {@code [CHAT]} or {@code [FLOWER]}.
     *
     * <p>The check iterates over {@link #values()}, so every declared command
     * is recognised and full messages (command followed by further header
     * fields and content) are accepted, not only the bare command token.
     *
     * @param content raw message text; may be {@code null}
     * @return {@code true} if {@code content} starts with a known command,
     *         {@code false} otherwise (including for {@code null})
     */
    public static boolean isIMP(String content) {
        if (content == null) {
            return false;
        }
        for (IMP command : values()) {
            if (content.startsWith("[" + command.name + "]")) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the command token as it appears on the wire
     */
    public String getName() {
        return this.name;
    }

    /**
     * @return the command token, same value as {@link #getName()}
     */
    @Override
    public String toString() {
        return this.name;
    }
}
/**
 * Message entity of the custom IM protocol.
 *
 * <p>Accessors are not written out here; they are presumably generated by the
 * {@code @Data} annotation (other classes call {@code getCmd()},
 * {@code setSender(...)} and so on).
 */
@Message
@Data
public class IMMessage {
    private String addr;      // IP address and port
    private String cmd;       // command type, e.g. LOGIN, SYSTEM or LOGOUT
    private long time;        // time the command was sent
    private int online;       // number of users currently online
    private String sender;    // sender
    private String receiver;  // receiver
    private String content;   // message content
    private String terminal;  // terminal type

    /** Creates an empty message; all fields keep their default values. */
    public IMMessage() {}

    /**
     * Creates a message carrying an online count and a content text.
     * The terminal is left unset.
     *
     * @param cmd     command type
     * @param time    send time
     * @param online  number of users online
     * @param content message content
     */
    public IMMessage(String cmd, long time, int online, String content) {
        this.cmd = cmd;
        this.time = time;
        this.online = online;
        this.content = content;
    }

    /**
     * Creates a message that carries a sender and a terminal but no content.
     *
     * @param cmd      command type
     * @param terminal terminal type of the sender
     * @param time     send time
     * @param sender   sender name
     */
    public IMMessage(String cmd, String terminal, long time, String sender) {
        this.cmd = cmd;
        this.time = time;
        this.sender = sender;
        this.terminal = terminal;
    }

    /**
     * Creates a message that carries a sender and a content text.
     * The terminal is left unset.
     *
     * @param cmd     command type
     * @param time    send time
     * @param sender  sender name
     * @param content message content
     */
    public IMMessage(String cmd, long time, String sender, String content) {
        this.cmd = cmd;
        this.time = time;
        this.sender = sender;
        this.content = content;
    }

    /**
     * @return a debug representation listing every field, including the terminal
     */
    @Override
    public String toString() {
        return "IMMessage{" +
                "addr='" + addr + '\'' +
                ", cmd='" + cmd + '\'' +
                ", time=" + time +
                ", online=" + online +
                ", sender='" + sender + '\'' +
                ", receiver='" + receiver + '\'' +
                ", content='" + content + '\'' +
                ", terminal='" + terminal + '\'' +
                '}';
    }
}
handler
/**
 * Business logic of the custom chat protocol: keeps track of the logged-in
 * channels and dispatches LOGIN, CHAT and FLOWER commands.
 */
public class MsgProcessor {
    /** Logged-in channels. Static, so it is shared by all MsgProcessor instances. */
    private static final ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // Per-channel extension attributes.
    public static final AttributeKey<String> NICK_NAME = AttributeKey.valueOf("nickName");
    public static final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
    public static final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");
    public static final AttributeKey<String> FROM = AttributeKey.valueOf("from");

    /** Minimum interval between two flower effects of one client, in seconds. */
    private static final int FLOWER_COOLDOWN_SECONDS = 10;

    // Only the String-based decode/encode methods of these are used here.
    private IMDecoder decoder = new IMDecoder();
    private IMEncoder encoder = new IMEncoder();

    /**
     * Returns the nickname stored on the channel.
     *
     * @param client the channel
     * @return the nickname, or {@code null} if none has been stored
     */
    public String getNickName(Channel client) {
        return client.attr(NICK_NAME).get();
    }

    /**
     * Returns the remote address of the channel as text, with the first
     * {@code "/"} removed.
     *
     * @param client the channel
     * @return the remote address string
     */
    public String getAddress(Channel client) {
        return client.remoteAddress().toString().replaceFirst("/", "");
    }

    /**
     * Returns the extension attributes stored on the channel.
     *
     * @param client the channel; may be {@code null}
     * @return the attribute object, or {@code null} if none is stored
     */
    public JSONObject getAttrs(Channel client) {
        if (client == null) {
            return null;
        }
        return client.attr(ATTRS).get();
    }

    /**
     * Stores one extension attribute on the channel, creating the attribute
     * container on first use.
     *
     * @param client the channel
     * @param key    attribute name
     * @param value  attribute value
     */
    private void setAttrs(Channel client, String key, Object value) {
        JSONObject json = client.attr(ATTRS).get();
        if (json == null) {
            json = new JSONObject();
        }
        json.put(key, value);
        client.attr(ATTRS).set(json);
    }

    /**
     * Removes the client from the online group and tells the remaining users
     * that it left. Channels without a nickname never logged in through the
     * chat protocol and are ignored.
     *
     * @param client the channel that is leaving
     */
    public void logout(Channel client) {
        String nickName = getNickName(client);
        if (nickName == null) {
            return;
        }
        // Remove first so that the broadcast online count excludes the leaver.
        onlineUsers.remove(client);
        IMMessage notice = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), nickName + "离开");
        for (Channel channel : onlineUsers) {
            deliver(channel, notice);
        }
    }

    /**
     * Handles a message object by encoding it to protocol text and passing it
     * to {@link #sendMsg(Channel, String)}.
     *
     * @param client the channel the message came from
     * @param msg    the message
     */
    public void sendMsg(Channel client, IMMessage msg) {
        sendMsg(client, encoder.encode(msg));
    }

    /**
     * Parses a protocol text and executes the command it contains. Texts that
     * cannot be decoded, and commands other than LOGIN, CHAT and FLOWER, are
     * ignored.
     *
     * @param client the channel the message came from
     * @param msg    protocol text
     */
    public void sendMsg(Channel client, String msg) {
        IMMessage request = decoder.decode(msg);
        if (null == request) {
            return;
        }
        String cmd = request.getCmd();
        if (IMP.LOGIN.getName().equals(cmd)) {
            handleLogin(client, request);
        } else if (IMP.CHAT.getName().equals(cmd)) {
            handleChat(client, request);
        } else if (IMP.FLOWER.getName().equals(cmd)) {
            handleFlower(client, request);
        }
    }

    /** Registers the client and notifies everyone, including the client itself. */
    private void handleLogin(Channel client, IMMessage request) {
        client.attr(NICK_NAME).set(request.getSender());
        client.attr(IP_ADDR).set(getAddress(client));
        client.attr(FROM).set(request.getTerminal());
        onlineUsers.add(client);
        for (Channel channel : onlineUsers) {
            String text = (channel == client) ? "已与服务器建立连接!" : getNickName(client) + "加入";
            deliver(channel, new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), text));
        }
    }

    /** Forwards a chat message to every online channel; the sender sees itself as "you". */
    private void handleChat(Channel client, IMMessage request) {
        for (Channel channel : onlineUsers) {
            boolean isself = (channel == client);
            if (isself) {
                request.setSender("you");
            } else {
                request.setSender(getNickName(client));
            }
            request.setTime(sysTime());
            // NOTE(review): the same request object is mutated on the next iteration;
            // if a Console channel serialises it later, it may observe the changed
            // sender. Confirm whether a per-recipient copy is needed.
            if ("Console".equals(channel.attr(FROM).get()) && !isself) {
                channel.writeAndFlush(request);
                continue;
            }
            String content = encoder.encode(request);
            channel.writeAndFlush(new TextWebSocketFrame(content));
        }
    }

    /** Broadcasts a flower effect unless the client is still inside its cool-down window. */
    private void handleFlower(Channel client, IMMessage request) {
        JSONObject attrs = getAttrs(client);
        long currTime = sysTime();
        if (null != attrs) {
            long lastTime = attrs.getLongValue("lastFlowerTime");
            long sub = currTime - lastTime;
            // Reject repeated flowers within the cool-down window.
            if (sub < 1000L * FLOWER_COOLDOWN_SECONDS) {
                request.setSender("you");
                request.setCmd(IMP.SYSTEM.getName());
                request.setContent("您送鲜花太频繁," + (FLOWER_COOLDOWN_SECONDS - sub / 1000) + "秒后再试");
                String content = encoder.encode(request);
                client.writeAndFlush(new TextWebSocketFrame(content));
                return;
            }
        }
        // Normal case: send the effect to everybody.
        for (Channel channel : onlineUsers) {
            if (channel == client) {
                request.setSender("you");
                request.setContent("你给大家送了一波鲜花雨");
                setAttrs(client, "lastFlowerTime", currTime);
            } else {
                request.setSender(getNickName(client));
                request.setContent(getNickName(client) + "送来一波鲜花雨");
            }
            request.setTime(sysTime());
            String content = encoder.encode(request);
            channel.writeAndFlush(new TextWebSocketFrame(content));
        }
    }

    /**
     * Writes a message in the form the receiving terminal expects: channels
     * whose FROM attribute is "Console" get the message object itself, all
     * others get the protocol text wrapped in a WebSocket text frame.
     */
    private void deliver(Channel channel, IMMessage message) {
        if ("Console".equals(channel.attr(FROM).get())) {
            channel.writeAndFlush(message);
            return;
        }
        channel.writeAndFlush(new TextWebSocketFrame(encoder.encode(message)));
    }

    /**
     * @return the current system time in milliseconds
     */
    private long sysTime() {
        return System.currentTimeMillis();
    }
}
/**
 * Decoder of the custom IM protocol. It offers two independent entry points:
 * a pipeline decoder that reads a MessagePack-serialised {@code IMMessage}
 * from bytes, and {@link #decode(String)} which parses the textual form
 * {@code [CMD][time][sender]... - content}.
 */
public class IMDecoder extends ByteToMessageDecoder {
    /**
     * Header/content splitter. The header group is reluctant so that a {@code ]}
     * inside the content (e.g. a chat text ending with "]") is not mistaken for
     * the end of the header.
     */
    private static final Pattern MESSAGE_PATTERN = Pattern.compile("^\\[(.*?)\\](\\s\\-\\s(.*))?");

    /** Maximum number of characters kept from a nickname. */
    private static final int MAX_NICK_NAME_LENGTH = 10;

    /**
     * Tries to read one {@code IMMessage} from the buffer. If the bytes are not
     * a MessagePack-encoded message, this decoder removes itself from the
     * pipeline.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            IMMessage message = new MessagePack().read(in.nioBuffer(), IMMessage.class);
            out.add(message);
            in.clear();
        } catch (MessageTypeException e) {
            ctx.channel().pipeline().remove(this);
        }
    }

    /**
     * Parses a protocol text into a message.
     *
     * <p>Only LOGIN, CHAT and FLOWER are decoded. The header must contain at
     * least command, time and sender; LOGIN and FLOWER additionally need the
     * terminal as fourth field. An unparsable time is treated as {@code 0}.
     * Nicknames longer than {@value #MAX_NICK_NAME_LENGTH} characters are cut
     * to that length.
     *
     * @param msg protocol text; may be {@code null}
     * @return the decoded message, or {@code null} if the text is blank,
     *         malformed or carries another command
     */
    public IMMessage decode(String msg) {
        if (null == msg || "".equals(msg.trim())) {
            return null;
        }
        Matcher m = MESSAGE_PATTERN.matcher(msg);
        if (!m.matches()) {
            return null;
        }
        String content = m.group(3);
        String[] headers = m.group(1).split("\\]\\[");
        if (headers.length < 3) {
            return null;
        }

        long time = 0;
        try {
            time = Long.parseLong(headers[1]);
        } catch (NumberFormatException ignored) {
            // best effort: keep 0 when the client sent a non-numeric time
        }

        String nickName = headers[2];
        if (nickName.length() > MAX_NICK_NAME_LENGTH) {
            nickName = nickName.substring(0, MAX_NICK_NAME_LENGTH);
        }

        String cmd = headers[0];
        if (IMP.LOGIN.getName().equals(cmd) || IMP.FLOWER.getName().equals(cmd)) {
            if (headers.length < 4) {
                return null;
            }
            return new IMMessage(cmd, headers[3], time, nickName);
        }
        if (IMP.CHAT.getName().equals(cmd)) {
            return new IMMessage(cmd, time, nickName, content);
        }
        return null;
    }
}
/**
 * Encoder of the custom IM protocol: serialises an {@code IMMessage} with
 * MessagePack for the pipeline, and renders it as protocol text on request.
 */
public class IMEncoder extends MessageToByteEncoder<IMMessage> {

    /** Writes the MessagePack form of the message into the outbound buffer. */
    @Override
    protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
            throws Exception {
        out.writeBytes(new MessagePack().write(msg));
    }

    /**
     * Renders a message as protocol text: {@code [cmd][time]} followed by the
     * command-specific header fields and, when the content is non-empty,
     * {@code " - "} plus the content.
     *
     * @param msg the message; may be {@code null}
     * @return the protocol text, or an empty string for {@code null}
     */
    public String encode(IMMessage msg) {
        if (msg == null) {
            return "";
        }
        final String cmd = msg.getCmd();
        StringBuilder text = new StringBuilder();
        text.append("[").append(cmd).append("]");
        text.append("[").append(msg.getTime()).append("]");

        if (IMP.LOGIN.getName().equals(cmd) || IMP.FLOWER.getName().equals(cmd)) {
            // sender and terminal
            text.append("[").append(msg.getSender()).append("][").append(msg.getTerminal()).append("]");
        } else if (IMP.CHAT.getName().equals(cmd)) {
            // sender only
            text.append("[").append(msg.getSender()).append("]");
        } else if (IMP.SYSTEM.getName().equals(cmd)) {
            // online count
            text.append("[").append(msg.getOnline()).append("]");
        }

        String body = msg.getContent();
        if (body != null && !"".equals(body)) {
            text.append(" - ").append(body);
        }
        return text.toString();
    }
}
服务端
@Slf4j
public class ChatServer{
private int port = 8080;
public void start(int port){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/**
*输入:
* 前端是符合自定义格式的字符串 --> IMMessage --> 向chanel写(TerminalServerHandler)
* http请求 --> HttpServerCodec(HttpMessage) --> HttpObjectAggregator(FullHttpRequest) --> HttpServerHandler
* webSocket请求(Web TCP) --> WebSocketServerProtocolHandler(WebSocketFrame) --> WebSocketServerHandler
* 输出:
* IMMessage --> IMEncoder
* http: HttpResponse --> HttpServerCodec
*/
/** 解析自定义协议 */
pipeline.addLast(new IMDecoder()); //Inbound
pipeline.addLast(new IMEncoder()); //Outbound
pipeline.addLast(new TerminalServerHandler()); //Inbound
/** 解析Http请求 */
pipeline.addLast(new HttpServerCodec()); //Outbound
//主要是将同一个http请求或响应的多个消息对象变成一个 fullHttpRequest完整的消息对象
pipeline.addLast(new HttpObjectAggregator(64 * 1024));//Inbound
//主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的 ,加上这个handler我们就不用考虑这个问题了
pipeline.addLast(new ChunkedWriteHandler());//Inbound、Outbound
pipeline.addLast(new HttpServerHandler());//Inbound
/** 解析WebSocket请求 */
pipeline.addLast(new WebSocketServerProtocolHandler("/im")); //Inbound
pipeline.addLast(new WebSocketServerHandler()); //Inbound
}
});
ChannelFuture f = b.bind(this.port).sync();
log.info("服务已启动,监听端口" + this.port);
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public void start() {
start(this.port);
}
public static void main(String[] args) throws IOException{
if(args.length > 0) {
new ChatServer().start(Integer.valueOf(args[0]));
}else{
new ChatServer().start();
}
}
}
/**
 * Inbound handler for clients speaking the custom protocol: every decoded
 * {@code IMMessage} is handed to a {@code MsgProcessor}.
 */
@Slf4j
public class TerminalServerHandler extends SimpleChannelInboundHandler<IMMessage> {
    private MsgProcessor msgProcessor = new MsgProcessor();

    /** Passes the received message, together with its channel, to the processor. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws Exception {
        msgProcessor.sendMsg(ctx.channel(), msg);
    }

    /**
     * Logs the failure message and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        String reason = cause.getMessage();
        log.info("Socket Client: 与客户端断开连接:" + reason);
        ctx.close();
    }
}
/**
 * Inbound handler for WebSocket clients: the text of every received text
 * frame is handed to a {@code MsgProcessor}.
 */
@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private MsgProcessor processor = new MsgProcessor();

    /** Passes the frame text, together with its channel, to the processor. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        processor.sendMsg(ctx.channel(), msg.text());
    }

    /**
     * Logs the failure (including its stack trace, through the logger rather
     * than stderr) and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel client = ctx.channel();
        String addr = processor.getAddress(client);
        log.warn("WebSocket Client:" + addr + "异常", cause);
        // Close the connection whenever an exception occurs.
        ctx.close();
    }
}
/**
 * Serves static files from the {@code webroot} directory located next to the
 * compiled classes. Requests that do not map to a readable file are passed on
 * to the next handler in the pipeline.
 */
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    // Location of this class, used to find the "classes/" output directory.
    private URL baseURL = HttpServerHandler.class.getResource("");
    private final String webroot = "webroot";

    /**
     * Resolves a request path to a file below the web root.
     *
     * @param fileName request path, with or without a leading slash
     * @return the file below the web root
     * @throws IllegalArgumentException if the resolved path lies outside the
     *         web root (e.g. the request contains {@code ..} segments)
     * @throws Exception if the base location cannot be determined
     */
    private File getResource(String fileName) throws Exception {
        String basePath = baseURL.toURI().toString();
        int start = basePath.indexOf("classes/");
        basePath = (basePath.substring(0, start) + "/" + "classes/").replaceAll("/+", "/");
        String rootPath = basePath + webroot;
        rootPath = !rootPath.contains("file:") ? rootPath : rootPath.substring(5);
        rootPath = rootPath.replaceAll("//", "/");

        File root = new File(rootPath);
        File target = new File(root, fileName);
        // The URI comes from the network: refuse anything that escapes the web root.
        String rootCanonical = root.getCanonicalPath();
        String targetCanonical = target.getCanonicalPath();
        if (!targetCanonical.startsWith(rootCanonical + File.separator)) {
            throw new IllegalArgumentException("Requested path is outside the web root: " + fileName);
        }
        return target;
    }

    /**
     * Answers the request with the matching static file; {@code "/"} maps to
     * {@code chat.html}. If no file can be opened, the request is forwarded
     * unchanged to the next handler.
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String uri = request.getUri();
        final RandomAccessFile file;
        try {
            String page = uri.equals("/") ? "chat.html" : uri;
            file = new RandomAccessFile(getResource(page), "r");
        } catch (Exception e) {
            // Not a static resource: let the following handlers deal with it.
            ctx.fireChannelRead(request.retain());
            return;
        }

        HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
        String contentType = "text/html;charset=utf-8;";
        String lowerUri = uri.toLowerCase();
        if (uri.endsWith(".css")) {
            contentType = "text/css;charset=utf-8;";
        } else if (uri.endsWith(".js")) {
            contentType = "text/javascript;charset=utf-8;";
        } else if (lowerUri.matches(".*\\.(jpg|png|gif)$")) {
            // Extension without the dot; "jpg" files use the "jpeg" subtype.
            String ext = lowerUri.substring(lowerUri.lastIndexOf('.') + 1);
            contentType = "image/" + ("jpg".equals(ext) ? "jpeg" : ext);
        }
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);

        long length = file.length();
        boolean keepAlive = HttpHeaders.isKeepAlive(request);
        if (keepAlive) {
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, length);
            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
        ctx.write(response);
        ctx.write(new DefaultFileRegion(file.getChannel(), 0, length));
        ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        // Close the file only after the write has finished, not while the
        // transfer may still be pending.
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture done) throws Exception {
                file.close();
            }
        });
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Logs the failure through the logger and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel client = ctx.channel();
        log.warn("Client:" + client.remoteAddress() + "异常", cause);
        // Close the connection whenever an exception occurs.
        ctx.close();
    }
}
客户端
/**
 * Console chat client. Connects to the server over TCP and exchanges
 * {@code IMMessage} objects through {@code IMDecoder} / {@code IMEncoder}.
 */
public class ChatClient {
    private ChatClientHandler clientHandler;
    private String host;
    private int port;

    /**
     * @param nickName name under which this client logs in
     */
    public ChatClient(String nickName) {
        this.clientHandler = new ChatClientHandler(nickName);
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * <p>Pipeline: inbound bytes --> IMDecoder --> ChatClientHandler;
     * outbound IMMessage --> IMEncoder.
     *
     * @param host server host
     * @param port server port
     */
    public void connect(String host, int port) {
        this.host = host;
        this.port = port;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IMDecoder());
                            ch.pipeline().addLast(new IMEncoder());
                            ch.pipeline().addLast(clientHandler);
                        }
                    });
            ChannelFuture f = b.connect(this.host, this.port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /** Starts a client named "edward" against a local server on port 8080. */
    public static void main(String[] args) throws IOException {
        new ChatClient("edward").connect("127.0.0.1", 8080);
    }
}
/**
 * Client-side chat logic: logs in once the connection is up, reads console
 * input on a separate thread and prints messages received from the server.
 */
@Slf4j
public class ChatClientHandler extends SimpleChannelInboundHandler<IMMessage> {
    private static final Pattern SCRIPT_TAG =
            Pattern.compile("<script[^>]*?>[\\s\\S]*?<\\/script>", Pattern.CASE_INSENSITIVE);
    private static final Pattern STYLE_TAG =
            Pattern.compile("<style[^>]*?>[\\s\\S]*?<\\/style>", Pattern.CASE_INSENSITIVE);
    private static final Pattern HTML_TAG =
            Pattern.compile("<[^>]+>", Pattern.CASE_INSENSITIVE);

    private ChannelHandlerContext ctx;
    private String nickName;

    /**
     * @param nickName name under which this client logs in
     */
    public ChatClientHandler(String nickName) {
        this.nickName = nickName;
    }

    /**
     * Starts the console thread. Every input line is sent as a CHAT message;
     * the line "exit" sends LOGOUT and ends the loop. The loop also ends when
     * standard input reaches end-of-file.
     */
    private void session() throws IOException {
        new Thread() {
            @Override
            public void run() {
                System.out.println(nickName + ",你好,请在控制台输入对话内容");
                Scanner scanner = new Scanner(System.in);
                try {
                    while (scanner.hasNextLine()) {
                        String input = scanner.nextLine();
                        IMMessage message;
                        if ("exit".equals(input)) {
                            message = new IMMessage(IMP.LOGOUT.getName(), "Console", System.currentTimeMillis(), nickName);
                        } else {
                            message = new IMMessage(IMP.CHAT.getName(), System.currentTimeMillis(), nickName, input);
                        }
                        if (!sendMsg(message)) {
                            break;
                        }
                    }
                } finally {
                    scanner.close();
                }
            }
        }.start();
    }

    /**
     * Called once the TCP connection is established: sends the LOGIN message
     * and starts the console session.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        IMMessage message = new IMMessage(IMP.LOGIN.getName(), "Console", System.currentTimeMillis(), this.nickName);
        sendMsg(message);
        log.info("成功连接服务器,已执行登录动作");
        session();
    }

    /**
     * Sends a message to the server.
     *
     * @param msg the message to send
     * @return {@code false} if the message was a LOGOUT (the session should
     *         stop), {@code true} otherwise
     */
    private boolean sendMsg(IMMessage msg) {
        ctx.channel().writeAndFlush(msg);
        System.out.println("继续输入开始对话...");
        // Compare against the command's name; the enum constant itself never equals a String.
        return !IMP.LOGOUT.getName().equals(msg.getCmd());
    }

    /**
     * Called for every received message: prints "sender:content" with HTML
     * tags removed from the content.
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws IOException {
        String prefix = (null == msg.getSender()) ? "" : (msg.getSender() + ":");
        System.out.println(prefix + removeHtmlTag(msg.getContent()));
    }

    /**
     * Removes script blocks, style blocks and any remaining HTML tags from a
     * text and trims the result.
     *
     * @param htmlStr text that may contain HTML; may be {@code null}
     * @return the plain text, or an empty string for {@code null}
     */
    public static String removeHtmlTag(String htmlStr) {
        if (htmlStr == null) {
            return "";
        }
        String text = SCRIPT_TAG.matcher(htmlStr).replaceAll("");
        text = STYLE_TAG.matcher(text).replaceAll("");
        text = HTML_TAG.matcher(text).replaceAll("");
        return text.trim();
    }

    /**
     * Called on errors: logs the failure message and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("与服务器断开连接:" + cause.getMessage());
        ctx.close();
    }
}
网友评论