美文网首页程序员
利用Netty构建自定义协议的通信

利用Netty构建自定义协议的通信

作者: whthomas | 来源:发表于2016-04-15 23:37 被阅读6185次

    在复杂的网络世界中,各种应用之间通信需要依赖各种各样的协议,比如:HTTP,Telnet,FTP,SMTP等等。

    在开发过程中,有时候我们需要构建一些适应自己业务的应用层协议,Netty作为一个非常优秀的网络通信框架,可以帮助我们完成自定义协议的通信。

    一般而言,我们制定的协议需要两个部分:

    • Header : 协议头部,放置一些Meta信息。
    • Content : 应用之间交互的信息主体。

    例如:

    | Version | Content-Length | SessionId | Content |
    

    其中Version,Content-Length,SessionId就是Header信息,Content就是交互的主体。给这个协议起一个名字叫做luck,依照luck协议,我们构建一个类。

    // 消息的头部
    public class LuckHeader {
    
        // 协议版本
        private int version;
        // 消息内容长度
        private int contentLength;
        // 服务名称
        private String sessionId;
    
        public LuckHeader(int version, int contentLength, String sessionId) {
            this.version = version;
            this.contentLength = contentLength;
            this.sessionId = sessionId;
        }
    
        public int getVersion() {
            return version;
        }
    
        public void setVersion(int version) {
            this.version = version;
        }
    
        public int getContentLength() {
            return contentLength;
        }
    
        public void setContentLength(int contentLength) {
            this.contentLength = contentLength;
        }
    
        public String getSessionId() {
            return sessionId;
        }
    
        public void setSessionId(String sessionId) {
            this.sessionId = sessionId;
        }
    }
    
    // 消息的主体
    public class LuckMessage {
    
        private LuckHeader luckHeader;
        private String content;
    
        public LuckMessage(LuckHeader luckHeader, String content) {
            this.luckHeader = luckHeader;
            this.content = content;
        }
    
        public LuckHeader getLuckHeader() {
            return luckHeader;
        }
    
        public void setLuckHeader(LuckHeader luckHeader) {
            this.luckHeader = luckHeader;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
        @Override
        public String toString() {
            return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]",
                    luckHeader.getVersion(),
                    luckHeader.getContentLength(),
                    luckHeader.getSessionId(),
                    content);
        }
    }
    

    那么我们在Netty中如何去对这种自定义的协议编码(Encode)呢?

    Netty中对数据进行编码解码需要利用Codec组件,Codec组件中分为:

    • Encoder : 编码器,将出站的数据从一种格式转换成另外一种格式。
    • Decoder : 解码器,将入站的数据从一种格式转换成另外一种格式。

    LuckDecoder.java

    public class LuckDecoder extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    
            // 获取协议的版本
            int version = in.readInt();
            // 获取消息长度
            int contentLength = in.readInt();
            // 获取SessionId
            byte[] sessionByte = new byte[36];
            in.readBytes(sessionByte);
            String sessionId = new String(sessionByte);
    
            // 组装协议头
            LuckHeader header = new LuckHeader(version, contentLength, sessionId);
    
            // 读取消息内容
            byte[] content = in.readBytes(in.readableBytes()).array();
    
            LuckMessage message = new LuckMessage(header, new String(content));
    
            out.add(message);
        }
    }
    

    LuckEncoder.java

    @ChannelHandler.Sharable
    public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf out) throws Exception {
    
            // 将Message转换成二进制数据
            LuckHeader header = message.getLuckHeader();
    
            // 这里写入的顺序就是协议的顺序.
    
            // 写入Header信息
            out.writeInt(header.getVersion());
            out.writeInt(message.getContent().length());
            out.writeBytes(header.getSessionId().getBytes());
    
            // 写入消息主体信息
            out.writeBytes(message.getContent().getBytes());
        }
    }
    

    编写一个逻辑控制层,展现server接收到的协议信息:

    public class NettyLuckHandler extends SimpleChannelInboundHandler<Message> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
            // 简单地打印出server接收到的消息
            System.out.println(msg.toString());
        }
    }
    

    编写完成之后,把编解码器逻辑控制器放入初始化组件中:

    public class NettyLuckInitializer extends ChannelInitializer<SocketChannel> {
    
        private static final LuckEncoder ENCODER = new LuckEncoder();
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
    
            ChannelPipeline pipeline = channel.pipeline();
    
            // 添加编解码器, 由于ByteToMessageDecoder的子类无法使用@Sharable注解,
            // 这里必须给每个Handler都添加一个独立的Decoder.
            pipeline.addLast(ENCODER);
            pipeline.addLast(new LuckDecoder());
    
            // 添加逻辑控制层
            pipeline.addLast(new NettyLuckHandler());
    
        }
    }
    

    编写一个服务端启动类:

    public class NettyLuckServer {
    
        // 指定端口号
        private static final int PORT = 8888;
    
        public static void main(String args[]) throws InterruptedException {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // 指定socket的一些属性
                serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)  // 指定是一个NIO连接通道
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new NettyLuckInitializer());
    
                // 绑定对应的端口号,并启动开始监听端口上的连接
                Channel ch = serverBootstrap.bind(PORT).sync().channel();
    
                System.out.printf("luck协议启动地址:127.0.0.1:%d/\n", PORT);
    
                // 等待关闭,同步端口
                ch.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    光有服务端并不行,没法测试我们的server是不是成功了。所以我们还需要编写一个客户端程序。

    LuckClientInitializer.java

    public class LuckClientInitializer extends ChannelInitializer<SocketChannel> {
    
        private static final LuckEncoder ENCODER = new LuckEncoder();
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
    
            ChannelPipeline pipeline = channel.pipeline();
    
            // 添加编解码器, 由于ByteToMessageDecoder的子类无法使用@Sharable注解,
            // 这里必须给每个Handler都添加一个独立的Decoder.
            pipeline.addLast(ENCODER);
            pipeline.addLast(new LuckDecoder());
    
            // and then business logic.
            pipeline.addLast(new NettyLuckClientHandler());
    
        }
    }
    

    LuckClientHandler.java

    public class LuckClientHandler extends SimpleChannelInboundHandler<LuckMessage> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, LuckMessage message) throws Exception {
            System.out.println(message);
        }
    }
    

    LuckClient.java

    public class LuckClient {
    
        public static void main(String args[]) throws InterruptedException {
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new NettyLuckInitializer());
    
                // Start the connection attempt.
                Channel ch = b.connect("127.0.0.1", 8888).sync().channel();
    
                int version = 1;
                String sessionId = UUID.randomUUID().toString();
                String content = "I'm the luck protocol!";
    
                LuckHeader header = new LuckHeader(version, content.length(), sessionId);
                LuckMessage message = new LuckMessage(header, content);
                ch.writeAndFlush(message);
    
                ch.close();
    
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    

    先运行NettyLuckServer.java,然后再去运行LuckClient.java可以看到控制的输出

    四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelRegistered
    信息: [id: 0x92534c29] REGISTERED
    四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler bind
    信息: [id: 0x92534c29] BIND(0.0.0.0/0.0.0.0:8888)
    luck协议启动地址:127.0.0.1:8888 
    四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelActive
    信息: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
    四月 15, 2016 11:31:54 下午 io.netty.handler.logging.LoggingHandler logMessage
    信息: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] RECEIVED: [id: 0x67a91c6b, L:/127.0.0.1:8888 - R:/127.0.0.1:53585]
    [version=1,contentLength=22,sessionId=cff7b3ea-1188-4314-abaa-de04db32d39f,content=I'm the luck protocol!]
    

    服务端顺利解析出了我们自定义的luck协议。

    相关文章

      网友评论

        本文标题:利用Netty构建自定义协议的通信

        本文链接:https://www.haomeiwen.com/subject/wwinlttx.html