Netty4(三):快速入门

作者: 聪明的奇瑞 | 来源:发表于2018-03-15 13:54 被阅读166次
    • 案例代码下载
    • 初学者的话推荐直接套用 all-in-one 的 jar 包,若熟悉 Netty 可以根据需求添加不同的 jar 包
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.22.Final</version>
    </dependency>
    

    TIME 协议(服务器)

    目标:编写一个 TIME 协议,服务器端在接收到客户端的连接时会向客户端发送一个 32 位的时间戳,并且一旦消息发送成功就会立即关闭

    编写 Handler

    • 首先编写 Handler(处理器),继承 ChannelInboundHandlerAdapter 类(该类默认将事件自动传播到下一个入站处理器)并重写其两个事件方法:
      • channelActive():Channel激活,当有客户端连接时触发
      • exceptionCaught():捕获到异常时触发
    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(final ChannelHandlerContext ctx) {
            final ByteBuf time = ctx.alloc().buffer(4);
            time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
            final ChannelFuture f = ctx.writeAndFlush(time);
            f.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                    assert f == future;
                    ctx.close();
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 代码分析:
      1. 我们需要写入一个 32位 的时间戳,因此需要一个至少有 4 个字节的 ByteBuf,通过 ChannelHandlerContext.alloc() 得到一个当前的 ByteBufAllocator,然后分配一个新的缓冲
      2. 通过 ByteBuf 的 write() 方法写入时间戳
      3. 通过 ChannelHandlerContext 对象的 writeAndFlush() 方法将字节容器的数据写入缓冲区并刷新,它会返回一个 ChannelFuture 对象
      4. 因为 Netty 的操作都是异步的,例如下面代码中的消息在被发送之前可能会被先关闭连接
      Channel ch = ...;
      ch.writeAndFlush(message);
      ch.close();
      
      1. 因此 close() 方法需要在数据通过 write() 发送到客户端之后在调用,因此为 ChannelFuture 增加一个 ChannelFutureListener 来监听操作完成事件,并关闭 Channel
      2. 也可以使用简单的预定义监听代码
      f.addListener(ChannelFutureListener.CLOSE);
      
    • ctx.write(Object) 方法不会使消息写入到通道上,它被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者你可以用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目的

    编写服务器类

    • 服务端需要两个 NioEventLoopGroup,它本质是一个线程池:
      • bossGroup:处理客户端连接事件的线程池
      • workerGroup:处理连接后所有事件的线程池
    • ServerBootstrap 是服务端的辅助启动类,用于创建服务端
    • 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
    • 通过 ChannelInitializer 辅助配置客户端连接生成的 Channel,指定需要执行的 Handler
    • 设置 EventLoopGroup 参数:
      • .option:用于设置 bossGroup 的相关参数
      • .childOption:用于设置workerGroup相关参数
    • 绑定端口,并调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture,调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
    public class Server {
    
        private int port;
    
        public Server(int port) {
            this.port = port;
        }
    
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();         // 处理客户端连接事件的线程池
            EventLoopGroup workerGroup = new NioEventLoopGroup();       // 处理连接后所有事件的线程池
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();      // NIO 服务的辅助启动类
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)          // 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new TimeServerHandler());              // 指定需要执行的 Handler
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)          // 设置 bossGroup 的相关参数
                        .childOption(ChannelOption.SO_KEEPALIVE, true);         // 设置 workerGroup 相关参数
    
                ChannelFuture f = bootstrap.bind(port).sync();          // 绑定端口,调用 ChannelFuture 的 sync() 阻塞方法等待绑定完成
                // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
                // 调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 优雅退出机制。。。退出线程池(该方法源码没读过,也不知怎么个优雅方式)
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
    
        public static void main(String[] args) {
            int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8080;
            new Server(port).run();
        }
    }
    

    TIME 协议(客户端)

    目标:连接服务端并接收服务端发送的时间戳消息,输出到控制台

    编写 Handler

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg; 
            try {
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            } finally {
                m.release();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    编写客户端类

    • 与服务端唯一不同的是使用 BootStrap 和 Channel 的实现,并调用 connect() 方法连接服务端
    public class Client {
        public static void main(String[] args) throws Exception {
            String host = (args.length == 1) ? args[0] : "localhost";
            int port = (args.length == 2) ? Integer.parseInt(args[1]) : 8080;
    
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workerGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                    }
                });
    
                ChannelFuture f = bootstrap.connect(host, port).sync();     // 连接服务端,调用 ChannelFuture 的 sync() 阻塞方法等待连接完成
                // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
                // 调用 ChannelFuture 的 sync() 阻塞方法直到客户端关闭链路之后才退出 main() 函数
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    处理基于流的传输

    回到 TIME 客户端例子,服务端发送的数据是一个 32位 的时间戳,如果服务端发送了 16位 的数据呢,那客户端读取的数据就不准确了

    解决方法一

    • 构造一个内部的缓冲,只有直到 4 个字节全部接收到内部缓冲,才进行处理
    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
        private ByteBuf byteBuf;
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            byteBuf = ctx.alloc().buffer(4);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            byteBuf.release();
            byteBuf = null;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            byteBuf.writeBytes(m);
            m.release();
    
            if (byteBuf.readableBytes() >= 4){
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 代码分析:
      • ChannelHandler 有 2 个生命周期监听方法:handlerAdded()、handlerRemoved() ,你可以完成任意初始化任务,只要它不会阻塞很长时间
      • 分配一个 4 个字节的字节容器,将读取的数据写入该字节容器
      • 判断容器中是否有足够的数据(4个字节),如果有在进行业务处理

    解决方法二

    • 方法一虽然解决了问题但修改后的处理器并不简洁,可以把一整个 ChannelHandler 拆分成多个 ChannelHandler 以减少应用复杂度,多个 ChannelHandler 构成一个处理链
    • 因此可以将 TimeClientHandler 拆分成2个处理器:
      • TimeDecoder:解析数据
      • TimeClientHandler:处理业务,跟初始实现一样
    • Netty 提供了 ByteToMessageDecoder 可以帮助完成 TimeDecoder 的开发,ByteToMessageDecoder 是 ChannelInboundHandler 的一个实现类,它可以让数据解析变得更简单
    public class TimeDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            if (byteBuf.readableBytes() < 4) {
                return;
            }
            list.add(byteBuf.readBytes(4));
        }
    }
    
    • 代码分析:
      1. 每当有新数据接收时,ByteToMessageDecoder 都会调用 decode() 方法来处理字节容器 ByteBuf 对象
      2. 如果在 decode() 方法里增加一个对象到 list 对象里面,则意味着解码消息成功,ByteToMessageDecoder 将会丢弃在字节容器 ByteBuf 里已经被读过的数据
    • 修改 ChannelInitializer 将另外一个 ChannelHandler 加入到 ChannelPipeline
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
        }
    });
    

    用 POJO 代替 ByteBuf

    之前例子使用 ByteBuf 作为协议消息的数据结构,目前读取的仅仅是一个 32 位 的数据,直接使用 ByteBuf 不是问题,然而在真实的协议中,数据量肯定不止如此,通过 ByteBuf 处理数据将变的复杂困难,因此下面介绍如何使用 POJO(普通 Java 对象) 代替 ByteBuf

    • 首先定义新类型 UnixTime
    public class UnixTime {
        private final long value;
    
        public UnixTime() {
            this(System.currentTimeMillis() / 1000L + 2208988800L);
        }
    
        public UnixTime(long value) {
            this.value = value;
        }
    
        public long value() {
            return value;
        }
    
        @Override
        public String toString() {
            return new Date((value() - 2208988800L) * 1000L).toString();
        }
    }
    
    • 修改 TimeDecoder 类,返回一个 UnixTime 以代替 ByteBuf
    protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        list.add(new UnixTime(byteBuf.readInt()));
    }
    
    • 修改 TimeClientHandler
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime unixTime = (UnixTime) msg;
        ctx.close();
    }
    
    • 修改 TimeServerHandler
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }
    
    • 最后还需要实现一个编码器,通过实现 ChannelOutboundHandler 来将 UnixTime 对象转换为 ByteBuf,这里有两个点要注意:
      • 当编码后的数据被写到了通道上 Netty 可以通过 ChannelPromise 对象的标记确认成功或失败
      • 不需要调用 cxt.flush(),因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),如果想自己实现 flush() 方法内容可以自行覆盖这个方法
    public class TimeEncoder extends ChannelOutboundHandlerAdapter{
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            UnixTime m = (UnixTime) msg;
            ByteBuf encoded = ctx.alloc().buffer(4);
            encoded.writeInt((int)m.value());
            ctx.write(encoded, promise); 
        }
    }
    
    • 你可以使用 MessageToByteEncode
    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }
    
    • 最后将 TimeEncoder 加入到 ChannelPipeline,并位于 TimeServerHandler 之前

    关闭应用

    • 关闭一个 Netty 应用只需通过 shutdownGracefully() 方法来关闭所有的 EventLoopGroup
    • 当所有的 EventLoopGroup 被完全地终止,并且对应的所有 channel 都已经被关闭时,Netty 会返回一个 Future 对象来通知你

    相关文章

      网友评论

        本文标题:Netty4(三):快速入门

        本文链接:https://www.haomeiwen.com/subject/rpkhqftx.html