美文网首页
基于netty手写消息推送系统

基于netty手写消息推送系统

作者: 剑道_7ffc | 来源:发表于2020-04-19 15:16 被阅读0次

    自定义报文协议

    上行命令

    指服务器向客户端发送的消息内容
    SYSTEM 系统命令,例如[命令][命令发送时间][接收人] - 系统提示内容例如:[SYSTEM][124343423123][Edward] – Student加入聊天室

    下行命令

    指客户端向服务器发送的命令
    LOGIN登录动作:[命令][命令发送时间][命令发送人][终端类型]例如:[LOGIN][124343423123][Edward][WebSocket]
    LOGOUT 退出登录动作:[命令][命令发送时间][命令发送人]例如:[LOGOUT][124343423123][Edward]
    CHAT聊天:[命令][命令发送时间][命令发送人][命令接收人] – 聊天内容例如:[CHAT][124343423123][Edward][ALL] - 大家好,我是Edward!
    FLOWER 发送送鲜花特效:[命令][命令发送时间][命令发送人][终端类型][命令接收人]例如:[FLOWER][124343423123][you][WebSocket][ALL]

    pom配置文件

    <dependencies>
            <!-- Netty: networking framework used by the server, the client and the codecs -->
            <dependency>
              <groupId>io.netty</groupId>
              <artifactId>netty-all</artifactId>
              <version>4.1.6.Final</version>
            </dependency>
            
            <!-- MessagePack: binary serialization of IMMessage (@Message, MessagePack.read/write) -->
            <dependency>
              <groupId>org.msgpack</groupId>
              <artifactId>msgpack</artifactId>
              <version>0.6.12</version>
            </dependency>
    
             <!-- fastjson: JSONObject holding the per-channel extension attributes -->
             <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>1.2.4</version>
            </dependency>
    
            <!-- Lombok: generates accessors (@Data) and the logger field (@Slf4j) -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.10</version>
            </dependency>
    
            <!-- SLF4J: logging API behind @Slf4j -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    
            <!-- Logback: SLF4J logging backend -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
            </dependency>
        </dependencies>
    

    核心代码

    基础类
    /**
     * Commands of the custom IM (Instant Messaging) protocol.
     *
     * <p>Each constant carries the literal command name that appears in the
     * first bracketed field of a text message, e.g. {@code [LOGIN][...]}.
     */
    public enum IMP {
        /** System message. */
        SYSTEM("SYSTEM"),
        /** Login command. */
        LOGIN("LOGIN"),
        /** Logout command. */
        LOGOUT("LOGOUT"),
        /** Chat message. */
        CHAT("CHAT"),
        /** Send-flowers effect. */
        FLOWER("FLOWER");
    
        private final String name;
        
        /**
         * Tells whether a text message starts with one of the protocol commands.
         *
         * <p>Iterating over {@link #values()} keeps this check in sync with the
         * declared constants, so every command (including LOGOUT and FLOWER) is
         * recognised, and a prefix test is used because real messages carry
         * further fields after the command.
         *
         * @param content raw text message, may be {@code null}
         * @return {@code true} if {@code content} begins with {@code [COMMAND]}
         *         for any declared command; {@code false} otherwise or for {@code null}
         */
        public static boolean isIMP(String content){
            if(content == null){
                return false;
            }
            for(IMP command : values()){
                if(content.startsWith("[" + command.name + "]")){
                    return true;
                }
            }
            return false;
        }
        
        IMP(String name){
            this.name = name;
        }
        
        /**
         * @return the command name as written on the wire
         */
        public String getName(){
            return this.name;
        }
        
        /**
         * @return the command name, identical to {@link #getName()}
         */
        @Override
        public String toString(){
            return this.name;
        }
        
    }
    /**
     * Message entity exchanged between the chat server and its clients.
     *
     * <p>Accessors are generated by Lombok ({@code @Data}); {@code @Message}
     * marks the class for MessagePack serialization.
     */
    @Message
    @Data
    public class IMMessage{
        
        private String addr;        // IP address and port
        private String cmd;         // command type, e.g. LOGIN, SYSTEM or LOGOUT
        private long time;          // time the command was sent
        private int online;         // number of users currently online
        private String sender;      // sender
        private String receiver;    // receiver
        private String content;     // message content
        private String terminal;    // terminal type
    
        /** No-argument constructor; all fields keep their default values. */
        public IMMessage(){}
    
        /**
         * Creates a message carrying an online-user count (used for SYSTEM notices).
         *
         * @param cmd     command name
         * @param time    send time
         * @param online  number of users online
         * @param content message content
         */
        public IMMessage(String cmd,long time,int online,String content){
            this.cmd = cmd;
            this.time = time;
            this.online = online;
            this.content = content;
        }
    
        /**
         * Creates a message that identifies the sender and its terminal type.
         *
         * @param cmd      command name
         * @param terminal terminal type of the sender
         * @param time     send time
         * @param sender   sender name
         */
        public IMMessage(String cmd,String terminal,long time,String sender){
            this.cmd = cmd;
            this.time = time;
            this.sender = sender;
            this.terminal = terminal;
        }
    
        /**
         * Creates a message with a sender and text content.
         *
         * @param cmd     command name
         * @param time    send time
         * @param sender  sender name
         * @param content message content
         */
        public IMMessage(String cmd,long time,String sender,String content){
            this.cmd = cmd;
            this.time = time;
            this.sender = sender;
            this.content = content;
        }
    
        @Override
        public String toString() {
            return "IMMessage{" +
                    "addr='" + addr + '\'' +
                    ", cmd='" + cmd + '\'' +
                    ", time=" + time +
                    ", online=" + online +
                    ", sender='" + sender + '\'' +
                    ", receiver='" + receiver + '\'' +
                    ", content='" + content + '\'' +
                    ", terminal='" + terminal + '\'' +
                    '}';
        }
    }
    
    handler
    /**
     * Business logic of the custom protocol: keeps track of the online users and
     * reacts to the LOGIN, CHAT and FLOWER commands by pushing messages to the
     * online channels. Any other command is ignored by {@link #sendMsg(Channel, String)}.
     */
    public class MsgProcessor {
        
        // Channels of the logged-in users; static, hence shared by every MsgProcessor instance
        private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        
        // Keys of the attributes stored on each channel
        public static final AttributeKey<String> NICK_NAME = AttributeKey.valueOf("nickName");
        public static final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
        public static final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");
        public static final AttributeKey<String> FROM = AttributeKey.valueOf("from");
        
        // Terminal type of clients that receive IMMessage objects instead of text frames
        private static final String CONSOLE = "Console";
        // Extension-attribute key holding the time of the last accepted FLOWER command
        private static final String LAST_FLOWER_TIME = "lastFlowerTime";
        // Minimum number of seconds between two FLOWER commands of the same client
        private static final int FLOWER_INTERVAL_SECONDS = 10;
        
        // Custom decoder, used here for its text-to-IMMessage conversion
        private IMDecoder decoder = new IMDecoder();
        // Custom encoder, used here for its IMMessage-to-text conversion
        private IMEncoder encoder = new IMEncoder();
        
        /**
         * Returns the nickname stored on the channel.
         * @param client channel of the user
         * @return the nickname, or {@code null} if none has been stored
         */
        public String getNickName(Channel client){
            return client.attr(NICK_NAME).get();
        }
        /**
         * Returns the remote address of the channel without the first "/".
         * @param client channel of the user
         * @return textual remote address
         */
        public String getAddress(Channel client){
            return client.remoteAddress().toString().replaceFirst("/","");
        }
        
        /**
         * Returns the extension attributes stored on the channel.
         * @param client channel of the user, may be {@code null}
         * @return the attribute object, or {@code null} if none is stored or
         *         {@code client} is {@code null}
         */
        public JSONObject getAttrs(Channel client){
            if(client == null){
                return null;
            }
            return client.attr(ATTRS).get();
        }
        
        /**
         * Stores one extension attribute on the channel, creating the attribute
         * object on first use.
         * @param client channel of the user
         * @param key attribute name
         * @param value attribute value
         */
        private void setAttrs(Channel client,String key,Object value){
            JSONObject json = client.attr(ATTRS).get();
            if(json == null){
                json = new JSONObject();
            }
            json.put(key, value);
            client.attr(ATTRS).set(json);
        }
        
        /**
         * Announces to every online channel that the user left, then removes the
         * channel from the online group. Channels without a nickname never logged
         * in through the protocol and are ignored.
         * @param client channel of the leaving user
         */
        public void logout(Channel client){
            String nickName = getNickName(client);
            if(nickName == null){ return; }
            
            // The notice is the same for every recipient, so it is encoded once.
            IMMessage notice = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), nickName + "离开");
            String content = encoder.encode(notice);
            // NOTE(review): unlike the login notice, this is sent as a text frame even to
            // "Console" channels - confirm whether console clients should get an IMMessage.
            for (Channel channel : onlineUsers) {
                channel.writeAndFlush(new TextWebSocketFrame(content));
            }
            onlineUsers.remove(client);
        }
        
        /**
         * Processes a message object by converting it to its text form and
         * delegating to {@link #sendMsg(Channel, String)}.
         * @param client channel the message came from
         * @param msg message to process
         */
        public void sendMsg(Channel client,IMMessage msg){
    
            sendMsg(client,encoder.encode(msg));
        }
        
        /**
         * Parses a text message and executes its command. Text that the decoder
         * cannot parse, and commands other than LOGIN, CHAT and FLOWER, are ignored.
         * @param client channel the message came from
         * @param msg text message in the custom protocol format
         */
        public void sendMsg(Channel client,String msg){
    
            IMMessage request = decoder.decode(msg);
            if(null == request){ return; }
            
            String cmd = request.getCmd();
            if(IMP.LOGIN.getName().equals(cmd)){
                handleLogin(client, request);
            }else if(IMP.CHAT.getName().equals(cmd)){
                handleChat(client, request);
            }else if(IMP.FLOWER.getName().equals(cmd)){
                handleFlower(client, request);
            }
        }
        
        /** Registers the channel as online and notifies every online channel. */
        private void handleLogin(Channel client,IMMessage request){
            client.attr(NICK_NAME).set(request.getSender());
            client.attr(IP_ADDR).set(getAddress(client));
            client.attr(FROM).set(request.getTerminal());
            onlineUsers.add(client);
            
            for (Channel channel : onlineUsers) {
                String text = (channel == client) ? "已与服务器建立连接!" : getNickName(client) + "加入";
                IMMessage notice = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), text);
    
                if(CONSOLE.equals(channel.attr(FROM).get())){
                    channel.writeAndFlush(notice);
                    continue;
                }
                channel.writeAndFlush(new TextWebSocketFrame(encoder.encode(notice)));
            }
        }
        
        /** Forwards a chat message to every online channel; the sender sees itself as "you". */
        private void handleChat(Channel client,IMMessage request){
            for (Channel channel : onlineUsers) {
                boolean isSelf = (channel == client);
                String sender = isSelf ? "you" : getNickName(client);
                // A fresh message per recipient: writeAndFlush may run later on another
                // thread, so a shared object must not be modified after it is handed over.
                IMMessage outbound = new IMMessage(request.getCmd(), sysTime(), sender, request.getContent());
    
                if(CONSOLE.equals(channel.attr(FROM).get()) && !isSelf){
                    channel.writeAndFlush(outbound);
                    continue;
                }
                channel.writeAndFlush(new TextWebSocketFrame(encoder.encode(outbound)));
            }
        }
        
        /** Broadcasts the flower effect unless the client sent one too recently. */
        private void handleFlower(Channel client,IMMessage request){
            JSONObject attrs = getAttrs(client);
            long currTime = sysTime();
            if(null != attrs){
                long lastTime = attrs.getLongValue(LAST_FLOWER_TIME);
                long elapsed = currTime - lastTime;
                // Reject repeated flowers within FLOWER_INTERVAL_SECONDS seconds
                if(elapsed < 1000L * FLOWER_INTERVAL_SECONDS){
                    request.setSender("you");
                    request.setCmd(IMP.SYSTEM.getName());
                    request.setContent("您送鲜花太频繁," + (FLOWER_INTERVAL_SECONDS - elapsed / 1000) + "秒后再试");
    
                    client.writeAndFlush(new TextWebSocketFrame(encoder.encode(request)));
                    return;
                }
            }
            
            // Accepted: every channel receives a text frame, encoded before the next mutation
            for (Channel channel : onlineUsers) {
                if (channel == client) {
                    request.setSender("you");
                    request.setContent("你给大家送了一波鲜花雨");
                    setAttrs(client, LAST_FLOWER_TIME, currTime);
                }else{
                    request.setSender(getNickName(client));
                    request.setContent(getNickName(client) + "送来一波鲜花雨");
                }
                request.setTime(sysTime());
                
                channel.writeAndFlush(new TextWebSocketFrame(encoder.encode(request)));
            }
        }
        
        /**
         * Returns the current system time.
         * @return milliseconds as reported by {@link System#currentTimeMillis()}
         */
        private long sysTime(){
            return System.currentTimeMillis();
        }
    }
    /**
     * Decoder of the custom IM protocol. It turns MessagePack bytes into
     * {@link IMMessage} objects inside a pipeline and offers
     * {@link #decode(String)} to parse the text form of the protocol.
     */
    public class IMDecoder extends ByteToMessageDecoder {
    
        // Maximum number of characters kept from a nickname
        private static final int MAX_NICK_NAME_LENGTH = 10;
    
        // Text form: "[f1][f2]...[fn]" optionally followed by " - content".
        // Header fields may not contain ']', so brackets inside the content cannot be
        // mistaken for header fields; DOTALL lets the content span several lines.
        private static final Pattern PATTERN =
                Pattern.compile("^\\[([^\\]]*(?:\\]\\[[^\\]]*)*)\\](?:\\s-\\s(.*))?$", Pattern.DOTALL);
        
        /**
         * Reads one MessagePack-encoded {@link IMMessage} from the buffer. When the
         * bytes are not a valid message, this decoder removes itself from the pipeline.
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {
            try{
                IMMessage message = new MessagePack().read(in.nioBuffer(), IMMessage.class);
                out.add(message);
                in.clear();
            }catch(MessageTypeException e){
                // Not the custom binary protocol: step aside for the handlers that follow
                ctx.channel().pipeline().remove(this);
            }
        }
        
        /**
         * Parses the text form of the protocol.
         *
         * <p>LOGIN and FLOWER need the fields [command][time][sender][terminal];
         * CHAT needs [command][time][sender] and takes the text after " - " as
         * content. A time field that is not a number is read as 0.
         *
         * @param msg text message, may be {@code null}
         * @return the parsed message, or {@code null} when the text is blank, is
         *         malformed, lacks required fields or carries another command
         */
        public IMMessage decode(String msg){
            if(null == msg || "".equals(msg.trim())){ return null; }
            
            Matcher m = PATTERN.matcher(msg);
            if(!m.matches()){ return null; }
            
            String[] headers = m.group(1).split("\\]\\[");
            // Command, time and sender are required by every supported command
            if(headers.length < 3){ return null; }
            
            String cmd = headers[0];
            long time = 0;
            try{
                time = Long.parseLong(headers[1]);
            }catch(NumberFormatException ignored){
                // Unparsable time: keep 0, the server stamps outgoing messages itself
            }
            
            String nickName = headers[2];
            if(nickName.length() > MAX_NICK_NAME_LENGTH){
                nickName = nickName.substring(0, MAX_NICK_NAME_LENGTH);
            }
            
            if(IMP.LOGIN.getName().equals(cmd) || IMP.FLOWER.getName().equals(cmd)){
                if(headers.length < 4){ return null; }
                return new IMMessage(cmd,headers[3],time,nickName);
            }
            if(IMP.CHAT.getName().equals(cmd)){
                return new IMMessage(cmd,time,nickName,m.group(2));
            }
            return null;
        }
    }
    /**
     * Encoder of the custom IM protocol. It serializes {@link IMMessage} objects
     * with MessagePack inside a pipeline and offers {@link #encode(IMMessage)}
     * to build the text form of the protocol.
     */
    public class IMEncoder extends MessageToByteEncoder<IMMessage> {
    
        /** Writes the MessagePack bytes of the message into the outgoing buffer. */
        @Override
        protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
                throws Exception {
            out.writeBytes(new MessagePack().write(msg));
        }
        
        /**
         * Builds the text form of a message: {@code [cmd][time]} followed by
         * command-specific fields and, when present, {@code " - content"}.
         *
         * <ul>
         *   <li>LOGIN, FLOWER: {@code [sender][terminal]}</li>
         *   <li>CHAT, LOGOUT: {@code [sender]}</li>
         *   <li>SYSTEM: {@code [online]}</li>
         * </ul>
         *
         * @param msg message to encode, may be {@code null}
         * @return the text form, or an empty string for {@code null}
         */
        public String encode(IMMessage msg){
            if(null == msg){ return ""; }
            String cmd = msg.getCmd();
            String prex = "[" + cmd + "]" + "[" + msg.getTime() + "]";
            if(IMP.LOGIN.getName().equals(cmd) ||
                IMP.FLOWER.getName().equals(cmd)){
                prex += ("[" + msg.getSender() + "][" + msg.getTerminal() + "]");
            }else if(IMP.CHAT.getName().equals(cmd) ||
                IMP.LOGOUT.getName().equals(cmd)){
                // LOGOUT carries its sender as well: [LOGOUT][time][sender]
                prex += ("[" + msg.getSender() + "]");
            }else if(IMP.SYSTEM.getName().equals(cmd)){
                prex += ("[" + msg.getOnline() + "]");
            }
            if(!(null == msg.getContent() || "".equals(msg.getContent()))){
                prex += (" - " + msg.getContent());
            }
            return prex;
        }
    
    }
    
    服务端
    @Slf4j
    public class ChatServer{
        
        private int port = 8080;
        public void start(int port){
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline pipeline = ch.pipeline();
    
                                /**
                                 *输入:
                                 * 前端是符合自定义格式的字符串 --> IMMessage --> 向chanel写(TerminalServerHandler)
                                 * http请求 --> HttpServerCodec(HttpMessage) -->  HttpObjectAggregator(FullHttpRequest) --> HttpServerHandler
                                 * webSocket请求(Web TCP) --> WebSocketServerProtocolHandler(WebSocketFrame) --> WebSocketServerHandler
                                 * 输出:
                                 * IMMessage --> IMEncoder
                                 * http: HttpResponse --> HttpServerCodec
                                 */
    
                                /** 解析自定义协议 */
                                pipeline.addLast(new IMDecoder());  //Inbound
                                pipeline.addLast(new IMEncoder());  //Outbound
    
                                pipeline.addLast(new TerminalServerHandler());  //Inbound
    
                                /** 解析Http请求 */
                                pipeline.addLast(new HttpServerCodec());  //Outbound
    
                                //主要是将同一个http请求或响应的多个消息对象变成一个 fullHttpRequest完整的消息对象
                                pipeline.addLast(new HttpObjectAggregator(64 * 1024));//Inbound
    
                                //主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的 ,加上这个handler我们就不用考虑这个问题了
                                pipeline.addLast(new ChunkedWriteHandler());//Inbound、Outbound
    
                                pipeline.addLast(new HttpServerHandler());//Inbound
    
                                /** 解析WebSocket请求 */
                                pipeline.addLast(new WebSocketServerProtocolHandler("/im"));    //Inbound
    
                                pipeline.addLast(new WebSocketServerHandler()); //Inbound
    
                            }
                        });
                ChannelFuture f = b.bind(this.port).sync();
                log.info("服务已启动,监听端口" + this.port);
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
        public void start() {
            start(this.port);
        }
        
        
        public static void main(String[] args) throws IOException{
            if(args.length > 0) {
                new ChatServer().start(Integer.valueOf(args[0]));
            }else{
                new ChatServer().start();
            }
        }
    }
    /**
     * Inbound handler for {@link IMMessage} objects produced by the custom decoder;
     * every message is passed to a {@link MsgProcessor}.
     */
    @Slf4j
    public class TerminalServerHandler extends SimpleChannelInboundHandler<IMMessage>{
    
        // Executes the protocol commands
        private MsgProcessor processor = new MsgProcessor();
        
        /** Forwards the decoded message together with its channel to the processor. */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws Exception {
            processor.sendMsg(ctx.channel(), msg);
        }
    
        /**
         * Logs the message of the failure and closes the connection.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            String reason = cause.getMessage();
            log.info("Socket Client: 与客户端断开连接:" + reason);
            ctx.close();
        }
    
    }
    /**
     * Inbound handler for WebSocket text frames; the text of every frame is
     * passed to a {@link MsgProcessor}.
     */
    @Slf4j
    public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
        // Executes the protocol commands
        private MsgProcessor processor = new MsgProcessor();
        
        /** Forwards the frame text together with its channel to the processor. */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
            processor.sendMsg(ctx.channel(), msg.text());
        }
    
        /**
         * Logs the failure, including its stack trace, and closes the connection.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            Channel client = ctx.channel();
            String addr = processor.getAddress(client);
            // The throwable goes to the logger instead of printStackTrace()
            log.info("WebSocket Client:" + addr + "异常", cause);
            ctx.close();
        }
    
    }
    /**
     * Serves static files from the "webroot" directory below the "classes/"
     * directory of the class path. Requests that do not map to a readable file
     * inside that directory are passed to the next inbound handler.
     */
    @Slf4j
    public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        
        // Location of this class on the class path
        private URL baseURL = HttpServerHandler.class.getResource("");
        private final String webroot = "webroot";
        
        /**
         * Resolves a request path to a file inside the web root.
         *
         * @param fileName path relative to the web root
         * @return the canonical file
         * @throws Exception if the web root cannot be located or the resolved
         *         file lies outside of it (e.g. a path containing "..")
         */
        private File getResource(String fileName) throws Exception{
            String basePath = baseURL.toURI().toString();
            int start = basePath.indexOf("classes/");
            basePath = (basePath.substring(0,start) + "/" + "classes/").replaceAll("/+","/");
    
            String rootPath = basePath + webroot;
            rootPath = !rootPath.contains("file:") ? rootPath : rootPath.substring(5);
            rootPath = rootPath.replaceAll("//", "/");
    
            // Canonical paths have ".." resolved, so the containment check cannot be bypassed
            File root = new File(rootPath).getCanonicalFile();
            File target = new File(root, fileName).getCanonicalFile();
            if(!target.toPath().startsWith(root.toPath())){
                throw new IllegalArgumentException("Path escapes the web root: " + fileName);
            }
            return target;
        }
    
        /** Picks the Content-Type header value from the extension of the request path. */
        private String contentTypeOf(String path){
            if(path.endsWith(".css")){
                return "text/css;charset=utf-8;";
            }
            if(path.endsWith(".js")){
                return "text/javascript;charset=utf-8;";
            }
            String lower = path.toLowerCase();
            if(lower.matches(".*\\.(jpg|png|gif)$")){
                // Extension without the dot; "jpg" is registered as image/jpeg
                String ext = lower.substring(lower.lastIndexOf(".") + 1);
                return "image/" + ("jpg".equals(ext) ? "jpeg" : ext);
            }
            return "text/html;charset=utf-8;";
        }
    
        /**
         * Answers the request with the matching static file ("/" maps to
         * chat.html). When no file can be opened the request is forwarded
         * unchanged to the next handler.
         */
        @Override
        public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            String uri = request.getUri();
            // The query string is not part of the file name
            int queryStart = uri.indexOf('?');
            String path = queryStart < 0 ? uri : uri.substring(0, queryStart);
            
            final RandomAccessFile file;
            try{
                String page = path.equals("/") ? "chat.html" : path;
                file =  new RandomAccessFile(getResource(page), "r");
            }catch(Exception e){
                // Not a static file: let the following handlers deal with the request
                ctx.fireChannelRead(request.retain());
                return;
            }
    
            try{
                HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
                response.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentTypeOf(path));
    
                boolean keepAlive = HttpHeaders.isKeepAlive(request);
    
                if (keepAlive) {
                    response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                    response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
                }
                ctx.write(response);
                
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
                
                ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
                // The transfer may still be pending here, so the file is closed only
                // once the last write has completed.
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture done) throws Exception {
                        file.close();
                    }
                });
                if (!keepAlive) {
                    future.addListener(ChannelFutureListener.CLOSE);
                }
            }catch(Exception e){
                // No completion listener will run in this case: release the file here
                file.close();
                throw e;
            }
        }
    
        /**
         * Logs the failure, including its stack trace, and closes the connection.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            Channel client = ctx.channel();
            // The throwable goes to the logger instead of printStackTrace()
            log.info("Client:"+client.remoteAddress()+"异常", cause);
            ctx.close();
        }
    }
    
    客户端
    /**
     * Console chat client: connects to the server and delegates the whole
     * conversation to a {@link ChatClientHandler}.
     */
    public class ChatClient  {
        
        private ChatClientHandler clientHandler;
        private String host;
        private int port;
        
        /**
         * @param nickName name under which the client logs in
         */
        public ChatClient(String nickName){
            this.clientHandler = new ChatClientHandler(nickName);
        }
        
        /**
         * Connects to the server and blocks until the connection is closed; the
         * event loop group is shut down before returning.
         *
         * <p>Pipeline: inbound bytes --> IMDecoder (IMMessage) --> ChatClientHandler;
         * outbound IMMessage --> IMEncoder.
         *
         * @param host server host
         * @param port server port
         */
        public void connect(String host,int port){
            this.host = host;
            this.port = port;
    
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(workerGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new IMDecoder());
                                ch.pipeline().addLast(new IMEncoder());
    
                                ch.pipeline().addLast(clientHandler);
                            }
                        });
                ChannelFuture connected = bootstrap.connect(this.host, this.port).sync();
                connected.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    
        /** Connects as "edward" to a server on 127.0.0.1:8080. */
        public static void main(String[] args) throws IOException{
            new ChatClient("edward").connect("127.0.0.1",8080);
            
        }
    }
    /**
     * Client-side chat logic: logs in once the connection is active, reads the
     * conversation from the console on a separate thread and prints every
     * message received from the server.
     */
    @Slf4j
    public class ChatClientHandler extends SimpleChannelInboundHandler<IMMessage> {
    
        // Compiled once; removeHtmlTag runs for every received message
        private static final Pattern SCRIPT_TAG =
                Pattern.compile("<script[^>]*?>[\\s\\S]*?<\\/script>", Pattern.CASE_INSENSITIVE);
        private static final Pattern STYLE_TAG =
                Pattern.compile("<style[^>]*?>[\\s\\S]*?<\\/style>", Pattern.CASE_INSENSITIVE);
        private static final Pattern HTML_TAG =
                Pattern.compile("<[^>]+>", Pattern.CASE_INSENSITIVE);
    
        private ChannelHandlerContext ctx;
        private String nickName;
    
        /**
         * @param nickName name under which the client logs in
         */
        public ChatClientHandler(String nickName){
            this.nickName = nickName;
        }
        
        /**
         * Starts the console thread. Every input line is sent as a CHAT message;
         * the line "exit" sends LOGOUT and ends the loop, as does the end of input.
         */
        private void session() throws IOException {
            new Thread(){
                @Override
                public void run(){
                    System.out.println(nickName + ",你好,请在控制台输入对话内容");
                    Scanner scanner = new Scanner(System.in);
                    // Stops at end of input instead of re-sending a stale message
                    while(scanner.hasNextLine()){
                        String input = scanner.nextLine();
                        IMMessage message;
                        if("exit".equals(input)){
                            message = new IMMessage(IMP.LOGOUT.getName(),"Console",System.currentTimeMillis(),nickName);
                        }else{
                            message = new IMMessage(IMP.CHAT.getName(),System.currentTimeMillis(),nickName,input);
                        }
                        if(!sendMsg(message)){
                            break;
                        }
                    }
                    scanner.close();
                }
            }.start();
        }
        
        /**
         * Called once the TCP connection is established: sends LOGIN and starts
         * the console session.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.ctx = ctx;
            IMMessage message = new IMMessage(IMP.LOGIN.getName(),"Console",System.currentTimeMillis(),this.nickName);
            sendMsg(message);
            log.info("成功连接服务器,已执行登录动作");
            session();
        }
    
        /**
         * Sends a message to the server.
         * @param msg message to send
         * @return {@code false} if the message was a LOGOUT, i.e. the conversation
         *         is over; {@code true} otherwise
         */
        private boolean sendMsg(IMMessage msg){
            ctx.channel().writeAndFlush(msg);
            System.out.println("继续输入开始对话...");
            // Compare with the command name: getCmd() is a String, not the enum constant
            return !IMP.LOGOUT.getName().equals(msg.getCmd());
        }
    
        /**
         * Called for every received message: prints "sender:content" with the
         * markup stripped from the content.
         * @throws IOException 
         */
        @Override
        public void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws IOException {
            String prefix = (null == msg.getSender()) ? "" : (msg.getSender() + ":");
            System.out.println(prefix + removeHtmlTag(msg.getContent()));
        }
    
    
        /**
         * Strips script blocks, style blocks and any remaining tags from a text.
         * @param htmlStr text that may contain HTML, may be {@code null}
         * @return the trimmed plain text; an empty string for {@code null}
         */
        public static String removeHtmlTag(String htmlStr){
            if(htmlStr == null){
                return "";
            }
            String text = SCRIPT_TAG.matcher(htmlStr).replaceAll("");
            text = STYLE_TAG.matcher(text).replaceAll("");
            text = HTML_TAG.matcher(text).replaceAll("");
            return text.trim();
        }
    
        /**
         * Called on failure: logs the message of the cause and closes the connection.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.info("与服务器断开连接:"+cause.getMessage());
            ctx.close();
        }
    }
    

    相关文章

      网友评论

          本文标题:基于netty手写消息推送系统

          本文链接:https://www.haomeiwen.com/subject/bkcqvhtx.html