Netty入门

作者: Real_man | 来源:发表于2018-12-25 18:52 被阅读37次

    Netty是一个异步的事件驱动网络框架,使用Netty可以研发高性能的私有协议,将业务逻辑和网络进行解耦,通过Netty我们可以实现一些常用的协议,如HTTP。

    基本概念

    Channel

    Channel是NIO的基础,它代表一个连接,通过这个链接可以进行IO操作,例如读和写。

    Future

    在Netty的Channel中的每一个IO操作都是非阻塞的。
    这就意味着每一个操作都是立刻返回结果的。在Java标准库中有Future接口,但是我们使用Future的时候只能询问这个操作是否执行完成,或者阻塞当前的线程直到结果完成,这不是Netty想要的。

    Netty实现了自己的ChannelFuture接口,我们可以传递一个回调到ChannelFuture,当操作完成的时候才会执行回调。

    Events 和 Handlers

    Netty使用的是事件驱动的应用设计,因此Handler处理的数据流,在管道中是链式的事件。事件和Handler可以被 输入 和 输出的数据流进行关联。

    输入(Inbound)事件可以如下:

    • Channel激活和灭活
    • 读操作事件
    • 异常事件
    • 用户事件

    输出(Outbound)事件比较简单,一般是打开和关闭连接,写入和刷新数据。

    Encoder 和 Decoder

    因为我们要处理网络协议,需要操作数据的序列化和反序列化。

    代码

    来个实际的案例:

    1. 新建项目,添加maven依赖
        <properties>
            <netty-all.version>4.1.6.Final</netty-all.version>
        </properties>
    
    
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>${netty-all.version}</version>
            </dependency>
        </dependencies>
    
    1. 创建数据的pojo
    public class RequestData {
        private int intValue;
        private String stringValue;
    
        // getter 和 setter
        // toString方法
    }
    
    public class ResponseData {
        private int intValue;
        
        // getter 和 setter
        // toString方法    
    }
    
    
    1. 创建Encoder和Decoder
    public class RequestDataEncoder extends MessageToByteEncoder<RequestData> {
    
        private final Charset charset = Charset.forName("UTF-8");
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, RequestData msg, ByteBuf out) throws Exception {
            out.writeInt(msg.getIntValue());
            out.writeInt(msg.getStringValue().length());
            out.writeCharSequence(msg.getStringValue(), charset);
        }
    }
    
    public class ResponseDataDecoder extends ReplayingDecoder<ResponseData> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf in, List<Object> out) throws Exception {
    
            ResponseData data = new ResponseData();
            data.setIntValue(in.readInt());
            out.add(data);
        }
    }
    
    
    public class RequestDecoder extends ReplayingDecoder<RequestData> {
    
        private final Charset charset = Charset.forName("UTF-8");
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
    
                RequestData data = new RequestData();
                data.setIntValue(in.readInt());
                int strLen = in.readInt();
                data.setStringValue(
                        in.readCharSequence(strLen, charset).toString());
                out.add(data);
        }
    }
    
    
    public class ResponseDataEncoder extends MessageToByteEncoder<ResponseData> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, ResponseData msg, ByteBuf out) throws Exception {
            out.writeInt(msg.getIntValue());
        }
    }
    
    
    1. 创建请求的处理器
    public class ProcessingHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            RequestData requestData = (RequestData) msg;
            ResponseData responseData = new ResponseData();
            responseData.setIntValue(requestData.getIntValue() * 2);
            ChannelFuture future = ctx.writeAndFlush(responseData);
            future.addListener(ChannelFutureListener.CLOSE);
            System.out.println(requestData);
        }
    }
    
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx)
                throws Exception {
    
            RequestData msg = new RequestData();
            msg.setIntValue(123);
            msg.setStringValue(
                    "正常工作");
            ChannelFuture future = ctx.writeAndFlush(msg);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println((ResponseData)msg);
            ctx.close();
        }
    }
    
    1. 创建服务端应用
    public class NettyServer {
        private int port;
        
    
        public NettyServer(int port) {
            this.port = port;
        }
    
        public static void main(String[] args) throws Exception {
    
            int port = args.length > 0
                    ? Integer.parseInt(args[0]) : 9003;
    
            new NettyServer(port).run();
        }
    
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(new RequestDecoder(),
                                        new ResponseDataEncoder(),
                                        new ProcessingHandler());
                            }
                        }).option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture f = b.bind(port).sync();
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    
    
    1. 创建客户端应用
    public class NettyClient {
        public static void main(String[] args) throws Exception {
    
            String host = "127.0.0.1";
            int port = 9003;
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(new RequestDataEncoder(),
                                        new ResponseDataDecoder(), new ClientHandler());
                            }
                        });
    
                ChannelFuture f = b.connect(host, port).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    1. 运行服务端和客户端
    image image

    可见正常工作

    最后

    这里我们只是对Netty进行简单的介绍,介绍了它一些基本的概念,然后演示了一个例子。后续我们会对Netty进行更深入的研究

    参考

    相关文章

      网友评论

        本文标题:Netty入门

        本文链接:https://www.haomeiwen.com/subject/gkhqlqtx.html