Netty4(三):快速入门

作者: 聪明的奇瑞 | 来源:发表于2018-03-15 13:54 被阅读166次
  • 案例代码下载
  • 初学者的话推荐直接套用 all-in-one 的 jar 包,若熟悉 Netty 可以根据需求添加不同的 jar 包
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>

TIME 协议(服务器)

目标:编写一个 TIME 协议,服务器端在接收到客户端的连接时会向客户端发送一个 32 位的时间戳,并且一旦消息发送成功就会立即关闭

编写 Handler

  • 首先编写 Handler(处理器),继承 ChannelInboundHandlerAdapter 类(该类默认将事件自动传播到下一个入站处理器)并重写其两个事件方法:
    • channelActive():Channel激活,当有客户端连接时触发
    • exceptionCaught():捕获到异常时触发
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 代码分析:
    1. 我们需要写入一个 32位 的时间戳,因此需要一个至少有 4 个字节的 ByteBuf,通过 ChannelHandlerContext.alloc() 得到一个当前的 ByteBufAllocator,然后分配一个新的缓冲
    2. 通过 ByteBuf 的 write() 方法写入时间戳
    3. 通过 ChannelHandlerContext 对象的 writeAndFlush() 方法将字节容器的数据写入缓冲区并刷新,它会返回一个 ChannelFuture 对象
    4. 因为 Netty 的操作都是异步的,例如下面代码中的消息在被发送之前可能会被先关闭连接
    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();
    
    1. 因此 close() 方法需要在数据通过 write() 发送到客户端之后在调用,因此为 ChannelFuture 增加一个 ChannelFutureListener 来监听操作完成事件,并关闭 Channel
    2. 也可以使用简单的预定义监听代码
    f.addListener(ChannelFutureListener.CLOSE);
    
  • ctx.write(Object) 方法不会使消息写入到通道上,它被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者你可以用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目的

编写服务器类

  • 服务端需要两个 NioEventLoopGroup,它本质是一个线程池:
    • bossGroup:处理客户端连接事件的线程池
    • workerGroup:处理连接后所有事件的线程池
  • ServerBootstrap 是服务端的辅助启动类,用于创建服务端
  • 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
  • 通过 ChannelInitializer 辅助配置客户端连接生成的 Channel,指定需要执行的 Handler
  • 设置 EventLoopGroup 参数:
    • .option:用于设置 bossGroup 的相关参数
    • .childOption:用于设置workerGroup相关参数
  • 绑定端口,并调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture,调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();         // 处理客户端连接事件的线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup();       // 处理连接后所有事件的线程池
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();      // NIO 服务的辅助启动类
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)          // 指定连接该服务器的 Channel 类型为 NioServerSocketChannel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());              // 指定需要执行的 Handler
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // 设置 bossGroup 的相关参数
                    .childOption(ChannelOption.SO_KEEPALIVE, true);         // 设置 workerGroup 相关参数

            ChannelFuture f = bootstrap.bind(port).sync();          // 绑定端口,调用 ChannelFuture 的 sync() 阻塞方法等待绑定完成
            // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
            // 调用 ChannelFuture 的 sync() 阻塞方法直到服务端关闭链路之后才退出 main() 函数
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 优雅退出机制。。。退出线程池(该方法源码没读过,也不知怎么个优雅方式)
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8080;
        new Server(port).run();
    }
}

TIME 协议(客户端)

目标:连接服务端并接收服务端发送的时间戳消息,输出到控制台

编写 Handler

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; 
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

编写客户端类

  • 与服务端唯一不同的是使用 BootStrap 和 Channel 的实现,并调用 connect() 方法连接服务端
public class Client {
    public static void main(String[] args) throws Exception {
        String host = (args.length == 1) ? args[0] : "localhost";
        int port = (args.length == 2) ? Integer.parseInt(args[1]) : 8080;

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            ChannelFuture f = bootstrap.connect(host, port).sync();     // 连接服务端,调用 ChannelFuture 的 sync() 阻塞方法等待连接完成
            // 调用 closeFuture() 方法返回此通道关闭时的 ChannelFuture
            // 调用 ChannelFuture 的 sync() 阻塞方法直到客户端关闭链路之后才退出 main() 函数
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

处理基于流的传输

回到 TIME 客户端例子,服务端发送的数据是一个 32位 的时间戳,如果服务端发送了 16位 的数据呢,那客户端读取的数据就不准确了

解决方法一

  • 构造一个内部的缓冲,只有直到 4 个字节全部接收到内部缓冲,才进行处理
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf byteBuf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        byteBuf = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        byteBuf.release();
        byteBuf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        byteBuf.writeBytes(m);
        m.release();

        if (byteBuf.readableBytes() >= 4){
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 代码分析:
    • ChannelHandler 有 2 个生命周期监听方法:handlerAdded()、handlerRemoved() ,你可以完成任意初始化任务,只要它不会阻塞很长时间
    • 分配一个 4 个字节的字节容器,将读取的数据写入该字节容器
    • 判断容器中是否有足够的数据(4个字节),如果有在进行业务处理

解决方法二

  • 方法一虽然解决了问题但修改后的处理器并不简洁,可以把一整个 ChannelHandler 拆分成多个 ChannelHandler 以减少应用复杂度,多个 ChannelHandler 构成一个处理链
  • 因此可以将 TimeClientHandler 拆分成2个处理器:
    • TimeDecoder:解析数据
    • TimeClientHandler:处理业务,跟初始实现一样
  • Netty 提供了 ByteToMessageDecoder 可以帮助完成 TimeDecoder 的开发,ByteToMessageDecoder 是 ChannelInboundHandler 的一个实现类,它可以让数据解析变得更简单
public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        list.add(byteBuf.readBytes(4));
    }
}
  • 代码分析:
    1. 每当有新数据接收时,ByteToMessageDecoder 都会调用 decode() 方法来处理字节容器 ByteBuf 对象
    2. 如果在 decode() 方法里增加一个对象到 list 对象里面,则意味着解码消息成功,ByteToMessageDecoder 将会丢弃在字节容器 ByteBuf 里已经被读过的数据
  • 修改 ChannelInitializer 将另外一个 ChannelHandler 加入到 ChannelPipeline
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
    }
});

用 POJO 代替 ByteBuf

之前例子使用 ByteBuf 作为协议消息的数据结构,目前读取的仅仅是一个 32 位 的数据,直接使用 ByteBuf 不是问题,然而在真实的协议中,数据量肯定不止如此,通过 ByteBuf 处理数据将变的复杂困难,因此下面介绍如何使用 POJO(普通 Java 对象) 代替 ByteBuf

  • 首先定义新类型 UnixTime
public class UnixTime {
    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}
  • 修改 TimeDecoder 类,返回一个 UnixTime 以代替 ByteBuf
protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf, List<Object> list) throws Exception {
    if (byteBuf.readableBytes() < 4) {
        return;
    }
    list.add(new UnixTime(byteBuf.readInt()));
}
  • 修改 TimeClientHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime unixTime = (UnixTime) msg;
    ctx.close();
}
  • 修改 TimeServerHandler
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}
  • 最后还需要实现一个编码器,通过实现 ChannelOutboundHandler 来将 UnixTime 对象转换为 ByteBuf,这里有两个点要注意:
    • 当编码后的数据被写到了通道上 Netty 可以通过 ChannelPromise 对象的标记确认成功或失败
    • 不需要调用 cxt.flush(),因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),如果想自己实现 flush() 方法内容可以自行覆盖这个方法
public class TimeEncoder extends ChannelOutboundHandlerAdapter{
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); 
    }
}
  • 你可以使用 MessageToByteEncode
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
  • 最后将 TimeEncoder 加入到 ChannelPipeline,并位于 TimeServerHandler 之前

关闭应用

  • 关闭一个 Netty 应用只需通过 shutdownGracefully() 方法来关闭所有的 EventLoopGroup
  • 当所有的 EventLoopGroup 被完全地终止,并且对应的所有 channel 都已经被关闭时,Netty 会返回一个 Future 对象来通知你

相关文章

  • Netty4(三):快速入门

    案例代码下载 初学者的话推荐直接套用 all-in-one 的 jar 包,若熟悉 Netty 可以根据需求添加不...

  • Netty4入门

    先从整体上看一下Netty的构成。 核心组件 Bootstrap&ServerBootstrap netty程序的...

  • 2020-10-22_Nio单线程模型入门

    20201022_Nio单线程模型入门 1概述 现在稳定推荐使用的主流版本还是Netty4,Netty5 中使用了...

  • C语言快速入门 - Hello World 详解

    目录 C语言快速入门 C语言快速入门 - Hello World 详解 C语言快速入门 - 变量 C语言快速入门 ...

  • C语言快速入门 - 简单运算符

    目录 C语言快速入门 C语言快速入门 - Hello World 详解 C语言快速入门 - 变量 C语言快速入门 ...

  • C语言快速入门 - 控制语句

    目录 C语言快速入门 C语言快速入门 - Hello World 详解 C语言快速入门 - 变量 C语言快速入门 ...

  • C语言快速入门 - 变量

    目录 C语言快速入门 C语言快速入门 - Hello World 详解 C语言快速入门 - 变量 C语言快速入门 ...

  • C语言快速入门

    目录 C语言快速入门 C语言快速入门 - Hello World 详解 C语言快速入门 - 变量 C语言快速入门 ...

  • 阅读笔记80

    三. 起步时最重要的就是接受自己的笨拙 009 不可快速成功但可以快速入门,快速入门不仅绝对有可能,而且绝对必要。...

  • 《分布式_Dubbo》_Dubbo快速入门和配置汇总

    Dubbo 快速入门,记录下 概要: Dubbo 快速入门 Dubbo 常规配置说明 一、Dubbo 快速入门 D...

网友评论

    本文标题:Netty4(三):快速入门

    本文链接:https://www.haomeiwen.com/subject/rpkhqftx.html