美文网首页Netty
用Netty构建自定义协议

用Netty构建自定义协议

作者: 程序员修炼笔记 | 来源:发表于2017-08-28 10:53 被阅读0次

    在复杂的网络世界中,各种应用之间通信需要依赖各种各样的协议,比如:HTTP,Telnet,FTP,SMTP等等。
    在开发过程中,有时候我们需要构建一些适应自己业务的应用层协议,比如银行业中通用的8583报文。Netty作为目前Java NIO方向最优秀的框架,可以帮助我们快速构建自定义协议,本文将通过一个简洁的例子帮助大家了解其做法。

    协议约定

    协议名称: FF

    image.png

    协议规则:
    如图所示,分为Header和Content两部分,Content的长度为变长,由header中的content-length来定义。

    定义消息对象

    FFHeader.java

    package com.jack.study.netty01.customJianShu.mesage;
    
    /**
     * Header of an FF protocol message.
     *
     * <p>Holds the protocol version, the length of the message content and the
     * session id. This is a plain mutable value holder with no validation.
     */
    public class FFHeader {

        /** Protocol version. */
        private int version;

        /** Length of the message content. */
        private int contentLength;

        /** Session identifier carried with the message. */
        private String sessionId;

        /**
         * Creates a header with all three fields set.
         *
         * @param version       protocol version
         * @param contentLength length of the message content
         * @param sessionId     session identifier
         */
        public FFHeader(int version, int contentLength, String sessionId) {
            this.version = version;
            this.contentLength = contentLength;
            this.sessionId = sessionId;
        }

        // ---- accessors ----

        /** @return the protocol version */
        public int getVersion() {
            return this.version;
        }

        /** @return the length of the message content */
        public int getContentLength() {
            return this.contentLength;
        }

        /** @return the session identifier */
        public String getSessionId() {
            return this.sessionId;
        }

        // ---- mutators ----

        /** @param version the new protocol version */
        public void setVersion(int version) {
            this.version = version;
        }

        /** @param contentLength the new content length */
        public void setContentLength(int contentLength) {
            this.contentLength = contentLength;
        }

        /** @param sessionId the new session identifier */
        public void setSessionId(String sessionId) {
            this.sessionId = sessionId;
        }
    }
    
    

    FFMessage.java

    package com.jack.study.netty01.customJianShu.mesage;
    
    /**
     * A complete FF message: a header together with its text content.
     */
    public class FFMessage {

        /** Header of this message. */
        private FFHeader luckHeader;

        /** Text content of this message. */
        private String content;

        /**
         * Creates a message from a header and its content.
         *
         * @param luckHeader the message header
         * @param content    the message content
         */
        public FFMessage(FFHeader luckHeader, String content) {
            this.content = content;
            this.luckHeader = luckHeader;
        }

        /** @return the message header */
        public FFHeader getLuckHeader() {
            return this.luckHeader;
        }

        /** @return the message content */
        public String getContent() {
            return this.content;
        }

        /** @param luckHeader the new message header */
        public void setLuckHeader(FFHeader luckHeader) {
            this.luckHeader = luckHeader;
        }

        /** @param content the new message content */
        public void setContent(String content) {
            this.content = content;
        }

        /**
         * Renders the header fields followed by the content on a single line.
         *
         * @return a bracketed, comma-separated description of this message
         */
        @Override
        public String toString() {
            final FFHeader h = this.luckHeader;
            final Object[] fields = {h.getVersion(), h.getContentLength(), h.getSessionId(), this.content};
            return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]", fields);
        }
    }
    

    定义编码器

    package com.jack.study.netty01.customJianShu.codec;
    
    import com.jack.study.netty01.customJianShu.mesage.FFHeader;
    import com.jack.study.netty01.customJianShu.mesage.FFMessage;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * Encodes an {@link FFMessage} into its binary wire form.
     *
     * <p>Fields are written in this order: version (4-byte int), content length
     * (4-byte int), session id (36 bytes), content bytes. Text is encoded as UTF-8
     * so the bytes on the wire do not depend on the platform default charset.
     */
    public class FFEncoder extends MessageToByteEncoder<FFMessage> {

        /** On-wire size of the session id; FFDecoder always reads exactly this many bytes. */
        private static final int SESSION_ID_LENGTH = 36;

        /**
         * Writes one message to {@code out}.
         *
         * @param ctx     the handler context (unused)
         * @param message the message to encode
         * @param out     the buffer the encoded bytes are appended to
         * @throws IllegalArgumentException if the session id does not encode to exactly
         *                                  {@value #SESSION_ID_LENGTH} bytes
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, FFMessage message, ByteBuf out) throws Exception {
            FFHeader header = message.getLuckHeader();

            // Encode all text up front so nothing is written if validation fails.
            byte[] sessionId = header.getSessionId().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            if (sessionId.length != SESSION_ID_LENGTH) {
                // A shorter or longer id would shift every following field for the reader.
                throw new IllegalArgumentException(
                        "sessionId must encode to " + SESSION_ID_LENGTH + " bytes but was " + sessionId.length);
            }
            byte[] content = message.getContent().getBytes(java.nio.charset.StandardCharsets.UTF_8);

            // Header
            out.writeInt(header.getVersion());
            // Write the real encoded byte count rather than the caller-supplied value:
            // the reader uses this number to find the end of the frame.
            out.writeInt(content.length);
            out.writeBytes(sessionId);

            // Body
            out.writeBytes(content);
        }
    }
    

    这里没有什么好说的,就是按定义好的顺序输出即可。

    定义解码器

    package com.jack.study.netty01.customJianShu.codec;
    
    import java.util.List;
    
    import com.jack.study.netty01.customJianShu.mesage.FFHeader;
    import com.jack.study.netty01.customJianShu.mesage.FFMessage;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    /**
     * Decodes the inbound byte stream into {@link FFMessage} objects.
     *
     * <p>Frame layout as read here: version (4-byte int), content length (4-byte int),
     * session id (36 bytes), then {@code contentLength} bytes of content. Text fields
     * are decoded as UTF-8.
     *
     * <p>Nothing is consumed from the buffer until a whole frame is readable, so a
     * frame split across several reads is left in place until the rest arrives.
     */
    public class FFDecoder extends ByteToMessageDecoder {

        /** Size of the session id field in bytes. */
        private static final int SESSION_ID_LENGTH = 36;

        /** Size of each int field (version, content length) in bytes. */
        private static final int INT_LENGTH = 4;

        /** Total header size in bytes: version + content length + session id = 44. */
        private final static int HEADER_LENGTH = INT_LENGTH + INT_LENGTH + SESSION_ID_LENGTH;

        /**
         * Tries to decode one frame from {@code in}.
         *
         * @param ctx the handler context (unused)
         * @param in  the accumulated inbound bytes
         * @param out receives the decoded {@link FFMessage}, if a full frame was available
         * @throws IllegalStateException if the header declares a negative content length
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // Not even a full header yet: wait for more bytes.
            if (in.readableBytes() < HEADER_LENGTH) {
                return;
            }

            // Peek at the content length with an absolute read so the reader index
            // does not move and never has to be rewound.
            int contentLength = in.getInt(in.readerIndex() + INT_LENGTH);
            if (contentLength < 0) {
                // Drop the bad header so the same bytes are not re-parsed on the next read.
                in.skipBytes(HEADER_LENGTH);
                throw new IllegalStateException("Negative content length in FF header: " + contentLength);
            }

            // Header is complete but the body is not: wait for more bytes.
            if (in.readableBytes() - HEADER_LENGTH < contentLength) {
                return;
            }

            // The whole frame is available; consume it.
            int version = in.readInt();
            in.skipBytes(INT_LENGTH); // content length, already peeked above

            byte[] sessionBytes = new byte[SESSION_ID_LENGTH];
            in.readBytes(sessionBytes);
            String sessionId = new String(sessionBytes, java.nio.charset.StandardCharsets.UTF_8);

            byte[] content = new byte[contentLength];
            in.readBytes(content);

            FFHeader header = new FFHeader(version, contentLength, sessionId);
            out.add(new FFMessage(header, new String(content, java.nio.charset.StandardCharsets.UTF_8)));
        }
    }
    这个类是整个协议的核心处理逻辑,其中对可读字节数的两处判断用于解决拆包、粘包问题:如果没有这两处判断,在连续收到多条消息时,消息的解析就会产生错乱。
    
    

    Server监听消息

    package com.jack.study.netty01.customJianShu.server;
    
    import com.jack.study.netty01.customJianShu.codec.FFDecoder;
    import com.jack.study.netty01.customJianShu.codec.FFEncoder;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Starts a Netty server for the FF protocol and blocks until its channel closes.
     */
    public class Server {

        // Port the server listens on
        private static final int PORT = 8888;

        /**
         * Binds the server to {@link #PORT}, prints the listening address and waits
         * for the server channel to close. Both event loop groups are shut down on exit.
         *
         * @param args command-line arguments (unused)
         * @throws InterruptedException if interrupted while binding or waiting for close
         */
        public static void main(String[] args) throws InterruptedException {
            final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            final EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                final ServerBootstrap bootstrap = new ServerBootstrap()
                        // Socket option for the listening channel
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .group(bossGroup, workerGroup)
                        // Use an NIO server channel
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(newChildInitializer());

                // Bind the port and start listening for connections
                final Channel serverChannel = bootstrap.bind(PORT).sync().channel();

                System.out.printf("luck协议启动地址:127.0.0.1:%d/\n", PORT);

                // Block until the server channel is closed
                serverChannel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        // Builds the initializer that sets up the pipeline of every accepted connection.
        private static ChannelInitializer<SocketChannel> newChildInitializer() {
            return new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    final ChannelPipeline p = ch.pipeline();

                    // Codec: ByteToMessageDecoder subclasses cannot be annotated @Sharable,
                    // so every channel gets its own decoder instance.
                    p.addLast(new FFEncoder());
                    p.addLast(new FFDecoder());

                    // Business logic
                    p.addLast(new ServerHandler());
                }
            };
        }
    }
    
    package com.jack.study.netty01.customJianShu.server;
    
    import com.jack.study.netty01.customJianShu.mesage.FFMessage;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * Server-side business handler: prints every decoded {@link FFMessage} to stdout.
     */
    public class ServerHandler extends SimpleChannelInboundHandler<FFMessage> {

        /**
         * Prints the received message, prefixed with a fixed label.
         *
         * @param ctx the handler context (unused)
         * @param msg the decoded message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FFMessage msg) throws Exception {
            final String line = "接收:" + msg;
            System.out.println(line);
        }
    }
    
    

    Client端发送消息

    package com.jack.study.netty01.customJianShu.client;
    
    import java.util.UUID;
    
    import com.jack.study.netty01.customJianShu.codec.FFDecoder;
    import com.jack.study.netty01.customJianShu.codec.FFEncoder;
    import com.jack.study.netty01.customJianShu.mesage.FFHeader;
    import com.jack.study.netty01.customJianShu.mesage.FFMessage;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * Demo client: connects to the FF server and sends a burst of messages so that
     * frame splitting and coalescing on the receiving side are exercised.
     */
    public class Client {

        /** Address of the server to connect to. */
        private static final String HOST = "127.0.0.1";

        /** Port of the server to connect to. */
        private static final int PORT = 8888;

        /** Number of messages sent after connecting. */
        private static final int MESSAGE_COUNT = 100000;

        /**
         * Connects to the server, sends {@link #MESSAGE_COUNT} messages and then blocks
         * until the channel is closed. The event loop group is shut down on exit.
         *
         * @param args command-line arguments (unused)
         * @throws InterruptedException if interrupted while connecting or waiting for close
         */
        public static void main(String[] args) throws InterruptedException {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Encoder for outbound messages
                        pipeline.addLast(new FFEncoder());
                        // Decoder for inbound bytes
                        pipeline.addLast(new FFDecoder());
                        // Business handler (only prints the message)
                        pipeline.addLast(new ClientHandler());
                    }
                });

                // Connect to the server
                Channel ch = b.connect(HOST, PORT).sync().channel();
                int version = 1;
                String sessionId = UUID.randomUUID().toString();
                String str = "Hello!";

                // Send MESSAGE_COUNT (100000) messages
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String content = str + "----" + i;
                    // The receiver treats contentLength as a byte count, so measure the
                    // encoded bytes; String.length() counts UTF-16 code units and would be
                    // wrong for non-ASCII text.
                    int contentLength = content.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
                    FFHeader header = new FFHeader(version, contentLength, sessionId);
                    FFMessage message = new FFMessage(header, content);
                    ch.writeAndFlush(message);
                }

                // Block until the channel is closed
                ch.closeFuture().sync();

            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    
    package com.jack.study.netty01.customJianShu.client;
    
    import com.jack.study.netty01.customJianShu.mesage.FFMessage;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * Client-side business handler: prints every decoded {@link FFMessage} to stdout.
     */
    public class ClientHandler extends SimpleChannelInboundHandler<FFMessage> {

        /**
         * Prints the received message using its string form.
         *
         * @param channelHandlerContext the handler context (unused)
         * @param message               the decoded message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FFMessage message) throws Exception {
            final String text = String.valueOf(message);
            System.out.println(text);
        }
    }
    
    

    这里之所以发送100000条消息而不是1条,主要是为了测试消息处理的正确性:只发送1条消息是无法暴露拆包、粘包问题的。

    测试

    1. 启动Server.java
    2. 启动Client.java
      运行截图:
    image.png image.png

    参考文章:http://www.jianshu.com/p/ba21eb32ae97

    相关文章

      网友评论

        本文标题:用Netty构建自定义协议

        本文链接:https://www.haomeiwen.com/subject/mtbwdxtx.html